x/rimbu/mod.ts>AsyncStream

Rimbu is a TypeScript library focused on immutable, performant, and type-safe collections and other tools.
namespace AsyncStream
Re-export
import { AsyncStream } from "https://deno.land/x/rimbu@0.13.1/mod.ts";

Interfaces

A non-empty and possibly infinite asynchronous sequence of elements of type T. See the Stream documentation and the AsyncStream API documentation

interface AsyncStream
Re-export
import { type AsyncStream } from "https://deno.land/x/rimbu@0.13.1/mod.ts";

A possibly infinite asynchronous sequence of elements of type T. See the Stream documentation and the AsyncStream API documentation

Examples

Example 1

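import { AsyncStream, Stream } from "https://deno.land/x/rimbu@0.13.1/mod.ts";

const s1 = AsyncStream.empty<number>() // no elements
const s2 = AsyncStream.of(1, 3, 2) // the given values, in order
const s3 = AsyncStream.from(Stream.range({ start: 10, amount: 15 })) // wraps a synchronous Stream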

Methods

asyncStream(): this

Returns an async stream of elements of type T.

equals(other: AsyncStreamSource<T>, eq?: Eq<T>): Promise<boolean>

Returns true if the sequence of elements in this stream is equal to the sequence in the other stream according to the provided eq function.

assumeNonEmpty(): AsyncStream.NonEmpty<T>

Returns the stream as a non-empty instance.

Returns the current stream preceded by the given value

Returns the current stream succeeded by the given value

forEach(f: (
value: T,
index: number,
halt: () => void,
) => MaybePromise<void>
, state?: TraverseState
): Promise<void>

Performs given function f for each element of the Stream, using given state as initial traversal state.
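
The role of the halt callback is easiest to see in isolation. The sketch below does not call Rimbu; it reimplements the traversal over a plain AsyncIterable, and the names forEachWithHalt, numbers, and collectUntil are hypothetical. It assumes traversal stops once the callback that called halt has finished, and it does not model the state parameter.

```typescript
// Hypothetical helper, not part of Rimbu: visits each element and lets the
// callback stop the traversal early by calling `halt`.
async function forEachWithHalt<T>(
  source: AsyncIterable<T>,
  f: (value: T, index: number, halt: () => void) => void | Promise<void>,
): Promise<void> {
  let halted = false;
  const halt = (): void => {
    halted = true;
  };
  let index = 0;
  for await (const value of source) {
    await f(value, index++, halt);
    if (halted) break; // assumption: stop right after the halting call
  }
}

async function* numbers(): AsyncGenerator<number> {
  yield 10;
  yield 20;
  yield 30;
}

// Collects elements until one reaches `limit`, then halts.
async function collectUntil(limit: number): Promise<number[]> {
  const seen: number[] = [];
  await forEachWithHalt(numbers(), (value, _index, halt) => {
    seen.push(value);
    if (value >= limit) halt();
  });
  return seen;
}
```

Here collectUntil(20) visits 10 and 20 and never reaches 30.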

forEachPure<A extends readonly unknown[]>(f: (value: T, ...args: A) => MaybePromise<void>, ...args: A): Promise<void>

Performs given function f for each element of the Stream, with the optionally given args as extra arguments.

indexed(startIndex?: number): AsyncStream<[number, T]>

Returns an AsyncStream where each element in this stream is paired with its index.

map<T2>(mapFun: (value: T, index: number) => MaybePromise<T2>): AsyncStream<T2>

Returns an AsyncStream where mapFun is applied to each element.

mapPure<T2, A extends readonly unknown[]>(mapFun: (value: T, ...args: A) => MaybePromise<T2>, ...args: A): AsyncStream<T2>

Returns an AsyncStream where the given mapFun is applied to each value in the stream, with the optionally given args passed as extra arguments.

flatMap<T2>(flatMapFun: (
value: T,
index: number,
halt: () => void,
) => AsyncStreamSource<T2>
): AsyncStream<T2>

Returns an AsyncStream consisting of the concatenation of flatMapFun applied to each element.
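
As a rough illustration of the concatenation described above, here is a sketch with plain async generators instead of Rimbu. The names flatMapSketch, letters, expand, and demoFlatMap are hypothetical. The halt argument is left out, and the callback is restricted to returning an AsyncIterable for simplicity.

```typescript
// Hypothetical stand-in for flatMap, written with plain async generators.
async function* flatMapSketch<T, T2>(
  source: AsyncIterable<T>,
  flatMapFun: (value: T, index: number) => AsyncIterable<T2>,
): AsyncGenerator<T2> {
  let index = 0;
  for await (const value of source) {
    // Emit every element of the inner sequence before moving on.
    for await (const inner of flatMapFun(value, index++)) yield inner;
  }
}

async function* letters(): AsyncGenerator<string> {
  yield "a";
  yield "b";
}

// Each letter expands to two elements: itself and its index.
async function* expand(value: string, index: number): AsyncGenerator<string> {
  yield value;
  yield String(index);
}

async function demoFlatMap(): Promise<string[]> {
  const result: string[] = [];
  for await (const item of flatMapSketch(letters(), expand)) result.push(item);
  return result; // ["a", "0", "b", "1"]
}
```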

flatZip<T2>(flatMapFun: (
value: T,
index: number,
halt: () => void,
) => AsyncStreamSource<T2>
): AsyncStream<[T, T2]>

Returns an AsyncStream consisting of the concatenation of flatMapFun applied to each element, zipped with the element that was provided to the function.
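
The difference from flatMap is that each inner element stays paired with the outer element that produced it. A minimal sketch under the same simplifications (plain async generators, no halt argument); flatZipSketch, ids, tagsFor, and demoFlatZip are hypothetical names.

```typescript
// Hypothetical stand-in for flatZip: pairs each inner element with the
// outer element that was passed to the function.
async function* flatZipSketch<T, T2>(
  source: AsyncIterable<T>,
  flatMapFun: (value: T, index: number) => AsyncIterable<T2>,
): AsyncGenerator<[T, T2]> {
  let index = 0;
  for await (const value of source) {
    for await (const inner of flatMapFun(value, index++)) {
      const pair: [T, T2] = [value, inner];
      yield pair;
    }
  }
}

async function* ids(): AsyncGenerator<number> {
  yield 1;
  yield 2;
}

// Two made-up tags per id.
async function* tagsFor(id: number): AsyncGenerator<string> {
  yield `x${id}`;
  yield `y${id}`;
}

async function demoFlatZip(): Promise<[number, string][]> {
  const result: [number, string][] = [];
  for await (const pair of flatZipSketch(ids(), tagsFor)) result.push(pair);
  return result; // [[1, "x1"], [1, "y1"], [2, "x2"], [2, "y2"]]
}
```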

transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R>

Returns an AsyncStream consisting of the concatenation of AsyncStreamSource elements resulting from applying the given transformer to each element.

filter(pred: (
value: T,
index: number,
halt: () => void,
) => MaybePromise<boolean>
): AsyncStream<T>

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns true.

filterNot(pred: (
value: T,
index: number,
halt: () => void,
) => MaybePromise<boolean>
): AsyncStream<T>

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns false.

filterPure<A extends readonly unknown[]>(pred: (value: T, ...args: A) => MaybePromise<boolean>, ...args: A): AsyncStream<T>

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns true.

filterNotPure<A extends readonly unknown[]>(pred: (value: T, ...args: A) => MaybePromise<boolean>, ...args: A): AsyncStream<T>

Returns an AsyncStream containing only those elements from this stream for which the given pred function returns false.

collect<R>(collectFun: AsyncCollectFun<T, R>): AsyncStream<R>

Returns an AsyncStream containing the resulting elements from applying the given collectFun to each element in this Stream.

first(): Promise<T | undefined>

Returns the first element of the AsyncStream, or a fallback value (default undefined) if the stream is empty.

first<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>

single(): Promise<T | undefined>

Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value.
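
The "exactly one element" rule can be sketched without Rimbu as follows. singleSketch, ofValues, and demoSingle are hypothetical names, and the fallback is a plain value here, whereas the real overload accepts an AsyncOptLazy.

```typescript
// Hypothetical stand-in for single: returns the only element, or `otherwise`
// when the source has zero or more than one element.
async function singleSketch<T, O>(
  source: AsyncIterable<T>,
  otherwise: O,
): Promise<T | O> {
  let seen = 0;
  let candidate: T | undefined;
  for await (const value of source) {
    seen++;
    if (seen > 1) return otherwise; // a second element: stop early
    candidate = value;
  }
  return seen === 1 ? (candidate as T) : otherwise;
}

async function* ofValues<T>(...values: T[]): AsyncGenerator<T> {
  for (const value of values) yield value;
}

async function demoSingle(): Promise<(number | string)[]> {
  return [
    await singleSketch(ofValues<number>(), "none"), // empty
    await singleSketch(ofValues(7), "none"), // exactly one element
    await singleSketch(ofValues(7, 8), "none"), // too many elements
  ];
}
```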

single<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>

last(): Promise<T | undefined>

Returns the last element of the AsyncStream, or a fallback value (default undefined) if the stream is empty.

last<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>

count(): Promise<number>

Returns the number of elements in the AsyncStream.

countElement(value: T, eq?: Eq<T>): Promise<number>

Returns the number of elements in the AsyncStream that are equal to the given value according to the given eq.

countNotElement(value: T, eq?: Eq<T>): Promise<number>

Returns the number of elements in the AsyncStream that are not equal to the given value according to the given eq.

find(pred: (value: T, index: number) => MaybePromise<boolean>, occurrance?: number): Promise<T | undefined>

Returns the first element for which the given pred function returns true, or a fallback value otherwise.

find<O>(
pred: (value: T, index: number) => MaybePromise<boolean>,
occurrance: number | undefined,
otherwise: AsyncOptLazy<O>,
): Promise<T | O>

elementAt(index: number): Promise<T | undefined>

Returns the element in the AsyncStream at the given index, or a fallback value (default undefined) otherwise.

elementAt<O>(index: number, otherwise: AsyncOptLazy<O>): Promise<T | O>

indicesWhere(pred: (value: T) => MaybePromise<boolean>): AsyncStream<number>

Returns an AsyncStream containing the indices of the elements for which the given pred function returns true.

indicesOf(searchValue: T, eq?: Eq<T>): AsyncStream<number>

Returns an AsyncStream containing the indices of the occurrences of the given searchValue, according to the given eq function.

indexWhere(pred: (value: T, index: number) => MaybePromise<boolean>, occurrance?: number): Promise<number | undefined>

Returns the index of the given occurrance instance of the element in the AsyncStream that satisfies given pred function, or undefined if no such instance is found.

indexOf(
searchValue: T,
occurrance?: number,
eq?: Eq<T>,
): Promise<number | undefined>

Returns the index of the occurrance instance of given searchValue in the AsyncStream, using given eq function, or undefined if no such value is found.

some(pred: (value: T, index: number) => MaybePromise<boolean>): Promise<boolean>

Returns true if any element of the AsyncStream satisfies the given pred function.

every(pred: (value: T, index: number) => MaybePromise<boolean>): Promise<boolean>

Returns true if every element of the AsyncStream satisfies the given pred function.

contains(
value: T,
amount?: number,
eq?: Eq<T>,
): Promise<boolean>

Returns true if the AsyncStream contains the given amount of instances of the given value, using the given eq function.

containsSlice(source: AsyncStreamSource.NonEmpty<T>, eq?: Eq<T>): Promise<boolean>

Returns true if this stream contains the same sequence of elements as the given source, false otherwise.

takeWhile(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T>

Returns an AsyncStream that contains the elements of this stream up to the first element that does not satisfy given pred function.

dropWhile(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T>

Returns an AsyncStream that contains the elements of this stream starting from the first element that does not satisfy given pred function.
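
takeWhile and dropWhile split a stream at the same point: the first element that fails pred. The sketch below shows both halves with plain async generators rather than Rimbu; takeWhileSketch, dropWhileSketch, readings, and demoTakeDrop are hypothetical names.

```typescript
type Pred<T> = (value: T, index: number) => boolean | Promise<boolean>;

// Hypothetical stand-in for takeWhile: stops at the first failing element.
async function* takeWhileSketch<T>(
  source: AsyncIterable<T>,
  pred: Pred<T>,
): AsyncGenerator<T> {
  let index = 0;
  for await (const value of source) {
    if (!(await pred(value, index++))) return;
    yield value;
  }
}

// Hypothetical stand-in for dropWhile: once an element fails pred, that
// element and everything after it are emitted without further checks.
async function* dropWhileSketch<T>(
  source: AsyncIterable<T>,
  pred: Pred<T>,
): AsyncGenerator<T> {
  let index = 0;
  let dropping = true;
  for await (const value of source) {
    if (dropping && (await pred(value, index++))) continue;
    dropping = false;
    yield value;
  }
}

async function* readings(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 5;
  yield 1;
  yield 6;
}

async function demoTakeDrop(): Promise<{ taken: number[]; dropped: number[] }> {
  const below3 = (value: number): boolean => value < 3;
  const taken: number[] = [];
  for await (const v of takeWhileSketch(readings(), below3)) taken.push(v);
  const dropped: number[] = [];
  for await (const v of dropWhileSketch(readings(), below3)) dropped.push(v);
  return { taken, dropped }; // taken: [1, 2], dropped: [5, 1, 6]
}
```

Note that the second 1 survives in the dropWhile result, because the predicate is no longer consulted after the first failure.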

take(amount: number): AsyncStream<T>

Returns an AsyncStream that contains the elements of this stream up to a maximum of amount elements.

drop(amount: number): AsyncStream<T>

Returns an AsyncStream that skips the first amount elements of this stream and returns the rest.

repeat(amount?: number): AsyncStream<T>

Returns an AsyncStream that returns the elements from this stream the given amount of times.

concat<T2 = T>(...others: ArrayNonEmpty<AsyncStreamSource.NonEmpty<T2>>): AsyncStream.NonEmpty<T | T2>

Returns an AsyncStream containing the elements of this stream followed by all elements produced by the others array of AsyncStreamSources.

concat<T2 = T>(...others: ArrayNonEmpty<AsyncStreamSource<T2>>): AsyncStream<T | T2>

min(): Promise<T | undefined>

Returns the minimum element of the AsyncStream according to a default compare function, or the provided otherwise fallback value if the stream is empty.

min<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>

minBy(compare: (v1: T, v2: T) => number): Promise<T | undefined>

Returns the minimum element of the AsyncStream according to the provided compare function, or the provided otherwise fallback value if the stream is empty.

minBy<O>(compare: (v1: T, v2: T) => number, otherwise: AsyncOptLazy<O>): Promise<T | O>

max(): Promise<T | undefined>

Returns the maximum element of the AsyncStream according to a default compare function, or the provided otherwise fallback value if the stream is empty.

max<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>

maxBy(compare: (v1: T, v2: T) => number): Promise<T | undefined>

Returns the maximum element of the AsyncStream according to the provided compare function, or the provided otherwise fallback value if the stream is empty.

maxBy<O>(compare: (v1: T, v2: T) => number, otherwise: AsyncOptLazy<O>): Promise<T | O>

intersperse(sep: AsyncStreamSource<T>): AsyncStream<T>

Returns an AsyncStream with all elements from the given sep AsyncStreamSource between two elements of this stream.
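
To make "between two elements" concrete, the following sketch inserts the separator elements only between neighbours, never before the first or after the last element. It uses plain async generators, takes the separator as an array for simplicity, and the names intersperseSketch, values, and demoIntersperse are hypothetical.

```typescript
// Hypothetical stand-in for intersperse: emits all elements of `sep`
// between each pair of neighbouring source elements.
async function* intersperseSketch<T>(
  source: AsyncIterable<T>,
  sep: readonly T[],
): AsyncGenerator<T> {
  let first = true;
  for await (const value of source) {
    if (!first) {
      for (const s of sep) yield s;
    }
    first = false;
    yield value;
  }
}

async function* values(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

async function demoIntersperse(): Promise<number[]> {
  const result: number[] = [];
  for await (const v of intersperseSketch(values(), [0, 0])) result.push(v);
  return result; // [1, 0, 0, 2, 0, 0, 3]
}
```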

join(options?: { sep?: string | undefined; start?: string | undefined; end?: string | undefined; valueToString?: ((value: T) => MaybePromise<string>) | undefined; ifEmpty?: string | undefined; }): Promise<string>

Returns a string resulting from converting each element to string with options.valueToString, interspersed with options.sep, starting with options.start and ending with options.end.
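
How the options combine can be sketched as below. This is not Rimbu's implementation: joinSketch, JoinSketchOptions, scores, and demoJoin are hypothetical names, the defaults (empty strings and String conversion) are assumptions, and the ifEmpty option is not modeled.

```typescript
interface JoinSketchOptions<T> {
  sep?: string;
  start?: string;
  end?: string;
  valueToString?: (value: T) => string | Promise<string>;
}

// Hypothetical stand-in for join: start + elements separated by sep + end.
async function joinSketch<T>(
  source: AsyncIterable<T>,
  options: JoinSketchOptions<T> = {},
): Promise<string> {
  // Assumed defaults: empty strings and plain String conversion.
  const { sep = "", start = "", end = "" } = options;
  const toText =
    options.valueToString ?? ((value: T): string => String(value));
  let result = start;
  let first = true;
  for await (const value of source) {
    if (!first) result += sep;
    first = false;
    result += await toText(value);
  }
  return result + end;
}

async function* scores(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

async function demoJoin(): Promise<string[]> {
  return [
    await joinSketch(scores(), { sep: ", ", start: "[", end: "]" }),
    await joinSketch(scores(), { sep: "|", valueToString: (v) => `#${v}` }),
  ]; // ["[1, 2, 3]", "#1|#2|#3"]
}
```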

mkGroup(options: { sep?: AsyncStreamSource<T>; start?: AsyncStreamSource<T>; end?: AsyncStreamSource<T>; }): AsyncStream<T>

Returns an AsyncStream starting with options.start, then returning the elements of this Stream interspersed with options.sep, and ending with options.end.

splitWhere(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T[]>

Returns an AsyncStream of arrays of stream elements, where each array is filled with elements of this stream up to the next element that satisfies the given pred function.
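
A minimal sketch of the grouping, written with plain async generators rather than Rimbu. splitWhereSketch, samples, and demoSplit are hypothetical names. Two details are assumptions that the description above does not settle: the separator element is dropped, and a trailing group is emitted only when it is non-empty.

```typescript
// Hypothetical stand-in for splitWhere.
async function* splitWhereSketch<T>(
  source: AsyncIterable<T>,
  pred: (value: T, index: number) => boolean | Promise<boolean>,
): AsyncGenerator<T[]> {
  let group: T[] = [];
  let index = 0;
  for await (const value of source) {
    if (await pred(value, index++)) {
      yield group; // assumption: the separator itself is not included
      group = [];
    } else {
      group.push(value);
    }
  }
  if (group.length > 0) yield group; // assumption: skip an empty trailing group
}

async function* samples(): AsyncGenerator<number> {
  for (const value of [1, 2, 0, 3, 4, 0, 5]) yield value;
}

async function demoSplit(): Promise<number[][]> {
  const result: number[][] = [];
  for await (const group of splitWhereSketch(samples(), (v) => v === 0)) {
    result.push(group);
  }
  return result; // [[1, 2], [3, 4], [5]]
}
```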

splitOn(sepElem: T, eq?: Eq<T>): AsyncStream<T[]>

Returns an AsyncStream of arrays of stream elements, where each array is filled with elements of this stream up to the next element that equals given sepElem according to the given eq function.

distinctPrevious(eq?: Eq<T>): AsyncStream<T>

Returns an AsyncStream containing non-repetitive elements of the source stream, where repetitive elements are compared using the optionally given eq equality function.
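
"Non-repetitive" here means that consecutive duplicates collapse, while a value may still reappear later. The sketch below illustrates this with plain async generators; distinctPreviousSketch, noisy, and demoDistinct are hypothetical names, and using Object.is as the default equality is an assumption.

```typescript
// Hypothetical stand-in for distinctPrevious: skips an element when it is
// equal to the element directly before it.
async function* distinctPreviousSketch<T>(
  source: AsyncIterable<T>,
  eq: (a: T, b: T) => boolean = Object.is, // assumed default equality
): AsyncGenerator<T> {
  let hasPrevious = false;
  let previous: T | undefined;
  for await (const value of source) {
    if (hasPrevious && eq(previous as T, value)) continue;
    hasPrevious = true;
    previous = value;
    yield value;
  }
}

async function* noisy(): AsyncGenerator<number> {
  for (const value of [1, 1, 2, 2, 3, 1]) yield value;
}

async function demoDistinct(): Promise<number[]> {
  const result: number[] = [];
  for await (const v of distinctPreviousSketch(noisy())) result.push(v);
  return result; // [1, 2, 3, 1]
}
```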

window(windowSize: number, skipAmount?: number): AsyncStream<T[]>

Returns an AsyncStream containing windows of windowSize consecutive elements of the source stream, with each window starting skipAmount elements after the previous one.
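
The interplay of windowSize and skipAmount can be sketched as follows, using plain async generators instead of Rimbu. windowSketch, countTo, and demoWindow are hypothetical names. The sketch assumes that skipAmount defaults to windowSize and that incomplete windows at the end are dropped; the collector overload is not modeled.

```typescript
// Hypothetical stand-in for window.
async function* windowSketch<T>(
  source: AsyncIterable<T>,
  windowSize: number,
  skipAmount: number = windowSize, // assumed default
): AsyncGenerator<T[]> {
  const open: T[][] = []; // windows that are still being filled
  let index = 0;
  for await (const value of source) {
    if (index % skipAmount === 0) open.push([]); // a new window starts here
    index++;
    for (const w of open) w.push(value);
    const oldest = open[0];
    if (oldest !== undefined && oldest.length === windowSize) {
      open.shift();
      yield oldest;
    }
  }
  // Assumption: windows that never filled up are not emitted.
}

async function* countTo(n: number): AsyncGenerator<number> {
  for (let i = 1; i <= n; i++) yield i;
}

async function demoWindow(): Promise<{ tumbling: number[][]; sliding: number[][] }> {
  const tumbling: number[][] = [];
  for await (const w of windowSketch(countTo(7), 3)) tumbling.push(w);
  const sliding: number[][] = [];
  for await (const w of windowSketch(countTo(5), 3, 1)) sliding.push(w);
  // tumbling: [[1, 2, 3], [4, 5, 6]]
  // sliding:  [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
  return { tumbling, sliding };
}
```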

window<R>(
windowSize: number,
skipAmount?: number,
collector?: AsyncReducer<T, R>,
): AsyncStream<R>

fold<R>(init: AsyncOptLazy<R>, next: (
current: R,
value: T,
index: number,
halt: () => void,
) => MaybePromise<R>
): Promise<R>

Returns the value resulting from applying the given next function to a current state (initially the given init value), and the next stream value, and returning the new state. When all elements are processed, the resulting state is returned.
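
The state threading can be sketched like this, without Rimbu. foldSketch, amounts, and demoFold are hypothetical names. For brevity the next callback is synchronous and init is a plain value, whereas the signature above allows a MaybePromise result and an AsyncOptLazy init. Keeping the state returned by the halting call is an assumption.

```typescript
// Hypothetical stand-in for fold.
async function foldSketch<T, R>(
  source: AsyncIterable<T>,
  init: R,
  next: (current: R, value: T, index: number, halt: () => void) => R,
): Promise<R> {
  let state = init;
  let index = 0;
  let halted = false;
  const halt = (): void => {
    halted = true;
  };
  for await (const value of source) {
    state = next(state, value, index++, halt);
    if (halted) break; // assumption: the state from the halting call is kept
  }
  return state;
}

async function* amounts(): AsyncGenerator<number> {
  for (const value of [1, 2, 3, 4]) yield value;
}

async function demoFold(): Promise<{ sum: number; capped: number }> {
  const sum = await foldSketch(amounts(), 0, (current, value) => current + value);
  // Stop as soon as the running total reaches 3.
  const capped = await foldSketch(amounts(), 0, (current, value, _index, halt) => {
    const total = current + value;
    if (total >= 3) halt();
    return total;
  });
  return { sum, capped }; // { sum: 10, capped: 3 }
}
```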

foldStream<R>(init: AsyncOptLazy<R>, next: (
current: R,
value: T,
index: number,
halt: () => void,
) => MaybePromise<R>
): AsyncStream<R>

Returns an AsyncStream containing the values resulting from applying the given next function to a current state (initially the given init value), and the next stream value, and returning the new state.
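
Where fold returns only the final state, foldStream exposes every intermediate state. A sketch with plain async generators follows; foldStreamSketch, deltas, and demoFoldStream are hypothetical names, the next callback is synchronous and takes no halt argument for brevity, and it is an assumption that the init value itself is not emitted.

```typescript
// Hypothetical stand-in for foldStream: emits the state after each element.
async function* foldStreamSketch<T, R>(
  source: AsyncIterable<T>,
  init: R,
  next: (current: R, value: T, index: number) => R,
): AsyncGenerator<R> {
  let state = init; // assumption: init is the starting point, not an output
  let index = 0;
  for await (const value of source) {
    state = next(state, value, index++);
    yield state;
  }
}

async function* deltas(): AsyncGenerator<number> {
  for (const value of [1, 2, 3]) yield value;
}

async function demoFoldStream(): Promise<number[]> {
  const result: number[] = [];
  const running = foldStreamSketch(deltas(), 5, (current, value) => current + value);
  for await (const state of running) result.push(state);
  return result; // [6, 8, 11]
}
```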

reduce<R>(reducer: AsyncReducer<T, R>): Promise<R>

Applies the given (Async)Reducer to each element in the AsyncStream, and returns the final result.

reduceStream<R>(reducer: AsyncReducer<T, R>): AsyncStream<R>

Returns an AsyncStream where the given (Async)Reducer is applied to each element in the stream.

reduceAll<R extends [unknown, unknown, ...unknown[]]>(...reducers: { [K in keyof R]: AsyncReducer<T, R[K]> }): Promise<R>

Returns a tuple where each tuple element corresponds to the result of applying all AsyncStream elements to the corresponding (Async)Reducer instance of the given reducers.

reduceAllStream<R extends [unknown, unknown, ...unknown[]]>(...reducers: { [K in keyof R]: AsyncReducer<T, R[K]> }): AsyncStream<R>

Returns an AsyncStream of tuples where each tuple element corresponds to the result of applying all stream elements to the corresponding (Async)Reducer instance of the given reducers. Returns one element per input stream element.

toArray(): Promise<T[]>

Returns an Array containing all elements in the AsyncStream.

toString(): string

Returns a string representation of the AsyncStream.

toJSON(): Promise<ToJSON<T[], "AsyncStream">>

Returns a JSON representation of the AsyncStream.

variable AsyncStream
Re-export
import { AsyncStream } from "https://deno.land/x/rimbu@0.13.1/mod.ts";