Skip to main content
Module

std/streams/conversion.ts

Deno standard library
Go to Latest
File
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
import { Buffer } from "../io/buffer.ts";
// DEFAULT_CHUNK_SIZE (16,640 bytes, ~16KiB) is the fallback `chunkSize` used by
// `readableStreamFromReader`. DEFAULT_BUFFER_SIZE (32KiB) is the fallback
// `bufSize` used by `iterateReader`, `iterateReaderSync` and `copy`.
const DEFAULT_CHUNK_SIZE = 16_640;const DEFAULT_BUFFER_SIZE = 32 * 1024;
/**
 * Type guard reporting whether `value` can be treated as a `Deno.Closer`.
 *
 * @param value Any value; primitives, `null` and `undefined` yield `false`.
 * @returns `true` when `value` is a non-null object whose `close` member is a
 * function.
 */
function isCloser(value: unknown): value is Deno.Closer {
  // `typeof null` is "object", so null has to be excluded explicitly.
  if (typeof value !== "object" || value === null) {
    return false;
  }
  // Indexing through `Record<string, unknown>` keeps the lookup type-safe
  // without resorting to `any` (and therefore without a lint suppression).
  return typeof (value as Record<string, unknown>)["close"] === "function";
}
/**
 * Wrap an iterable (sync or async) of `Uint8Array` chunks in a `Deno.Reader`.
 *
 * Each `read(p)` call hands out at most `p.byteLength` bytes. When a chunk is
 * larger than `p`, its head fills `p` and its tail is parked in an internal
 * buffer that is drained by the following reads before the iterator is
 * advanced again. `read` resolves to `null` once the iterator is done.
 *
 * ```ts
 * import { readerFromIterable, copy } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const file = await Deno.open("metrics.txt", { write: true });
 * const reader = readerFromIterable((async function* () {
 *   while (true) {
 *     await new Promise((r) => setTimeout(r, 1000));
 *     const message = `data: ${JSON.stringify(Deno.metrics())}\n\n`;
 *     yield new TextEncoder().encode(message);
 *   }
 * })());
 * await copy(reader, file);
 * ```
 *
 * @param iterable Source of byte chunks.
 * @returns A reader over the concatenation of the chunks.
 */
export function readerFromIterable(
  iterable: Iterable<Uint8Array> | AsyncIterable<Uint8Array>,
): Deno.Reader {
  const asyncSource = iterable as AsyncIterable<Uint8Array>;
  const syncSource = iterable as Iterable<Uint8Array>;
  // Prefer the async protocol, fall back to the sync one.
  const iterator: Iterator<Uint8Array> | AsyncIterator<Uint8Array> =
    asyncSource[Symbol.asyncIterator]?.() ?? syncSource[Symbol.iterator]?.();
  const leftover = new Buffer();

  return {
    async read(p: Uint8Array): Promise<number | null> {
      // Bytes left over from an oversized chunk are served first.
      if (leftover.length !== 0) {
        const drained = await leftover.read(p);
        return drained == null ? this.read(p) : drained;
      }

      const step = await iterator.next();
      if (step.done) {
        return null;
      }

      const chunk = step.value;
      if (chunk.byteLength <= p.byteLength) {
        p.set(chunk);
        return chunk.byteLength;
      }

      // Chunk does not fit: fill `p` and keep the remainder for later reads.
      p.set(chunk.subarray(0, p.byteLength));
      await writeAll(leftover, chunk.subarray(p.byteLength));
      return p.byteLength;
    },
  };
}
/**
 * Adapt a `WritableStreamDefaultWriter` to the `Deno.Writer` interface.
 *
 * Every `write(p)` first awaits the stream writer's `ready` promise, then
 * awaits `streamWriter.write(p)`, and finally resolves to `p.length`.
 *
 * @param streamWriter The stream writer that receives the chunks.
 * @returns A writer forwarding each call to `streamWriter`.
 */
export function writerFromStreamWriter(
  streamWriter: WritableStreamDefaultWriter<Uint8Array>,
): Deno.Writer {
  const write = async (p: Uint8Array): Promise<number> => {
    await streamWriter.ready;
    await streamWriter.write(p);
    return p.length;
  };
  return { write };
}
/**
 * Create a `Reader` from a `ReadableStreamDefaultReader`.
 *
 * Chunks pulled from `streamReader` are staged in an internal buffer and
 * handed out through `read(p)`, so a chunk larger than `p` is delivered over
 * several reads.
 *
 * @param streamReader The stream reader to pull chunks from.
 * @returns A reader that resolves to `null` once the stream reports `done`.
 */
