Library inspired by BullMQ for Deno
class Worker<State> extends EventTarget
import { Worker } from "https://deno.land/x/kvmq@v0.3.0/mod.ts";

Represents a worker that processes jobs from a queue.

Remember to call processJobs to start processing jobs.

Constructors

new Worker(
db: Deno.Kv,
key: Deno.KvKeyPart,
handler: JobHandler<State>,
options?: WorkerOptions,
)

Constructs a new worker for the given queue.

DB and key must match the ones used to construct the queue. You can also use Queue.createWorker as a shorthand to construct a worker for a queue.

This constructor is useful if your worker runs in a separate process from the queue.

Type Parameters

State

Type of custom state data that is passed to the worker when processing a job.

Properties

readonly activeJobs: ReadonlySet<Promise<void>>

Set of promises for finishing jobs that are currently being processed.

When jobs are finished, they remove themselves from this set.

To check the number of currently running jobs, use worker.activeJobs.size (a property, not a method). To wait for all currently running jobs to finish, use await Promise.all(worker.activeJobs).
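
The two idioms above can be combined into a small helper. This is a minimal sketch, not part of kvmq: the `HasActiveJobs` interface and the `drainActiveJobs` function are hypothetical names, and the interface only assumes the documented `activeJobs` property, so a real Worker should satisfy it structurally.

```typescript
// Hypothetical structural type: only the documented `activeJobs` property.
interface HasActiveJobs {
  readonly activeJobs: ReadonlySet<Promise<void>>;
}

// Hypothetical helper: returns how many jobs were in flight when it was
// called, after waiting for all of them to finish.
async function drainActiveJobs(worker: HasActiveJobs): Promise<number> {
  // `size` is a property of Set, so there are no parentheses.
  const running = worker.activeJobs.size;
  // Promise.all accepts any iterable, so the set can be passed directly.
  await Promise.all(worker.activeJobs);
  return running;
}
```

Jobs that start after the call are not covered, because the set is read once at the time `Promise.all` is invoked.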

readonly db: Deno.Kv

Kv database to use for accessing the queue.

handler: JobHandler<State>

The function that processes the jobs.

readonly isProcessing: boolean

Whether the worker is currently processing jobs.

readonly key: Deno.KvKeyPart

Key prefix to use for accessing the queue's data.

options: Required<WorkerOptions>

Worker options.

readonly processingFinished: Promise<void>

Promise for finishing currently running processors.

This promise gets replaced with a new one every time processJobs is called. If you call processJobs without awaiting it (fire and forget), you can use this property to get the promise again and await it.

This doesn't include the jobs that have already started processing. To wait for them too, use activeJobs.
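
A fire-and-forget worker could be shut down along the following lines. This is a sketch under assumptions: `StoppableWorker` and `stopAndDrain` are hypothetical names, the interface lists only members described on this page, and the `void` return type of `stopProcessing` is assumed rather than confirmed.

```typescript
// Hypothetical structural type covering only the members used below.
interface StoppableWorker {
  readonly activeJobs: ReadonlySet<Promise<void>>;
  readonly processingFinished: Promise<void>;
  stopProcessing(): void; // return type is an assumption
}

// Hypothetical helper for a worker whose processJobs() call was not awaited.
async function stopAndDrain(worker: StoppableWorker): Promise<void> {
  // Ask the running processors to stop.
  worker.stopProcessing();
  // Wait for the processors themselves to finish.
  await worker.processingFinished;
  // processingFinished does not cover jobs that already started,
  // so wait for those separately.
  await Promise.all(worker.activeJobs);
}
```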

Methods

addEventListener<K extends keyof WorkerEventMap<State>>(
type: K,
listener: (this: Worker<State>, ev: WorkerEventMap<State>[K]) => void,
options?: boolean | AddEventListenerOptions,
): void

See WorkerEventMap for available events.
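
Since Worker extends EventTarget and emits an error event when the handler fails, a listener can be attached with the standard EventTarget API. The sketch below is illustrative only: `collectErrorEvents` is a hypothetical helper, and it deliberately does not read any fields from the event, because the shape of the entries in WorkerEventMap is not shown on this page.

```typescript
// Hypothetical helper: records every "error" event emitted by the target.
// A Worker can be passed here because it extends EventTarget.
function collectErrorEvents(target: EventTarget): Event[] {
  const seen: Event[] = [];
  target.addEventListener("error", (ev: Event) => {
    // The event payload is not inspected; its shape is defined by
    // WorkerEventMap, which is not reproduced here.
    seen.push(ev);
  });
  return seen;
}
```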

addEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | AddEventListenerOptions,
): void

processJobs(options?: { signal?: AbortSignal; }): Promise<void>

Starts processing jobs.

If you already called this method and it's still running, the current call will first wait for the previous one to finish.

Pass an abort signal to stop processing jobs at a later time. Aborting won't wait for the already started jobs to finish processing. To also wait for all currently running jobs, use await Promise.all(worker.activeJobs).

Returns a promise that resolves when the job popping loop exits. The only ways to exit this loop are to abort the signal argument or to call stopProcessing. The promise can reject when getting or updating jobs in the database fails. Whenever an error occurs in the processing handler, the worker emits an error event instead.
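
A graceful shutdown built on the signal option might look like the sketch below. It is not kvmq code: `ProcessingWorker` and `runUntilStopped` are hypothetical names, and the interface assumes only the `processJobs` signature and `activeJobs` property documented on this page.

```typescript
// Hypothetical structural type covering only the members used below.
interface ProcessingWorker {
  readonly activeJobs: ReadonlySet<Promise<void>>;
  processJobs(options?: { signal?: AbortSignal }): Promise<void>;
}

// Hypothetical helper: starts processing and returns a handle whose stop()
// aborts the loop and then waits for jobs that were already started.
function runUntilStopped(worker: ProcessingWorker): { stop(): Promise<void> } {
  const controller = new AbortController();
  // Not awaited here: the promise settles only when the popping loop exits.
  const loop = worker.processJobs({ signal: controller.signal });
  // Mark a possible rejection as handled until stop() awaits it;
  // the error still surfaces from `await loop` below.
  loop.catch(() => {});

  return {
    async stop(): Promise<void> {
      controller.abort();
      // Rejects if getting or updating jobs in the database failed.
      await loop;
      // Aborting does not wait for started jobs, so wait for them explicitly.
      await Promise.all(worker.activeJobs);
    },
  };
}
```

Keeping the controller inside the helper means callers cannot abort the loop without also draining the active jobs, which is the part that is easy to forget.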

stopProcessing(): void

Aborts all currently running processors.

This is an alternative to passing an abort signal to processJobs.