Skip to main content

datastream

Building blocks for reading and transforming data streams.

Sources

fromFile

Opens a file and creates a stream of its lines.

/**
 * Opens the file at `path` and exposes its lines as an async stream.
 *
 * @param path - path of the file to open.
 * @param funcs - optional pipeline of transforms; presumably applied to the
 *   line stream in array order — confirm against the implementation.
 * @returns a promise of `{ iterator, rid }`; `rid` looks like the resource id
 *   of the opened file — TODO confirm.
 */
type fromFile = (path: string, funcs?: Array<PipeFunction>) => Promise<{
  iterator: AsyncIterableIterator<any>;
  rid: number;
}>;

fromStdin

Reads a stream of strings from stdin.

/**
 * Reads a stream of strings from stdin.
 *
 * @param funcs - optional pipeline of transforms; defaults to `[]` when
 *   omitted. Presumably applied in array order — confirm.
 * @returns `{ iterator, rid }` directly (not wrapped in a Promise, unlike the
 *   file-based sources).
 */
// A parameter initializer (`= []`) is not allowed in a function *type*
// (TS2371), so the default is expressed as an optional parameter instead.
type fromStdin = (
  funcs?: PipeFunction[],
) => { iterator: AsyncIterableIterator<string>; rid: number };

fromNdjsonFile

Opens an NDJSON (newline-delimited JSON) file and creates a stream of parsed JSON objects.

/**
 * Opens an NDJSON (newline-delimited JSON) file and streams the parsed values.
 *
 * @param path - path of the NDJSON file.
 * @param funcs - optional transforms for the parsed stream (presumably applied
 *   in array order — confirm).
 * @returns a promise of `{ iterator, rid }`.
 */
type fromNdjsonFile = (path: string, funcs?: Array<PipeFunction>) => Promise<{
  iterator: AsyncIterableIterator<any>;
  rid: number;
}>;

fromNdjsonStdin

Reads a stream of NDJSON (newline-delimited JSON) from stdin and parses each line as JSON.

/**
 * Reads NDJSON from stdin and streams the parsed values.
 *
 * @param funcs - optional transforms for the parsed stream.
 * @returns `{ iterator }` directly (no Promise).
 * NOTE(review): unlike `fromStdin`, the result carries no `rid` — confirm
 *   whether that asymmetry is intentional.
 */
type fromNdjsonStdin = (funcs?: Array<PipeFunction>) => {
  iterator: AsyncIterableIterator<any>;
};

fromDsvFile

Opens a DSV (delimiter-separated values) file and creates a stream of JSON objects, one per row.

The first line is expected to contain the column labels.

/**
 * Opens a DSV (delimiter-separated values) file whose first line holds the
 * column labels, and streams its rows as objects.
 *
 * @param path - path of the DSV file.
 * @param config - optional parsing options (see member docs).
 * @param funcs - optional transforms for the row stream.
 * @returns a promise of `{ iterator, rid }`.
 */
type fromDsvFile = (
  path: string,
  config?: {
    /** Field delimiter; defaults to "," (CSV). */
    delimiter?: string;
    /** Labels of the columns to treat as numeric. */
    numeric?: Array<string>;
    /** Labels of the columns to treat as boolean. */
    bool?: Array<string>;
  },
  funcs?: Array<PipeFunction>,
) => Promise<{
  iterator: AsyncIterableIterator<any>;
  rid: number;
}>;

fromDsvStdin

Reads a stream of DSV (delimiter-separated values) from stdin and parses it into JSON objects.

/**
 * Reads DSV (delimiter-separated values) from stdin and streams the rows as
 * objects.
 *
 * @param config - optional parsing options (see member docs).
 * @param funcs - optional transforms for the row stream.
 * @returns `{ iterator }` directly (no Promise, no `rid`).
 */
type fromDsvStdin = (
  config?: {
    /** Field delimiter; defaults to "," (CSV). */
    delimiter?: string;
    /** Labels of the columns to treat as numeric. */
    numeric?: Array<string>;
    /** Labels of the columns to treat as boolean. */
    bool?: Array<string>;
  },
  funcs?: Array<PipeFunction>,
) => {
  iterator: AsyncIterableIterator<any>;
};

Transforms

Functions that transform a stream of data.

All transforms return a PipeFunction, i.e. a function that takes an async iterator and returns another one:

/**
 * Signature shared by every transform: receives the upstream async iterator
 * `d` and returns the downstream one.
 */
type PipeFunction = (d: AsyncIterableIterator<any>) => AsyncIterableIterator<any>;

map

/**
 * Builds a transform that replaces each element with `func`'s result.
 *
 * @param func - receives the element `d` and a number `i` (presumably the
 *   element's zero-based position in the stream — confirm) and returns the
 *   new element.
 */
type map = <A = any, B = any>(
  func: (d: A, i: number) => B,
) => PipeFunction;

filter

