Skip to main content
Module

x/async_channels/mod.ts>Channel

Inspired by Go & Clojure Channels, async_channels provides channels as an asynchronous communication method between asynchronous functions.
Go to Latest
class Channel
implements AsyncIterable<T>
Re-export
import { Channel } from "https://deno.land/x/async_channels@v1.0.0-alpha54/mod.ts";

Constructors

new
Channel(bufferSize?, options?: ChannelOptions)

Constructs a new Channel with an optional buffer.

Type Parameters

T

The type of value that can be sent to or received by this channel.

Properties

protected
current: State
protected
optional
currentVal: T
protected
readonly
queue: Queue<T>
protected
stateEventTarget: EventTarget
protected
transitionEventTarget: EventTarget

Methods

protected
updateState(t: Transition, val?: T): void
protected
waitForState(...states: State[]): Promise<T | undefined>

Closes the channel.

Closing a closed channel have no effect (positive or negative).

Sending a message to a closed channel will throw an AbortedError.

Receiving a message from a closed channel will resolve the promise immediately. See Channel.receive for more information.

debug(...args: unknown[])

duplicate creates multiple channels (determined by n), and consumes this channel. The consumed values are then sent to all channels

error(...args: unknown[])
filter(fn: (val: T) => boolean | Promise<boolean>, pipeOpts?: ChannelPipeOptions): Receiver<T>

filter applies fn to each value in this channel, and returns a new channel that will only contain value for which fn returned true (or a promise that resolves to true).

The returned channel will close after this channel closes (or if the provided signal is triggered).

flat<K>(this: Channel<Iterable<K> | AsyncIterable<K>>, pipeOpts?: ChannelPipeOptions): Receiver<K>

flat returns a receiver channel that contains the flattened (1 level) values of each value of this channel.

The receiver channel will close, when the original channel closes (or if the provided signal is triggered).

flatMap<TOut>(fn: (val: T) => Iterable<TOut> | AsyncIterable<TOut>, pipeOpts?: ChannelPipeOptions): Receiver<TOut>

flatMap returns a receiver channel that contains the flattened (1 level) results of applying fn to each value of this channel.

The receiver channel will close, when the original channel closes (or if the provided signal is triggered).

forEach(fn: (val: T) => unknown | Promise<unknown>, pipeOpts?: ChannelPipeOptions): Receiver<void>

forEach applies fn to each value in this channel, and returns a channel that will contain the results. The returned channel will close after this channel closes (or if the provided signal is triggered).

groupBy<TKey extends (string | symbol)>(fn: (val: T) => TKey | Promise<TKey>, pipeOpts?: ChannelPipeOptions): Record<TKey, Receiver<T>>
map<TOut>(fn: (val: T) => TOut | Promise<TOut>, pipeOpts?: ChannelPipeOptions): Receiver<TOut>

map returns a receiver channel that contains the results of applying fn to each value of this channel.

The receiver channel will close, when the original channel closes (or if the provided signal is triggered).

receive(abortCtrl?: AbortController): Promise<[T, true] | [undefined, false]>

Receive returns a promise that will be resolved with [T, true] when a value is available, or rejected if a provided AbortController is aborted

If the channel is closed, then the promise will be resolved immediately with [undefined, false].

Receiving from a closed channel:

  import {Channel} from "./channel.ts";
  const ch = new Channel();
  ch.close();
  const [val, ok] = await ch.receive()
  console.assert(val === undefined)
  console.assert(ok === false)

Receiving from a buffered channel:

  import {Channel} from "./channel.ts";
  const ch = new Channel(1);
  await ch.send("Hello world!")
  ch.close();
  const [val, ok] = await ch.receive()
  console.assert(val === "Hello world!")
  console.assert(ok === true)

Aborting a receive request:

  import {Channel, AbortedError} from "./channel.ts";
  const ch = new Channel(1);
  await ch.send("Hello world!")
  ch.close();
  const abortCtrl = new AbortController()
  abortCtrl.abort()
  try {
    await ch.receive(abortCtrl);
    console.assert(false, "unreachable");
  } catch (e) {
    console.assert(e instanceof AbortedError);
  }
reduce(fn: (prev: T, current: T) => T | Promise<T>, pipeOpts?: ChannelPipeOptions): Receiver<T>
send(val: T, abortCtrl?: AbortController): Promise<void>

Sends a value on the channel, and returns a promise that will be resolved when a the value is received (see Channel.receive), or rejected if a provided AbortController is aborted.

If the channel is closed, then the promise will be rejected with an InvalidTransitionError.

import {Channel, InvalidTransitionError} from "./channel.ts"

const ch = new Channel()
ch.close();
try {
  await ch.send("should fail")
  console.assert(false, "unreachable")
} catch (e) {
  console.assert(e instanceof InvalidTransitionError)
}
subscribe(
fn: (_: T) => string | number | symbol,
topics: (string | number | symbol)[],
): Record<string | number | symbol, Receiver<T>>
with<TOut, TThis extends Channel<T>>(this: TThis, fn: (t: TThis) => TOut): TOut

Applies fn on this and returns the result.

[Symbol.asyncIterator](): AsyncGenerator<T, void, void>

Creates an AsyncGenerator that yields all values sent to this channel, and returns when the channel closes.

Static Methods

from<T>(input: Iterable<T> | AsyncIterable<T>, pipeOpts?: ChannelPipeOptions): Receiver<T>