export function readerFromStreamReader(
  streamReader: ReadableStreamDefaultReader<Uint8Array>,
): Deno.Reader {
  const buffer = new Buffer();

  return {
    async read(p: Uint8Array): Promise<number | null> {
      // Keep pulling until the buffer actually holds data. A zero-length
      // chunk leaves the buffer empty, and reading from an empty buffer
      // yields `null`, which callers would mistake for end of stream.
      while (buffer.empty()) {
        const res = await streamReader.read();
        if (res.done) {
          return null; // EOF
        }

        await writeAll(buffer, res.value);
      }

      return buffer.read(p);
    },
  };
}
/** Options accepted by `writableStreamFromWriter`. */
export interface WritableStreamFromWriterOptions {
  /**
   * If the `writer` is also a `Deno.Closer`, automatically close the `writer`
   * when the stream is closed, aborted, or a write error occurs.
   *
   * Defaults to `true`.
   */
  autoClose?: boolean;
}
/**
 * Expose a `Deno.Writer` as a `WritableStream<Uint8Array>`.
 *
 * Each chunk written to the stream is forwarded to `writer` in full via
 * `writeAll`. If that fails, the stream is put into the errored state with
 * the thrown value. Unless `options.autoClose` is `false`, a `writer` that
 * also has a `close()` method is closed when the stream is closed, aborted,
 * or a write fails.
 *
 * @param writer Destination of the written chunks.
 * @param options See `WritableStreamFromWriterOptions`.
 * @returns A writable stream backed by `writer`.
 */
export function writableStreamFromWriter(
  writer: Deno.Writer,
  options: WritableStreamFromWriterOptions = {},
): WritableStream<Uint8Array> {
  const { autoClose = true } = options;

  // Shared by the error, close and abort paths.
  const closeWriterIfRequested = (): void => {
    if (isCloser(writer) && autoClose) {
      writer.close();
    }
  };

  return new WritableStream<Uint8Array>({
    async write(chunk, controller) {
      try {
        await writeAll(writer, chunk);
      } catch (err) {
        controller.error(err);
        closeWriterIfRequested();
      }
    },
    close: closeWriterIfRequested,
    abort: closeWriterIfRequested,
  });
}
/**
 * Build a `ReadableStream` whose chunks are the items produced by a sync or
 * async iterable.
 *
 * ```ts
 * import { readableStreamFromIterable } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const r1 = readableStreamFromIterable(["foo, bar, baz"]);
 * const r2 = readableStreamFromIterable(async function* () {
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "foo";
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "bar";
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "baz";
 * }());
 * ```
 *
 * Cancelling the stream is forwarded to the iterator: when the iterator
 * obtained from `iterable[Symbol.asyncIterator]()` or
 * `iterable[Symbol.iterator]()` has a `.throw()` method (as generators do),
 * it is called with the cancellation reason, and whatever it throws back is
 * swallowed.
 *
 * ```ts
 * import { readableStreamFromIterable } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const r3 = readableStreamFromIterable(async function* () {
 *   try {
 *     yield "foo";
 *   } catch (error) {
 *     console.log(error); // Error: Cancelled by consumer.
 *   }
 * }());
 * const reader = r3.getReader();
 * console.log(await reader.read()); // { value: "foo", done: false }
 * await reader.cancel(new Error("Cancelled by consumer."));
 * ```
 *
 * @param iterable Source of the stream's chunks.
 * @returns A stream that closes when the iterator is exhausted.
 */
