v0.0.2
Building blocks to read and transform data streams
Repository
Current version released
3 years ago
Dependencies
std
Versions
datastream
Building blocks to read and transform data streams
Sources
fromFile
Opens a file and creates a stream of lines
type fromFile = (
path: string,
funcs?: PipeFunction[],
) => Promise<AsyncIterableIterator<string>>;
fromStdin
Reads a stream from stdin
type fromStdin = (funcs?: PipeFunction[]) => Promise<AsyncIterableIterator<string>>;
fromNdjsonFile
Opens an NDJSON (newline-delimited JSON) file and creates a stream of JSON objects
type fromNdjsonFile = (
path: string,
funcs?: PipeFunction[],
) => Promise<AsyncIterableIterator<any>>;
fromNdjsonStdin
Reads and parses a stream of ndjson from stdin
type fromNdjsonStdin = (
funcs?: PipeFunction[],
) => AsyncIterableIterator<any>;
fromDsvFile
Opens a DSV (delimiter-separated values) file and creates a stream of JSON objects
The first line is expected to contain the column labels
type fromDsvFile = (
path: string,
config?: {
delimiter?: string; // default "," (csv)
numeric?: string[]; // list of numeric columns
bool?: string[]; // list of boolean columns
},
funcs?: PipeFunction[],
) => Promise<AsyncIterableIterator<any>>;
fromDsvStdin
Reads a stream of dsv from stdin and parses it as JSON objects
type fromDsvStdin = (
config?: {
delimiter?: string; // default "," (csv)
numeric?: string[]; // list of numeric columns
bool?: string[]; // list of boolean columns
},
funcs?: PipeFunction[]
) => AsyncIterableIterator<any>;
Transforms
Functions to modify the stream of data
All transforms return a PipeFunction
type PipeFunction = (
d: AsyncIterableIterator<any>,
) => AsyncIterableIterator<any>;
map
type map = <A = any, B = any>(func: (d: A, i: number) => B) => PipeFunction;
filter
type filter = <T = any>(func: (d: T, i: number) => boolean) => PipeFunction;
offset
type offset = (n: number) => PipeFunction;
limit
type limit = (n: number) => PipeFunction;
Output
toArray
type toArray = <T = any>(iterable: AsyncIterableIterator<T>) => Promise<T[]>;
reduce
type reduce = <A = any, B = any>(
func: (r: B, d: A, i: number) => B,
start: B,
) => (iterable: AsyncIterableIterator<A>) => Promise<B>;
toNdjsonStdout
type toNdjsonStdout = <T = any>(
iterable: AsyncIterableIterator<T>,
) => Promise<void>;
toDsvStdout
type toDsvStdout = <T = any>(
iterable: AsyncIterableIterator<T>,
delimiter?: string,
) => Promise<void>;
toNdjsonFile
type toNdjsonFile = <T = any>(
iterable: AsyncIterableIterator<T>,
path: string,
) => Promise<void>;
toDsvFile
type toDsvFile = <T = any>(
iterable: AsyncIterableIterator<T>,
path: string,
delimiter?: string,
) => Promise<void>;