// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // This module is browser compatible. import { Deferred, deferred } from "./deferred.ts"; interface TaggedYieldedValue { iterator: AsyncIterator; value: T; } /** The MuxAsyncIterator class multiplexes multiple async iterators into a * single stream. It currently makes an assumption: * - The final result (the value returned and not yielded from the iterator) * does not matter; if there is any, it is discarded. */ export class MuxAsyncIterator implements AsyncIterable { #iteratorCount = 0; #yields: Array> = []; // deno-lint-ignore no-explicit-any #throws: any[] = []; #signal: Deferred = deferred(); add(iterable: AsyncIterable): void { ++this.#iteratorCount; this.#callIteratorNext(iterable[Symbol.asyncIterator]()); } async #callIteratorNext( iterator: AsyncIterator, ) { try { const { value, done } = await iterator.next(); if (done) { --this.#iteratorCount; } else { this.#yields.push({ iterator, value }); } } catch (e) { this.#throws.push(e); } this.#signal.resolve(); } async *iterate(): AsyncIterableIterator { while (this.#iteratorCount > 0) { // Sleep until any of the wrapped iterators yields. await this.#signal; // Note that while we're looping over `yields`, new items may be added. for (let i = 0; i < this.#yields.length; i++) { const { iterator, value } = this.#yields[i]; yield value; this.#callIteratorNext(iterator); } if (this.#throws.length) { for (const e of this.#throws) { throw e; } this.#throws.length = 0; } // Clear the `yields` list and reset the `signal` promise. this.#yields.length = 0; this.#signal = deferred(); } } [Symbol.asyncIterator](): AsyncIterator { return this.iterate(); } }