/**
 * Builds a transform driven by the predicate `func`; presumably only elements
 * for which it returns `true` are passed downstream — confirm.
 *
 * @param func - receives the element `d` and a number `i` (presumably its
 *   zero-based position — confirm).
 */
type filter = <T = any>(
  func: (d: T, i: number) => boolean,
) => PipeFunction;

offset

/**
 * Builds a transform parameterized by a count `n`; presumably skips the first
 * `n` elements of the stream — confirm.
 */
type offset = (
  n: number,
) => PipeFunction;

limit

/**
 * Builds a transform parameterized by a count `n`; presumably passes through
 * at most `n` elements — confirm.
 */
type limit = (
  n: number,
) => PipeFunction;

Output

toArray

/**
 * Drains a stream and resolves to its elements as an array.
 *
 * @param stream - the `{ iterator, rid }` object produced by a source; `rid`
 *   is optional so sources without one (e.g. `fromNdjsonStdin`) are accepted.
 * @returns a promise of the collected elements.
 */
// The stream is a *named* parameter with an object type: an inline
// destructuring pattern cannot carry type annotations in a type position.
type toArray = <T = any>(stream: {
  iterator: AsyncIterableIterator<T>;
  rid?: number;
}) => Promise<T[]>;

find

/**
 * Builds a consumer that searches a stream with the predicate `func`.
 *
 * @param func - predicate applied to elements.
 * @returns a function taking the `{ iterator, rid? }` stream object and
 *   resolving to a matching element, or `undefined` when none is found.
 */
// The stream is a named parameter with an object type; an inline
// destructuring pattern cannot carry type annotations in a type position.
type find = <T>(
  func: (d: T) => boolean,
) => (stream: {
  iterator: AsyncIterableIterator<T>;
  rid?: number;
}) => Promise<T | undefined>;

reduce

/**
 * Builds a consumer that folds a stream into a single value.
 *
 * @param func - reducer receiving the accumulator `r`, the element `d` and a
 *   number `i` (presumably the element's zero-based position — confirm).
 * @param start - initial accumulator value.
 * @returns a function taking the `{ iterator, rid? }` stream object and
 *   resolving to the final accumulator.
 */
// The stream is a named parameter with an object type; an inline
// destructuring pattern cannot carry type annotations in a type position.
type reduce = <A = any, B = any>(
  func: (r: B, d: A, i: number) => B,
  start: B,
) => (stream: {
  iterator: AsyncIterableIterator<A>;
  rid?: number;
}) => Promise<B>;

toNdjsonStdout

/**
 * Writes a stream to stdout as NDJSON (newline-delimited JSON).
 *
 * @param stream - the `{ iterator, rid? }` object produced by a source.
 * @returns a promise that settles once the stream has been consumed
 *   (presumably — confirm against the implementation).
 */
// The stream is a named parameter with an object type; an inline
// destructuring pattern cannot carry type annotations in a type position.
type toNdjsonStdout = <T = any>(stream: {
  iterator: AsyncIterableIterator<T>;
  rid?: number;
}) => Promise<void>;

toDsvStdout

/**
 * Writes a stream to stdout as DSV (delimiter-separated values).
 *
 * @param stream - the `{ iterator, rid? }` object produced by a source.
 * @param delimiter - optional field delimiter; the default is not visible in
 *   this signature (presumably "," as for the DSV sources — confirm).
 * @returns a promise that settles once output is complete (presumably).
 */
// The stream is a named parameter with an object type; an inline
// destructuring pattern cannot carry type annotations in a type position.
type toDsvStdout = <T = any>(
  stream: { iterator: AsyncIterableIterator<T>; rid?: number },
  delimiter?: string,
) => Promise<void>;

toNdjsonFile

/**
 * Writes a stream to the file at `path` as NDJSON (newline-delimited JSON).
 *
 * @param stream - the `{ iterator, rid? }` object produced by a source.
 * @param path - destination file path.
 * @returns a promise that settles once writing is complete (presumably).
 */
// The stream is a named parameter with an object type; an inline
// destructuring pattern cannot carry type annotations in a type position.
type toNdjsonFile = <T = any>(
  stream: { iterator: AsyncIterableIterator<T>; rid?: number },
  path: string,
) => Promise<void>;

toDsvFile

/**
 * Writes a stream to the file at `path` as DSV (delimiter-separated values).
 *
 * @param stream - the `{ iterator, rid? }` object produced by a source.
 * @param path - destination file path.
 * @param delimiter - optional field delimiter; the default is not visible in
 *   this signature (presumably "," as for the DSV sources — confirm).
 * @returns a promise that settles once writing is complete (presumably).
 */
// The stream is a named parameter with an object type; an inline
// destructuring pattern cannot carry type annotations in a type position.
type toDsvFile = <T = any>(
  stream: { iterator: AsyncIterableIterator<T>; rid?: number },
  path: string,
  delimiter?: string,
) => Promise<void>;