import { Deferred, deferred } from "./deferred.ts";
/** A value produced by an async iterator, tagged with the iterator it came from. */
interface TaggedYieldedValue<T> {
  /** The iterator whose `next()` call produced `value`. */
  iterator: AsyncIterableIterator<T>;
  /** The value that was produced. */
  value: T;
}
/**
 * Multiplexes several async iterators into a single async iterable.
 *
 * Every iterator passed to `add()` is polled concurrently; values are handed
 * to the consumer in the order in which they arrive. Iteration ends once all
 * added iterators have finished. If any added iterator rejects, the error is
 * rethrown to the consumer (after values that were already buffered have been
 * delivered) instead of being lost as an unhandled rejection.
 */
export class MuxAsyncIterator<T> implements AsyncIterable<T> {
  /** Number of added iterators that have neither finished nor failed. */
  private iteratorCount = 0;
  /** Values received from the sources and not yet handed to the consumer. */
  private readonly yields: Array<TaggedYieldedValue<T>> = [];
  /** Errors raised by the sources and not yet rethrown to the consumer. */
  private readonly throws: unknown[] = [];
  /** Resolved whenever a source produces a value, finishes, or fails. */
  private signal: Deferred<void> = deferred();

  /**
   * Registers an iterator and immediately requests its first value.
   *
   * @param iterator The source iterator to multiplex.
   */
  add(iterator: AsyncIterableIterator<T>): void {
    ++this.iteratorCount;
    // Fire-and-forget: callIteratorNext never rejects, it records failures.
    void this.callIteratorNext(iterator);
  }

  /**
   * Requests the next value from `iterator`, records the outcome (value,
   * completion, or error), and wakes up the consumer loop in `iterate()`.
   */
  private async callIteratorNext(
    iterator: AsyncIterableIterator<T>,
  ): Promise<void> {
    try {
      const { value, done } = await iterator.next();
      if (done) {
        --this.iteratorCount;
      } else {
        this.yields.push({ iterator, value });
      }
    } catch (error) {
      // A source whose next() rejected is not polled again, so it no longer
      // counts as active; keep the error so iterate() can rethrow it.
      --this.iteratorCount;
      this.throws.push(error);
    }
    this.signal.resolve();
  }

  /**
   * Yields values from all added iterators until every one of them is done.
   *
   * @returns An async iterator over the multiplexed values.
   * @throws The error of the first source iterator that failed.
   */
  async *iterate(): AsyncIterableIterator<T> {
    // Also enter the loop when only errors are pending, so that a failure
    // that happened before iteration started is still reported.
    while (this.iteratorCount > 0 || this.throws.length > 0) {
      // Sleep until any source yields, finishes, or fails.
      await this.signal;

      // `yields` may grow while the consumer holds a yielded value; for...of
      // re-checks the array length on every step, so late items are seen.
      for (const { iterator, value } of this.yields) {
        yield value;
        void this.callIteratorNext(iterator);
      }
      this.yields.length = 0;

      if (this.throws.length > 0) {
        // `signal` is deliberately left resolved here so that a later
        // iteration does not block while further errors are still queued.
        throw this.throws.shift();
      }

      // Re-arm the signal for the next round of results.
      this.signal = deferred();
    }
  }

  /** Makes the multiplexer usable directly in `for await...of`. */
  [Symbol.asyncIterator](): AsyncIterableIterator<T> {
    return this.iterate();
  }
}