Skip to main content
The Deno 2 Release Candidate is here
Learn more
Module

x/poolifier/src/mod.ts>AbstractPool

Fast and small web worker pool
Latest
class AbstractPool
implements IPool<Worker, Data, Response>
Abstract
import { AbstractPool } from "https://deno.land/x/poolifier@v0.3.14/src/mod.ts";

Base class that implements some shared logic for all poolifier pools.

Constructors

new
AbstractPool(
minimumNumberOfWorkers: number,
fileURL: URL,
maximumNumberOfWorkers?: number,
)

Constructs a new poolifier pool.

Type Parameters

Worker extends IWorker
optional
Data = unknown
optional
Response = unknown

Properties

private
destroying: boolean

Whether the pool is destroying or not.

private
readonly
handleWorkerNodeBackPressureEvent: (event: CustomEvent<WorkerNodeEventDetail>) => void
private
readonly
handleWorkerNodeIdleEvent: (event: CustomEvent<WorkerNodeEventDetail>, previousStolenTask?: Task<Data>) => void
private
readonly
ready: boolean

The pool readiness boolean status.

private
readyEventEmitted: boolean

Whether the pool ready event has been emitted or not.

private
started: boolean

Whether the pool is started or not.

private
starting: boolean

Whether the pool is starting or not.

private
readonly
startTimestamp

The start timestamp of the pool.

private
readonly
taskFunctions: Map<string, TaskFunction<Data, Response>>

The task functions added at runtime map:

  • key: The task function name.
  • value: The task function itself.
private
readonly
utilization: number

The approximate pool utilization.

private
readonly
workerNodeStealTask: (workerNodeKey: number) => Task<Data> | undefined
protected
abstract
readonly
busy: boolean

Whether the pool is busy or not.

The pool busyness boolean status.

protected
readonly
empty: boolean

The pool emptiness boolean status.

protected
readonly
full: boolean

Whether the pool is full or not.

The pool filling boolean status.

protected
promiseResponseMap: Map<string, PromiseResponseWrapper<Response>>

The task execution response promise map:

  • key: The message id of each submitted task.
  • value: An object that contains the worker, the execution response promise resolve and reject callbacks.

When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.

protected
abstract
readonly
type: PoolType

The pool type.

If it is 'dynamic', it provides the max property.

protected
abstract
readonly
worker: WorkerType

The worker type.

protected
optional
workerChoiceStrategyContext: WorkerChoiceStrategyContext<Worker, Data, Response>

Worker choice strategy context referencing a worker choice algorithm implementation.

protected
readonly
workerMessageListener: (message: MessageValue<Response>) => unknown

This method is the message listener registered on each worker.

optional
eventTarget: EventTarget
readonly
info: PoolInfo
readonly
workerNodes: Array<IWorkerNode<Worker, Data>>

Methods

private
addWorkerNode(workerNode: IWorkerNode<Worker, Data>): number

Adds the given worker node in the pool worker nodes.

private
buildTasksQueueOptions(tasksQueueOptions: TasksQueueOptions | undefined): TasksQueueOptions
private
cannotStealTask(): boolean
private
checkAndEmitEmptyEvent(): void
private
checkAndEmitReadyEvent(): void

Checks if the worker id sent in the received message from a worker is valid.

private
checkMinimumNumberOfWorkers(minimumNumberOfWorkers: number | undefined): void
private
checkPoolOptions(opts: PoolOptions): void
private
checkPoolType(): void
private
checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined): void
private
chooseWorkerNode(): number

Chooses a worker node for the next task.

The default worker choice strategy uses a round robin algorithm to distribute the tasks.

Creates a worker node.

private
deleteTaskFunctionWorkerUsages(name: string): void
private
dequeueTask(workerNodeKey: number): Task<Data> | undefined
private
enqueueTask(workerNodeKey: number, task: Task<Data>): number
private
executeTask(workerNodeKey: number, task: Task<Data>): void

Executes the given task on the worker given its worker node key.

private
flushTasksQueues(): void
private
getWorkerNodeKeyByWorkerId(workerId: string | undefined): number

Gets the worker node key given its worker id.

