import { bufferWhen } from "https://deno.land/x/rxjs@v1.0.2/mod.ts";
Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.
Collects values from the past as an array. When it starts collecting values, it calls a function that returns an Observable that tells when to close the buffer and restart collecting.
Opens a buffer immediately, then closes the buffer when the observable
returned by calling closingSelector
function emits a value. When it closes
the buffer, it immediately opens a new buffer and repeats the process.
Example
Emit an array of the last clicks every [1-5] random seconds
import { fromEvent, bufferWhen, interval } from 'rxjs';
const clicks = fromEvent(document, 'click');
const buffered = clicks.pipe(
bufferWhen(() => interval(1000 + Math.random() * 4000))
);
buffered.subscribe(x => console.log(x));
Parameters
closingSelector: () => ObservableInput<any>
A function that takes no arguments and returns an Observable that signals buffer closure.