export function readableStreamFromIterable<T>(
  iterable: Iterable<T> | AsyncIterable<T>,
): ReadableStream<T> {
  const asyncSource = iterable as AsyncIterable<T>;
  const syncSource = iterable as Iterable<T>;
  // Prefer the async protocol, fall back to the sync one.
  const iterator: Iterator<T> | AsyncIterator<T> =
    asyncSource[Symbol.asyncIterator]?.() ?? syncSource[Symbol.iterator]?.();

  return new ReadableStream<T>({
    async pull(controller) {
      const step = await iterator.next();
      if (!step.done) {
        controller.enqueue(step.value);
        return;
      }
      controller.close();
    },
    async cancel(reason) {
      if (typeof iterator.throw !== "function") {
        return;
      }
      try {
        await iterator.throw(reason);
      } catch {
        /* `iterator.throw()` always throws on site. We catch it. */
      }
    },
  });
}
/**
 * Convert the generator function into a TransformStream.
 *
 * The chunks written to the returned `writable` are exposed to `transformer`
 * as a `ReadableStream`; whatever the iterable returned by `transformer`
 * produces becomes the content of the returned `readable`.
 *
 * ```ts
 * import { readableStreamFromIterable, toTransformStream } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const readable = readableStreamFromIterable([0, 1, 2])
 *   .pipeThrough(toTransformStream(async function* (src) {
 *     for await (const chunk of src) {
 *       yield chunk * 100;
 *     }
 *   }));
 *
 * for await (const chunk of readable) {
 *   console.log(chunk);
 * }
 * // output: 0, 100, 200
 * ```
 *
 * @param transformer A function to transform.
 * @param writableStrategy An object that optionally defines a queuing strategy for the writable side.
 * @param readableStrategy An object that optionally defines a queuing strategy for the readable side.
 * @returns The `writable`/`readable` pair forming the transform stream.
 */
export function toTransformStream<I, O>(
  transformer: (src: ReadableStream<I>) => Iterable<O> | AsyncIterable<O>,
  writableStrategy?: QueuingStrategy<I>,
  readableStrategy?: QueuingStrategy<O>,
): TransformStream<I, O> {
  // Identity transform: its readable end is the input handed to `transformer`.
  const {
    writable,
    readable,
  } = new TransformStream<I, I>(undefined, writableStrategy);

  const iterable = transformer(readable);
  const iterator: Iterator<O> | AsyncIterator<O> =
    (iterable as AsyncIterable<O>)[Symbol.asyncIterator]?.() ??
      (iterable as Iterable<O>)[Symbol.iterator]?.();

  return {
    writable,
    readable: new ReadableStream<O>({
      async pull(controller) {
        let result: IteratorResult<O>;
        try {
          result = await iterator.next();
        } catch (error) {
          // Propagate error to stream from iterator.
          // If the stream status is "errored", it will be thrown, but ignore.
          await readable.cancel(error).catch(() => {});
          controller.error(error);
          return;
        }
        if (result.done) {
          controller.close();
          return;
        }
        controller.enqueue(result.value);
      },
      async cancel(reason) {
        // Propagate cancellation to readable and iterator.
        if (typeof iterator.throw === "function") {
          try {
            await iterator.throw(reason);
          } catch {
            /* `iterator.throw()` always throws on site. We catch it. */
          }
        }
        await readable.cancel(reason);
      },
    }, readableStrategy),
  };
}
/** Options accepted by `readableStreamFromReader`. */
export interface ReadableStreamFromReaderOptions {
  /** If the `reader` is also a `Deno.Closer`, automatically close the `reader`
   * when `EOF` is encountered, a read error occurs, or the stream is
   * cancelled.
   *
   * Defaults to `true`. */
  autoClose?: boolean;

  /** The size of chunks to allocate to read, the default is ~16KiB, which is
   * the maximum size that Deno operations can currently support. */
  chunkSize?: number;

  /** The queuing strategy to create the `ReadableStream` with. */
  strategy?: { highWaterMark?: number | undefined; size?: undefined };
}
/**
 * Create a `ReadableStream<Uint8Array>` from a `Deno.Reader`.
 *
 * When the pull algorithm is called on the stream, a chunk from the reader
 * will be read. When `null` is returned from the reader, the stream will be
 * closed along with the reader (if it is also a `Deno.Closer`).
 *
 * The reader is closed on EOF, on a read error and on cancellation, but only
 * when `options.autoClose` is not `false`; with `autoClose: false` the caller
 * keeps ownership of the reader in all three cases.
 *
 * An example converting a `Deno.FsFile` into a readable stream:
 *
 * ```ts
 * import { readableStreamFromReader } from "https://deno.land/std@$STD_VERSION/streams/mod.ts";
 *
 * const file = await Deno.open("./file.txt", { read: true });
 * const fileStream = readableStreamFromReader(file);
 * ```
 *
 * @param reader The reader to pull chunks from.
 * @param options See `ReadableStreamFromReaderOptions`.
 * @returns A stream emitting one chunk (a view of at most `chunkSize` bytes) per successful read.
 */
