import { AsyncOptLazy, AsyncReducer, Eq } from '../../common/mod.ts';import { AsyncStream, AsyncStreamSource } from '../../stream/async/index.ts';
/**
 * An `AsyncReducer` that consumes values of type `T` and whose output is an
 * `AsyncStreamSource` yielding values of type `R`.
 * @typeparam T - the input element type
 * @typeparam R - the element type of the resulting stream source (defaults to `T`)
 */
export type AsyncTransformer<T, R = T> = AsyncReducer<T, AsyncStreamSource<R>>;
export namespace AsyncTransformer {
  /**
   * An `AsyncReducer` that consumes values of type `T` and whose output is a
   * non-empty `AsyncStreamSource` yielding values of type `R`.
   * @typeparam T - the input element type
   * @typeparam R - the element type of the resulting stream source (defaults to `T`)
   */
  export type NonEmpty<T, R = T> = AsyncReducer<
    T,
    AsyncStreamSource.NonEmpty<R>
  >;

  /**
   * Book-keeping for a single in-progress window used by `window`.
   */
  interface WindowState {
    /** Current state of the collector for this window. */
    result: unknown;
    /** Number of elements fed into this window so far. */
    size: number;
    /** Set to true once the collector has called `halt` for this window. */
    halted: boolean;
    /** Callback handed to the collector so it can halt this window. */
    halt: () => void;
  }

  /**
   * Returns an async transformer that groups incoming elements into windows
   * of `windowSize` elements, feeding each window's elements to `collector`.
   * After each input element, the transformer's output is a stream containing
   * the collected result of the first tracked window holding exactly
   * `windowSize` elements, or an empty stream when there is no such window.
   * @typeparam T - the input element type
   * @typeparam R - the collector result type
   * @param windowSize - the number of elements a window must hold before its
   * result is emitted
   * @param skipAmount - a new window is started at every input index for which
   * `index % skipAmount === 0`; defaults to `windowSize`
   * @param collector - the reducer used to collect the elements of a window;
   * defaults to `AsyncReducer.toArray()`
   */
  export const window: {
    <T>(windowSize: number, skipAmount?: number): AsyncTransformer<T, T[]>;
    <T, R>(
      windowSize: number,
      skipAmount?: number,
      collector?: AsyncReducer<T, R>
    ): AsyncTransformer<T, R>;
  } = <T, R>(
    windowSize: number,
    skipAmount = windowSize,
    collector = AsyncReducer.toArray()
  ) => {
    return AsyncReducer.create<T, AsyncStream<R>, Set<WindowState>>(
      () => new Set(),
      async (state, elem, index) => {
        for (const current of state) {
          if (current.size >= windowSize || current.halted) {
            // The window is complete (its result was already exposed) or its
            // collector halted: drop it and do not feed it further elements.
            // Deleting the current entry while iterating a Set is safe.
            state.delete(current);
            continue;
          }

          current.result = await collector.next(
            current.result,
            elem,
            current.size,
            current.halt
          );
          current.size++;
        }

        if (index % skipAmount === 0) {
          // Evaluate the (possibly lazy, possibly async) initial state once.
          const initialState = await AsyncOptLazy.toMaybePromise(
            collector.init
          );

          const newState: WindowState = {
            result: initialState,
            size: 1,
            halted: false,
            // A closure rather than a `this`-based method: `halt` is passed
            // detached to the collector, so `this` would be undefined there.
            halt: (): void => {
              newState.halted = true;
            },
          };

          // Await so a resolved state, not a pending promise, is stored and
          // later fed back into `collector.next`.
          newState.result = await collector.next(
            initialState,
            elem,
            0,
            newState.halt
          );

          state.add(newState);
        }

        return state;
      },
      (current) => {
        // Emit the result of the first window that holds exactly `windowSize`
        // elements; if none does, the output is an empty stream.
        return AsyncStream.from(current)
          .collect((v, _, skip) =>
            v.size === windowSize
              ? AsyncStream.of<R>(collector.stateToResult(v.result) as any)
              : skip
          )
          .first(AsyncStream.empty());
      }
    );
  };

  /**
   * Returns an async transformer whose output, after each input element, is a
   * stream containing that element if it is the first element or if it is not
   * equal (according to `eq`) to the element received just before it, and an
   * empty stream otherwise.
   * @typeparam T - the element type
   * @param eq - the equality function used to compare an element with its
   * predecessor; defaults to `Eq.objectIs`
   */
  export function distinctPrevious<T>(
    eq: Eq<T> = Eq.objectIs
  ): AsyncTransformer<T> {
    return AsyncReducer.create(
      () => [] as T[],
      (current, elem) => {
        current.push(elem);

        // Only the previous and the latest element are needed.
        if (current.length > 2) {
          current.shift();
        }

        return current;
      },
      (state) => {
        if (state.length > 0) {
          if (state.length === 1) {
            // The very first element has no predecessor and is always emitted.
            return AsyncStream.of(state[0]);
          }
          if (!eq(state[0], state[1])) {
            return AsyncStream.of(state[1]);
          }
        }

        return AsyncStream.empty();
      }
    );
  }
}