import { Eq, OptLazy, Reducer } from '../../common/mod.ts';import { Stream, StreamSource } from '../../stream/mod.ts';
/**
 * A `Reducer` that consumes values of type `T` and whose output is a
 * `StreamSource` of values of type `R`.
 * @typeparam T - the input element type
 * @typeparam R - the element type of the produced `StreamSource` (defaults to `T`)
 */
export type Transformer<T, R = T> = Reducer<T, StreamSource<R>>;
export namespace Transformer {
  /**
   * A `Reducer` that consumes values of type `T` and whose output is a
   * `StreamSource.NonEmpty` of values of type `R`.
   * @typeparam T - the input element type
   * @typeparam R - the element type of the produced source (defaults to `T`)
   */
  export type NonEmpty<T, R = T> = Reducer<T, StreamSource.NonEmpty<R>>;

  /**
   * Bookkeeping for a single window that is currently being filled.
   * - `result`: the collector's intermediate state for this window
   * - `size`: the number of elements fed into this window so far
   * - `halted`: set once the collector has asked to stop receiving elements
   * - `halt`: the callback handed to the collector to signal that request
   */
  type WindowState = {
    result: unknown;
    size: number;
    halted: boolean;
    halt: () => void;
  };

  /**
   * Returns a transformer that feeds incoming elements into (possibly
   * overlapping) windows of `windowSize` elements. For each input element the
   * output is a stream containing the collected result of the window that has
   * just reached `windowSize` elements, or an empty stream if there is none.
   * @typeparam T - the input element type
   * @typeparam R - the result type of the collector
   * @param windowSize - the number of elements a window must contain before
   * its result is emitted
   * @param skipAmount - a new window is started at every input index that is
   * a multiple of this value; defaults to `windowSize`
   * @param collector - the reducer used to fold the elements of each window;
   * defaults to `Reducer.toArray()`
   */
  export const window: {
    <T>(windowSize: number, skipAmount?: number): Transformer<T, T[]>;
    <T, R>(
      windowSize: number,
      skipAmount?: number,
      collector?: Reducer<T, R>
    ): Transformer<T, R>;
  } = <T, R>(
    windowSize: number,
    skipAmount = windowSize,
    collector = Reducer.toArray()
  ) => {
    return Reducer.create<T, Stream<R>, Set<WindowState>>(
      () => new Set(),
      (state, elem, index) => {
        for (const current of state) {
          if (current.size >= windowSize || current.halted) {
            // The window is complete (its result was available in the previous
            // step) or its collector halted: drop it and do not feed it again.
            // Deleting the entry being visited is safe while iterating a Set.
            state.delete(current);
            continue;
          }

          current.result = collector.next(
            current.result,
            elem,
            current.size,
            current.halt
          );
          current.size++;
        }

        if (index % skipAmount === 0) {
          const newState: WindowState = {
            result: OptLazy(collector.init),
            size: 1,
            halted: false,
            // Arrow closure instead of a `this`-based method: the callback is
            // passed to the collector as a bare function reference, so `this`
            // would be undefined at the moment the collector invokes it.
            halt: (): void => {
              newState.halted = true;
            },
          };

          // Feed the first element using the initial state created above, so
          // the collector's `init` is evaluated exactly once per window.
          newState.result = collector.next(
            newState.result,
            elem,
            0,
            newState.halt
          );

          state.add(newState);
        }

        return state;
      },
      (current) => {
        // Emit the result of the first window holding exactly `windowSize`
        // elements; when no such window exists the output is an empty stream.
        return Stream.from(current)
          .collect((v, _, skip) =>
            v.size === windowSize
              ? // The collector's output type is not tracked in the window
                // state, hence the cast.
                Stream.of<R>(collector.stateToResult(v.result) as any)
              : skip
          )
          .first(Stream.empty());
      }
    );
  };

  /**
   * Returns a transformer that, for each input element, outputs a stream with
   * that element if it is the first element or if it is not equal to the
   * element received directly before it, and an empty stream otherwise.
   * @typeparam T - the element type
   * @param eq - the equality function used to compare an element with its
   * predecessor; defaults to `Eq.objectIs`
   */
  export function distinctPrevious<T>(eq: Eq<T> = Eq.objectIs): Transformer<T> {
    return Reducer.create(
      () => [] as T[],
      (current, elem) => {
        current.push(elem);

        // Only the previous and the current element are ever needed.
        if (current.length > 2) {
          current.shift();
        }

        return current;
      },
      (state) => {
        if (state.length > 0) {
          if (state.length === 1) {
            // The very first element has no predecessor and is always emitted.
            return Stream.of(state[0]);
          }
          if (!eq(state[0], state[1])) {
            return Stream.of(state[1]);
          }
        }

        return Stream.empty();
      }
    );
  }
}