export function readableStreamFromReader(
  reader: Deno.Reader | (Deno.Reader & Deno.Closer),
  options: ReadableStreamFromReaderOptions = {},
): ReadableStream<Uint8Array> {
  const {
    autoClose = true,
    chunkSize = DEFAULT_CHUNK_SIZE,
    strategy,
  } = options;

  // Single place that decides whether the reader gets closed, so the EOF,
  // error and cancel paths cannot drift apart with respect to `autoClose`.
  const closeReaderIfRequested = (): void => {
    if (autoClose && isCloser(reader)) {
      reader.close();
    }
  };

  return new ReadableStream({
    async pull(controller) {
      // A fresh allocation per pull: the enqueued view must not be
      // overwritten by the next read.
      const chunk = new Uint8Array(chunkSize);
      try {
        const read = await reader.read(chunk);
        if (read === null) {
          closeReaderIfRequested();
          controller.close();
          return;
        }
        controller.enqueue(chunk.subarray(0, read));
      } catch (e) {
        controller.error(e);
        closeReaderIfRequested();
      }
    },
    cancel() {
      closeReaderIfRequested();
    },
  }, strategy);
}
/**
 * Drain Reader `r` until EOF (`null`) and resolve to everything that was read
 * as a `Uint8Array`.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { readAll } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example from stdin
 * const stdinContent = await readAll(Deno.stdin);
 *
 * // Example from file
 * const file = await Deno.open("my_file.txt", {read: true});
 * const myFileContent = await readAll(file);
 * file.close();
 *
 * // Example from buffer
 * const myData = new Uint8Array(100);
 * // ... fill myData array with data
 * const reader = new Buffer(myData.buffer);
 * const bufferContent = await readAll(reader);
 * ```
 *
 * @param r The reader to consume.
 * @returns The bytes accumulated from `r`.
 */
export async function readAll(r: Deno.Reader): Promise<Uint8Array> {
  const collected = new Buffer();
  await collected.readFrom(r);
  return collected.bytes();
}
/**
 * Synchronous counterpart of `readAll`: drain Reader `r` until EOF (`null`)
 * and return everything that was read as a `Uint8Array`.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { readAllSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example from stdin
 * const stdinContent = readAllSync(Deno.stdin);
 *
 * // Example from file
 * const file = Deno.openSync("my_file.txt", {read: true});
 * const myFileContent = readAllSync(file);
 * file.close();
 *
 * // Example from buffer
 * const myData = new Uint8Array(100);
 * // ... fill myData array with data
 * const reader = new Buffer(myData.buffer);
 * const bufferContent = readAllSync(reader);
 * ```
 *
 * @param r The synchronous reader to consume.
 * @returns The bytes accumulated from `r`.
 */
export function readAllSync(r: Deno.ReaderSync): Uint8Array {
  const collected = new Buffer();
  collected.readFromSync(r);
  return collected.bytes();
}
/**
 * Push the whole of `arr` into writer `w`, calling `w.write` again with the
 * unwritten remainder for as long as the writer accepts only part of it.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { writeAll } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example writing to stdout
 * let contentBytes = new TextEncoder().encode("Hello World");
 * await writeAll(Deno.stdout, contentBytes);
 *
 * // Example writing to file
 * contentBytes = new TextEncoder().encode("Hello World");
 * const file = await Deno.open('test.file', {write: true});
 * await writeAll(file, contentBytes);
 * file.close();
 *
 * // Example writing to buffer
 * contentBytes = new TextEncoder().encode("Hello World");
 * const writer = new Buffer();
 * await writeAll(writer, contentBytes);
 * console.log(writer.bytes().length);  // 11
 * ```
 *
 * @param w The writer receiving the bytes.
 * @param arr The bytes to write.
 */
export async function writeAll(w: Deno.Writer, arr: Uint8Array) {
  // `offset` advances by however many bytes each call reports as written.
  for (let offset = 0; offset < arr.length;) {
    offset += await w.write(arr.subarray(offset));
  }
}
/**
 * Synchronous counterpart of `writeAll`: push the whole of `arr` into writer
 * `w`, calling `w.writeSync` again with the unwritten remainder for as long
 * as the writer accepts only part of it.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { writeAllSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example writing to stdout
 * let contentBytes = new TextEncoder().encode("Hello World");
 * writeAllSync(Deno.stdout, contentBytes);
 *
 * // Example writing to file
 * contentBytes = new TextEncoder().encode("Hello World");
 * const file = Deno.openSync('test.file', {write: true});
 * writeAllSync(file, contentBytes);
 * file.close();
 *
 * // Example writing to buffer
 * contentBytes = new TextEncoder().encode("Hello World");
 * const writer = new Buffer();
 * writeAllSync(writer, contentBytes);
 * console.log(writer.bytes().length);  // 11
 * ```
 *
 * @param w The synchronous writer receiving the bytes.
 * @param arr The bytes to write.
 */
