Workerpool
An unopinionated small scale worker pool abstraction which serves as a base interface for more advanced worker managers.
Note: This is NOT a drop-in replacement of the NPM workerpool
, but it’s easy to create the same functionality (see Web Workers section below).
Note: This is not yet a good candidate for large scale distributed queues, cluster awareness such as mutex and event ack have to be in place before that. If you think solutions in such a scale is viable in Deno, consider buy me a coffee and I’ll make it happen.
Terminology
Workerpool
A manager that creates workers on the fly, executing tasks up to defined concurrency.
Runner
Runners are internal wrappers for user provided runner classes, they maintain internal states such as active/busy and the retry counter.
Executable
User implementation of task executors, where they all implements the
Executable
interface.Also called workers for the sake of naming convension, but the usage kept minimal to avoid confusion with Web Workers.
Task
Tasks are named payloads enqueued into a workerpool.
Basic Usage
import { Executable, Workerpool } from "https://deno.land/x/workerpool/mod.ts";
class FooWorker implements Executable {...}
class BarWorker implements Executable {...}
const pool = new Workerpool({
concurrency: 2,
workers: [FooWorker, BarWorker]
});
pool
.enqueue({ name: "FooWorker", payload: {...} })
.enqueue({ name: "BarWorker", payload: {...} })
.start();
Runner Examples
In-memory Queue
As a proof of concept, this is the most basic implementation of an in-memory queue.
type Payload = any;
type MemoryMutexTask = Task<Payload> & { active?: boolean };
const tasks: MemoryMutexTask[] = [];
const pool = new Workerpool<Payload>({
concurrency: 1,
runners: [runnerA, runnerB],
enqueue: (task: MemoryMutexTask) => {
if (tasks.includes(task)) {
task.active = false;
} else {
tasks.push(task);
}
},
dequeue: () => {
// Uncomment the following line for FIFO queues
// if (tasks.find(({ active }) => active)) return;
const task = tasks.find(({ active }) => !active);
if (task) {
task.active = true;
return task;
}
},
success: (task) => {
const index = tasks.indexOf(task);
if (index > -1) {
tasks.splice(index, 1);
}
},
});
Web Workers
Deno has built-in support for workers, our ExecutableWorker
class serves as a simple proxy class via comlink
.
You’ll need a separated script file for the worker.
// worker.ts
import { expose } from "https://cdn.skypack.dev/comlink?dts";
expose({
execute: async (payload: string) => {
// Simulate async actions
await new Promise((resolve) => setTimeout(resolve, 1000));
return `Worker echo: ${payload}`;
},
});
Now register the runners into the workerpool:
import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts";
class MyWorker extends ExecutableWorker<string, void> {
constructor() {
super(new URL("./worker.ts", import.meta.url).href);
}
}
const pool = new Workerpool({
concurrency: 1,
workers: [MyWorker],
});
pool
.enqueue({ name: "MyWorker", payload: "Hello World!" })
.enqueue({ name: "MyWorker", payload: "Hello Again!" })
.start();