private
handleTask(workerNodeKey: number, task: Task<Data>): void
private
hasBackPressure(): boolean
private
initializeEventTarget(): void
private
isWorkerNodeBusy(workerNodeKey: number): boolean
private
redistributeQueuedTasks(workerNodeKey: number): void
private
resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(workerNodeKey: number, taskName: string): void
private
resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey: number): void
private
sendKillMessageToWorker(workerNodeKey: number): Promise<void>
private
sendStatisticsMessageToWorker(workerNodeKey: number): void

Sends the statistics message to worker given its worker node key.

private
sendTaskFunctionOperationToWorker(workerNodeKey: number, message: MessageValue<Data>): Promise<boolean>
private
sendTaskFunctionOperationToWorkers(message: MessageValue<Data>): Promise<boolean>
private
setTasksQueueSize(size: number): void
private
setTaskStealing(): void
private
shallExecuteTask(workerNodeKey: number): boolean
private
shallUpdateTaskFunctionWorkerUsage(workerNodeKey: number): boolean

Whether the worker node shall update its task function worker usage or not.

Starts the minimum number of workers.

private
tasksQueueSize(workerNodeKey: number): number
private
unsetTaskStealing(): void
private
updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(workerNodeKey: number, taskName: string): void
private
updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey: number): void
private
updateTaskStolenStatisticsWorkerUsage(workerNodeKey: number, taskName: string): void
protected
afterTaskExecutionHook(workerNodeKey: number, message: MessageValue<Response>): void

Hook executed after the worker task execution. Can be overridden.

protected
afterWorkerNodeSetup(workerNodeKey: number): void

Method hooked up after a worker node has been newly created. Can be overridden.

protected
beforeTaskExecutionHook(workerNodeKey: number, task: Task<Data>): void

Hook executed before the worker task execution. Can be overridden.

Emits dynamic worker creation events.

Creates a new, completely set up dynamic worker node.

protected
createAndSetupWorkerNode(): number

Creates a new, completely set up worker node.

protected
abstract
deregisterWorkerMessageListener<Message extends Data | Response>(workerNodeKey: number, listener: (message: MessageValue<Message>) => void): void

Deregisters a listener callback on the worker given its worker node key.

protected
destroyWorkerNode(workerNodeKey: number): Promise<void>

Terminates the worker node given its worker node key.

protected
flagWorkerNodeAsNotReady(workerNodeKey: number): void
protected
flushTasksQueue(workerNodeKey: number): number
protected
getWorkerInfo(workerNodeKey: number): WorkerInfo | undefined

Gets the worker information given its worker node key.

protected
internalBusy(): boolean

Whether worker nodes are executing concurrently their tasks quota or not.

protected
abstract
isMain(): boolean

Should return whether the worker is the main worker or not.

protected
abstract
registerOnceWorkerMessageListener<Message extends Data | Response>(workerNodeKey: number, listener: (message: MessageValue<Message>) => void): void

Registers once a listener callback on the worker given its worker node key.

protected
abstract
registerWorkerMessageListener<Message extends Data | Response>(workerNodeKey: number, listener: (message: MessageValue<Message>) => void): void

Registers a listener callback on the worker given its worker node key.

protected
removeWorkerNode(workerNode: IWorkerNode<Worker, Data>): void

Removes the worker node from the pool worker nodes.

protected
abstract
sendStartupMessageToWorker(workerNodeKey: number): void

Sends the startup message to worker given its worker node key.

protected
abstract
sendToWorker(
workerNodeKey: number,
message: MessageValue<Data>,
transferList?: Transferable[],
): void

Sends a message to worker given its worker node key.

protected
setupHook(): void

Setup hook to execute code before worker nodes are created in the abstract constructor. Can be overridden.

protected
abstract
shallCreateDynamicWorker(): boolean

Conditions for dynamic worker creation.

addTaskFunction(name: string, fn: TaskFunction<Data, Response>): Promise<boolean>
destroy(): Promise<void>
enableTasksQueue(enable: boolean, tasksQueueOptions?: TasksQueueOptions): void
execute(
data?: Data,
name?: string,
transferList?: Transferable[],
): Promise<Response>
hasTaskFunction(name: string): boolean
removeTaskFunction(name: string): Promise<boolean>
setDefaultTaskFunction(name: string): Promise<boolean>
setTasksQueueOptions(tasksQueueOptions: TasksQueueOptions | undefined): void
setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions): void
setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined): void
start(): void