export function writeAllSync(w: Deno.WriterSync, arr: Uint8Array) {
  // `offset` advances by however many bytes each call reports as written.
  for (let offset = 0; offset < arr.length;) {
    offset += w.writeSync(arr.subarray(offset));
  }
}
/**
 * Expose Reader `r` as an async iterator of byte chunks.
 *
 * Data is read into one scratch buffer; every yielded chunk is a copy
 * (`slice`) of the bytes just read, so chunks stay valid across iterations.
 * Iteration ends when `r.read` resolves to `null`.
 *
 * ```ts
 * import { iterateReader } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = await Deno.open("/etc/passwd");
 * for await (const chunk of iterateReader(f)) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * Second argument can be used to tune size of a buffer.
 * Default size of the buffer is 32kB.
 *
 * ```ts
 * import { iterateReader } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = await Deno.open("/etc/passwd");
 * const it = iterateReader(f, {
 *   bufSize: 1024 * 1024
 * });
 * for await (const chunk of it) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * @param r The reader to iterate over.
 * @param options `bufSize` sets the size of the scratch buffer, and thereby the maximum chunk length.
 */
export async function* iterateReader(
  r: Deno.Reader,
  options?: {
    bufSize?: number;
  },
): AsyncIterableIterator<Uint8Array> {
  const scratch = new Uint8Array(options?.bufSize ?? DEFAULT_BUFFER_SIZE);
  for (
    let count = await r.read(scratch);
    count !== null;
    count = await r.read(scratch)
  ) {
    yield scratch.slice(0, count);
  }
}
/**
 * Expose ReaderSync `r` as an iterator of byte chunks.
 *
 * Data is read into one scratch buffer; every yielded chunk is a copy
 * (`slice`) of the bytes just read, so chunks stay valid across iterations.
 * Iteration ends when `r.readSync` returns `null`.
 *
 * ```ts
 * import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = Deno.openSync("/etc/passwd");
 * for (const chunk of iterateReaderSync(f)) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * Second argument can be used to tune size of a buffer.
 * Default size of the buffer is 32kB.
 *
 * ```ts
 * import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = Deno.openSync("/etc/passwd");
 * const iter = iterateReaderSync(f, {
 *   bufSize: 1024 * 1024
 * });
 * for (const chunk of iter) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * @param r The synchronous reader to iterate over.
 * @param options `bufSize` sets the size of the scratch buffer, and thereby the maximum chunk length.
 */
export function* iterateReaderSync(
  r: Deno.ReaderSync,
  options?: {
    bufSize?: number;
  },
): IterableIterator<Uint8Array> {
  const scratch = new Uint8Array(options?.bufSize ?? DEFAULT_BUFFER_SIZE);
  for (
    let count = r.readSync(scratch);
    count !== null;
    count = r.readSync(scratch)
  ) {
    yield scratch.slice(0, count);
  }
}
/**
 * Pump bytes from `src` into `dst` until `src` reports EOF (`null`) or an
 * error occurs. Resolves to the number of bytes copied, or rejects with the
 * first error encountered while copying.
 *
 * Every chunk read is written out completely (repeating `dst.write` on the
 * unwritten remainder) before the next read is issued.
 *
 * ```ts
 * import { copy } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const source = await Deno.open("my_file.txt");
 * const bytesCopied1 = await copy(source, Deno.stdout);
 * const destination = await Deno.create("my_file_2.txt");
 * const bytesCopied2 = await copy(source, destination);
 * ```
 *
 * @param src The source to copy from
 * @param dst The destination to copy to
 * @param options Can be used to tune size of the buffer. Default size is 32kB
 * @returns The total of the byte counts reported by `dst.write`.
 */
export async function copy(
  src: Deno.Reader,
  dst: Deno.Writer,
  options?: {
    bufSize?: number;
  },
): Promise<number> {
  const scratch = new Uint8Array(options?.bufSize ?? DEFAULT_BUFFER_SIZE);
  let total = 0;

  for (;;) {
    const received = await src.read(scratch);
    if (received === null) {
      return total;
    }

    // Flush exactly the bytes just read, tolerating partial writes.
    let flushed = 0;
    while (flushed < received) {
      flushed += await dst.write(scratch.subarray(flushed, received));
    }
    total += flushed;
  }
}