Skip to main content

datastream

Building blocks for reading and transforming data streams.

Sources

fromFile

Opens a file and creates a stream of its lines.

/**
 * Opens the file at `path` and creates a stream of its lines.
 *
 * @param path - Path of the file to open.
 * @param funcs - Optional list of `PipeFunction` transforms for the stream.
 * @returns A promise resolving to an async iterator over the lines.
 */
type fromFile = (
  path: string,
  funcs?: Array<PipeFunction>,
) => Promise<AsyncIterableIterator<string>>;

fromStdin

Reads stdin as a stream of strings.

/**
 * Reads stdin as a stream of strings.
 *
 * A function *type* cannot carry a default value (`= []` is a compile error
 * there). "May be omitted" is therefore spelled as an optional parameter.
 *
 * @param funcs - Optional list of `PipeFunction` transforms for the stream.
 * @returns A promise resolving to an async iterator over the input strings.
 */
type fromStdin = (funcs?: PipeFunction[]) => Promise<AsyncIterableIterator<string>>;

fromNdjsonFile

Opens an ndjson (newline-delimited JSON) file and creates a stream of JSON objects.

/**
 * Opens an ndjson (newline-delimited JSON) file and creates a stream of the
 * parsed JSON values.
 *
 * @param path - Path of the ndjson file to open.
 * @param funcs - Optional list of `PipeFunction` transforms for the stream.
 * @returns A promise resolving to an async iterator over the parsed values.
 */
type fromNdjsonFile = (
  path: string,
  funcs?: Array<PipeFunction>,
) => Promise<AsyncIterableIterator<any>>;

fromNdjsonStdin

Reads a stream of ndjson from stdin and parses each line as JSON.

/**
 * Reads a stream of ndjson from stdin and parses it into JSON values.
 *
 * NOTE(review): unlike the other sources, this signature returns the
 * iterator directly rather than a `Promise` of one. Confirm against the
 * implementation.
 *
 * @param funcs - Optional list of `PipeFunction` transforms for the stream.
 * @returns An async iterator over the parsed values.
 */
type fromNdjsonStdin = (funcs?: Array<PipeFunction>) => AsyncIterableIterator<any>;

fromDsvFile

Opens a dsv (delimiter-separated values) file and creates a stream of JSON objects.

The first line is expected to contain the column labels.

/**
 * Opens a dsv (delimiter-separated values) file and creates a stream of JSON
 * objects. The first line is expected to contain the column labels.
 *
 * @param path - Path of the dsv file to open.
 * @param config - Optional parsing options.
 * @param funcs - Optional list of `PipeFunction` transforms for the stream.
 * @returns A promise resolving to an async iterator over the resulting objects.
 */
type fromDsvFile = (
  path: string,
  config?: {
    /** Field delimiter; defaults to "," (csv). */
    delimiter?: string;
    /** Names of the numeric columns. */
    numeric?: Array<string>;
    /** Names of the boolean columns. */
    bool?: Array<string>;
  },
  funcs?: Array<PipeFunction>,
) => Promise<AsyncIterableIterator<any>>;

fromDsvStdin

Reads a stream of dsv from stdin and parses it into JSON objects.

/**
 * Reads a stream of dsv from stdin and parses it into JSON objects.
 *
 * NOTE(review): the previously published return type
 * `<AsyncIterableIterator<any>>` was not valid TypeScript. `Promise<…>` is
 * assumed here to match `fromDsvFile` and `fromStdin`. Confirm against the
 * implementation.
 *
 * @param config - Optional parsing options.
 * @param funcs - Optional list of `PipeFunction` transforms for the stream.
 * @returns A promise resolving to an async iterator over the resulting objects.
 */
type fromDsvStdin = (
  config?: {
    delimiter?: string; // default "," (csv)
    numeric?: string[]; // list of numeric columns
    bool?: string[]; // list of boolean columns
  },
  funcs?: PipeFunction[],
) => Promise<AsyncIterableIterator<any>>;

Transforms

Functions that transform the stream of data.

All transforms return a PipeFunction:

/**
 * A stream transform: it receives an async iterator and returns another
 * async iterator. Every transform factory in this library produces one.
 */
type PipeFunction = (d: AsyncIterableIterator<any>) => AsyncIterableIterator<any>;

map

/**
 * Builds a transform from `func`. `func` receives each item `d` together
 * with a numeric index `i` and returns the replacement item.
 */
type map = <TIn = any, TOut = any>(
  func: (d: TIn, i: number) => TOut,
) => PipeFunction;

filter

/**
 * Builds a transform from the predicate `func`. `func` receives each item
 * `d` together with a numeric index `i` and returns a boolean.
 */
type filter = <TItem = any>(
  func: (d: TItem, i: number) => boolean,
) => PipeFunction;

offset

/**
 * Builds a transform parameterized by the count `n`.
 * Presumably it skips the first `n` items of the stream; confirm against
 * the implementation.
 */
type offset = (
  n: number,
) => PipeFunction;

limit

/**
 * Builds a transform parameterized by the count `n`.
 * Presumably it stops the stream after `n` items; confirm against the
 * implementation.
 */
type limit = (
  n: number,
) => PipeFunction;

Output

toArray

/**
 * Consumes `iterable` and resolves with an array of the items it produced.
 */
type toArray = <TItem = any>(
  iterable: AsyncIterableIterator<TItem>,
) => Promise<Array<TItem>>;

find

/**
 * Builds a consumer from the predicate `func`. The consumer resolves with an
 * item of `iterable`, or with `undefined` when there is no result.
 */
type find = <TItem>(
  func: (d: TItem) => boolean,
) => (
  iterable: AsyncIterableIterator<TItem>,
) => Promise<TItem | undefined>;

reduce

/**
 * Builds a consumer that folds `iterable` into a single value.
 *
 * @param func - Receives the accumulator `r`, the current item `d` and a
 *   numeric index `i`; returns the next accumulator.
 * @param start - The initial accumulator value.
 * @returns A function that resolves with the final accumulator.
 */
type reduce = <TIn = any, TAcc = any>(
  func: (r: TAcc, d: TIn, i: number) => TAcc,
  start: TAcc,
) => (iterable: AsyncIterableIterator<TIn>) => Promise<TAcc>;

toNdjsonStdout

/**
 * Writes the items of `iterable` to stdout as ndjson; resolves once done.
 */
type toNdjsonStdout = <TItem = any>(iterable: AsyncIterableIterator<TItem>) => Promise<void>;

toDsvStdout

/**
 * Writes the items of `iterable` to stdout as dsv; resolves once done.
 *
 * @param iterable - The stream to write.
 * @param delimiter - Optional field delimiter.
 */
type toDsvStdout = <TItem = any>(
  iterable: AsyncIterableIterator<TItem>,
  delimiter?: string,
) => Promise<void>;

toNdjsonFile

/**
 * Writes the items of `iterable` to the file at `path` as ndjson; resolves
 * once done.
 *
 * @param iterable - The stream to write.
 * @param path - Destination file path.
 */
type toNdjsonFile = <TItem = any>(
  iterable: AsyncIterableIterator<TItem>,
  path: string,
) => Promise<void>;

toDsvFile

/**
 * Writes the items of `iterable` to the file at `path` as dsv; resolves
 * once done.
 *
 * @param iterable - The stream to write.
 * @param path - Destination file path.
 * @param delimiter - Optional field delimiter.
 */
type toDsvFile = <TItem = any>(
  iterable: AsyncIterableIterator<TItem>,
  path: string,
  delimiter?: string,
) => Promise<void>;