Skip to main content
Module

std/io/buffer.ts

Deno standard library
Go to Latest
File
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.import { assert } from "../_util/assert.ts";import { BytesList } from "../bytes/bytes_list.ts";import { concat, copy } from "../bytes/mod.ts";import type { Reader, ReaderSync, Writer, WriterSync } from "./types.d.ts";
// MIN_READ is the minimum ArrayBuffer size passed to a read call by// buffer.ReadFrom. As long as the Buffer has at least MIN_READ bytes beyond// what is required to hold the contents of r, readFrom() will not grow the// underlying buffer.const MIN_READ = 32 * 1024;const MAX_SIZE = 2 ** 32 - 2;
/** A variable-sized buffer of bytes with `read()` and `write()` methods. * * Buffer is almost always used with some I/O like files and sockets. It allows * one to buffer up a download from a socket. Buffer grows and shrinks as * necessary. * * Buffer is NOT the same thing as Node's Buffer. Node's Buffer was created in * 2009 before JavaScript had the concept of ArrayBuffers. It's simply a * non-standard ArrayBuffer. * * ArrayBuffer is a fixed memory allocation. Buffer is implemented on top of * ArrayBuffer. * * Based on [Go Buffer](https://golang.org/pkg/bytes/#Buffer). */
export class Buffer { #buf: Uint8Array; // contents are the bytes buf[off : len(buf)] #off = 0; // read at buf[off], write at buf[buf.byteLength]
constructor(ab?: ArrayBufferLike | ArrayLike<number>) { this.#buf = ab === undefined ? new Uint8Array(0) : new Uint8Array(ab); }
/** Returns a slice holding the unread portion of the buffer. * * The slice is valid for use only until the next buffer modification (that * is, only until the next call to a method like `read()`, `write()`, * `reset()`, or `truncate()`). If `options.copy` is false the slice aliases the buffer content at * least until the next buffer modification, so immediate changes to the * slice will affect the result of future reads. * @param options Defaults to `{ copy: true }` */ bytes(options = { copy: true }): Uint8Array { if (options.copy === false) return this.#buf.subarray(this.#off); return this.#buf.slice(this.#off); }
/** Returns whether the unread portion of the buffer is empty. */ empty(): boolean { return this.#buf.byteLength <= this.#off; }
/** A read only number of bytes of the unread portion of the buffer. */ get length(): number { return this.#buf.byteLength - this.#off; }
/** The read only capacity of the buffer's underlying byte slice, that is, * the total space allocated for the buffer's data. */ get capacity(): number { return this.#buf.buffer.byteLength; }
/** Discards all but the first `n` unread bytes from the buffer but * continues to use the same allocated storage. It throws if `n` is * negative or greater than the length of the buffer. */ truncate(n: number): void { if (n === 0) { this.reset(); return; } if (n < 0 || n > this.length) { throw Error("bytes.Buffer: truncation out of range"); } this.#reslice(this.#off + n); }
reset(): void { this.#reslice(0); this.#off = 0; }
#tryGrowByReslice(n: number) { const l = this.#buf.byteLength; if (n <= this.capacity - l) { this.#reslice(l + n); return l; } return -1; }
#reslice(len: number) { assert(len <= this.#buf.buffer.byteLength); this.#buf = new Uint8Array(this.#buf.buffer, 0, len); }
/** Reads the next `p.length` bytes from the buffer or until the buffer is * drained. Returns the number of bytes read. If the buffer has no data to * return, the return is EOF (`null`). */ readSync(p: Uint8Array): number | null { if (this.empty()) { // Buffer is empty, reset to recover space. this.reset(); if (p.byteLength === 0) { // this edge case is tested in 'bufferReadEmptyAtEOF' test return 0; } return null; } const nread = copy(this.#buf.subarray(this.#off), p); this.#off += nread; return nread; }
/** Reads the next `p.length` bytes from the buffer or until the buffer is * drained. Resolves to the number of bytes read. If the buffer has no * data to return, resolves to EOF (`null`). * * NOTE: This methods reads bytes synchronously; it's provided for * compatibility with `Reader` interfaces. */ read(p: Uint8Array): Promise<number | null> { const rr = this.readSync(p); return Promise.resolve(rr); }
writeSync(p: Uint8Array): number { const m = this.#grow(p.byteLength); return copy(p, this.#buf, m); }
/** NOTE: This methods writes bytes synchronously; it's provided for * compatibility with `Writer` interface. */ write(p: Uint8Array): Promise<number> { const n = this.writeSync(p); return Promise.resolve(n); }
#grow(n: number) { const m = this.length; // If buffer is empty, reset to recover space. if (m === 0 && this.#off !== 0) { this.reset(); } // Fast: Try to grow by means of a reslice. const i = this.#tryGrowByReslice(n); if (i >= 0) { return i; } const c = this.capacity; if (n <= Math.floor(c / 2) - m) { // We can slide things down instead of allocating a new // ArrayBuffer. We only need m+n <= c to slide, but // we instead let capacity get twice as large so we // don't spend all our time copying. copy(this.#buf.subarray(this.#off), this.#buf); } else if (c + n > MAX_SIZE) { throw new Error("The buffer cannot be grown beyond the maximum size."); } else { // Not enough space anywhere, we need to allocate. const buf = new Uint8Array(Math.min(2 * c + n, MAX_SIZE)); copy(this.#buf.subarray(this.#off), buf); this.#buf = buf; } // Restore this.#off and len(this.#buf). this.#off = 0; this.#reslice(Math.min(m + n, MAX_SIZE)); return m; }
/** Grows the buffer's capacity, if necessary, to guarantee space for * another `n` bytes. After `.grow(n)`, at least `n` bytes can be written to * the buffer without another allocation. If `n` is negative, `.grow()` will * throw. If the buffer can't grow it will throw an error. * * Based on Go Lang's * [Buffer.Grow](https://golang.org/pkg/bytes/#Buffer.Grow). */ grow(n: number): void { if (n < 0) { throw Error("Buffer.grow: negative count"); } const m = this.#grow(n); this.#reslice(m); }
/** Reads data from `r` until EOF (`null`) and appends it to the buffer, * growing the buffer as needed. It resolves to the number of bytes read. * If the buffer becomes too large, `.readFrom()` will reject with an error. * * Based on Go Lang's * [Buffer.ReadFrom](https://golang.org/pkg/bytes/#Buffer.ReadFrom). */ async readFrom(r: Reader): Promise<number> { let n = 0; const tmp = new Uint8Array(MIN_READ); while (true) { const shouldGrow = this.capacity - this.length < MIN_READ; // read into tmp buffer if there's not enough room // otherwise read directly into the internal buffer const buf = shouldGrow ? tmp : new Uint8Array(this.#buf.buffer, this.length);
const nread = await r.read(buf); if (nread === null) { return n; }
// write will grow if needed if (shouldGrow) this.writeSync(buf.subarray(0, nread)); else this.#reslice(this.length + nread);
n += nread; } }
/** Reads data from `r` until EOF (`null`) and appends it to the buffer, * growing the buffer as needed. It returns the number of bytes read. If the * buffer becomes too large, `.readFromSync()` will throw an error. * * Based on Go Lang's * [Buffer.ReadFrom](https://golang.org/pkg/bytes/#Buffer.ReadFrom). */ readFromSync(r: ReaderSync): number { let n = 0; const tmp = new Uint8Array(MIN_READ); while (true) { const shouldGrow = this.capacity - this.length < MIN_READ; // read into tmp buffer if there's not enough room // otherwise read directly into the internal buffer const buf = shouldGrow ? tmp : new Uint8Array(this.#buf.buffer, this.length);
const nread = r.readSync(buf); if (nread === null) { return n; }
// write will grow if needed if (shouldGrow) this.writeSync(buf.subarray(0, nread)); else this.#reslice(this.length + nread);
n += nread; } }}
const DEFAULT_BUF_SIZE = 4096;const MIN_BUF_SIZE = 16;const MAX_CONSECUTIVE_EMPTY_READS = 100;const CR = "\r".charCodeAt(0);const LF = "\n".charCodeAt(0);
export class BufferFullError extends Error { name = "BufferFullError"; constructor(public partial: Uint8Array) { super("Buffer full"); }}
export class PartialReadError extends Error { name = "PartialReadError"; partial?: Uint8Array; constructor() { super("Encountered UnexpectedEof, data only partially read"); }}
/** Result type returned by of BufReader.readLine(). */export interface ReadLineResult { line: Uint8Array; more: boolean;}
/** BufReader implements buffering for a Reader object. */export class BufReader implements Reader { #buf!: Uint8Array; #rd!: Reader; // Reader provided by caller. #r = 0; // buf read position. #w = 0; // buf write position. #eof = false; // private lastByte: number; // private lastCharSize: number;
/** return new BufReader unless r is BufReader */ static create(r: Reader, size: number = DEFAULT_BUF_SIZE): BufReader { return r instanceof BufReader ? r : new BufReader(r, size); }
constructor(rd: Reader, size: number = DEFAULT_BUF_SIZE) { if (size < MIN_BUF_SIZE) { size = MIN_BUF_SIZE; } this.#reset(new Uint8Array(size), rd); }
/** Returns the size of the underlying buffer in bytes. */ size(): number { return this.#buf.byteLength; }
buffered(): number { return this.#w - this.#r; }
// Reads a new chunk into the buffer. #fill = async () => { // Slide existing data to beginning. if (this.#r > 0) { this.#buf.copyWithin(0, this.#r, this.#w); this.#w -= this.#r; this.#r = 0; }
if (this.#w >= this.#buf.byteLength) { throw Error("bufio: tried to fill full buffer"); }
// Read new data: try a limited number of times. for (let i = MAX_CONSECUTIVE_EMPTY_READS; i > 0; i--) { const rr = await this.#rd.read(this.#buf.subarray(this.#w)); if (rr === null) { this.#eof = true; return; } assert(rr >= 0, "negative read"); this.#w += rr; if (rr > 0) { return; } }
throw new Error( `No progress after ${MAX_CONSECUTIVE_EMPTY_READS} read() calls`, ); };
/** Discards any buffered data, resets all state, and switches * the buffered reader to read from r. */ reset(r: Reader): void { this.#reset(this.#buf, r); }
#reset = (buf: Uint8Array, rd: Reader): void => { this.#buf = buf; this.#rd = rd; this.#eof = false; // this.lastByte = -1; // this.lastCharSize = -1; };
/** reads data into p. * It returns the number of bytes read into p. * The bytes are taken from at most one Read on the underlying Reader, * hence n may be less than len(p). * To read exactly len(p) bytes, use io.ReadFull(b, p). */ async read(p: Uint8Array): Promise<number | null> { let rr: number | null = p.byteLength; if (p.byteLength === 0) return rr;
if (this.#r === this.#w) { if (p.byteLength >= this.#buf.byteLength) { // Large read, empty buffer. // Read directly into p to avoid copy. const rr = await this.#rd.read(p); const nread = rr ?? 0; assert(nread >= 0, "negative read"); // if (rr.nread > 0) { // this.lastByte = p[rr.nread - 1]; // this.lastCharSize = -1; // } return rr; }
// One read. // Do not use this.fill, which will loop. this.#r = 0; this.#w = 0; rr = await this.#rd.read(this.#buf); if (rr === 0 || rr === null) return rr; assert(rr >= 0, "negative read"); this.#w += rr; }
// copy as much as we can const copied = copy(this.#buf.subarray(this.#r, this.#w), p, 0); this.#r += copied; // this.lastByte = this.buf[this.r - 1]; // this.lastCharSize = -1; return copied; }
/** reads exactly `p.length` bytes into `p`. * * If successful, `p` is returned. * * If the end of the underlying stream has been reached, and there are no more * bytes available in the buffer, `readFull()` returns `null` instead. * * An error is thrown if some bytes could be read, but not enough to fill `p` * entirely before the underlying stream reported an error or EOF. Any error * thrown will have a `partial` property that indicates the slice of the * buffer that has been successfully filled with data. * * Ported from https://golang.org/pkg/io/#ReadFull */ async readFull(p: Uint8Array): Promise<Uint8Array | null> { let bytesRead = 0; while (bytesRead < p.length) { try { const rr = await this.read(p.subarray(bytesRead)); if (rr === null) { if (bytesRead === 0) { return null; } else { throw new PartialReadError(); } } bytesRead += rr; } catch (err) { if (err instanceof PartialReadError) { err.partial = p.subarray(0, bytesRead); } else if (err instanceof Error) { const e = new PartialReadError(); e.partial = p.subarray(0, bytesRead); e.stack = err.stack; e.message = err.message; e.cause = err.cause; throw err; } throw err; } } return p; }
/** Returns the next byte [0, 255] or `null`. */ async readByte(): Promise<number | null> { while (this.#r === this.#w) { if (this.#eof) return null; await this.#fill(); // buffer is empty. } const c = this.#buf[this.#r]; this.#r++; // this.lastByte = c; return c; }
/** readString() reads until the first occurrence of delim in the input, * returning a string containing the data up to and including the delimiter. * If ReadString encounters an error before finding a delimiter, * it returns the data read before the error and the error itself * (often `null`). * ReadString returns err != nil if and only if the returned data does not end * in delim. * For simple uses, a Scanner may be more convenient. */ async readString(delim: string): Promise<string | null> { if (delim.length !== 1) { throw new Error("Delimiter should be a single character"); } const buffer = await this.readSlice(delim.charCodeAt(0)); if (buffer === null) return null; return new TextDecoder().decode(buffer); }
/** `readLine()` is a low-level line-reading primitive. Most callers should * use `readString('\n')` instead or use a Scanner. * * `readLine()` tries to return a single line, not including the end-of-line * bytes. If the line was too long for the buffer then `more` is set and the * beginning of the line is returned. The rest of the line will be returned * from future calls. `more` will be false when returning the last fragment * of the line. The returned buffer is only valid until the next call to * `readLine()`. * * The text returned from ReadLine does not include the line end ("\r\n" or * "\n"). * * When the end of the underlying stream is reached, the final bytes in the * stream are returned. No indication or error is given if the input ends * without a final line end. When there are no more trailing bytes to read, * `readLine()` returns `null`. * * Calling `unreadByte()` after `readLine()` will always unread the last byte * read (possibly a character belonging to the line end) even if that byte is * not part of the line returned by `readLine()`. */ async readLine(): Promise<ReadLineResult | null> { let line: Uint8Array | null = null;
try { line = await this.readSlice(LF); } catch (err) { if (err instanceof Deno.errors.BadResource) { throw err; } let partial; if (err instanceof PartialReadError) { partial = err.partial; assert( partial instanceof Uint8Array, "bufio: caught error from `readSlice()` without `partial` property", ); }
// Don't throw if `readSlice()` failed with `BufferFullError`, instead we // just return whatever is available and set the `more` flag. if (!(err instanceof BufferFullError)) { throw err; }
partial = err.partial;
// Handle the case where "\r\n" straddles the buffer. if ( !this.#eof && partial && partial.byteLength > 0 && partial[partial.byteLength - 1] === CR ) { // Put the '\r' back on buf and drop it from line. // Let the next call to ReadLine check for "\r\n". assert(this.#r > 0, "bufio: tried to rewind past start of buffer"); this.#r--; partial = partial.subarray(0, partial.byteLength - 1); }
if (partial) { return { line: partial, more: !this.#eof }; } }
if (line === null) { return null; }
if (line.byteLength === 0) { return { line, more: false }; }
if (line[line.byteLength - 1] == LF) { let drop = 1; if (line.byteLength > 1 && line[line.byteLength - 2] === CR) { drop = 2; } line = line.subarray(0, line.byteLength - drop); } return { line, more: false }; }
/** `readSlice()` reads until the first occurrence of `delim` in the input, * returning a slice pointing at the bytes in the buffer. The bytes stop * being valid at the next read. * * If `readSlice()` encounters an error before finding a delimiter, or the * buffer fills without finding a delimiter, it throws an error with a * `partial` property that contains the entire buffer. * * If `readSlice()` encounters the end of the underlying stream and there are * any bytes left in the buffer, the rest of the buffer is returned. In other * words, EOF is always treated as a delimiter. Once the buffer is empty, * it returns `null`. * * Because the data returned from `readSlice()` will be overwritten by the * next I/O operation, most clients should use `readString()` instead. */ async readSlice(delim: number): Promise<Uint8Array | null> { let s = 0; // search start index let slice: Uint8Array | undefined;
while (true) { // Search buffer. let i = this.#buf.subarray(this.#r + s, this.#w).indexOf(delim); if (i >= 0) { i += s; slice = this.#buf.subarray(this.#r, this.#r + i + 1); this.#r += i + 1; break; }
// EOF? if (this.#eof) { if (this.#r === this.#w) { return null; } slice = this.#buf.subarray(this.#r, this.#w); this.#r = this.#w; break; }
// Buffer full? if (this.buffered() >= this.#buf.byteLength) { this.#r = this.#w; // #4521 The internal buffer should not be reused across reads because it causes corruption of data. const oldbuf = this.#buf; const newbuf = this.#buf.slice(0); this.#buf = newbuf; throw new BufferFullError(oldbuf); }
s = this.#w - this.#r; // do not rescan area we scanned before
// Buffer is not full. try { await this.#fill(); } catch (err) { if (err instanceof PartialReadError) { err.partial = slice; } else if (err instanceof Error) { const e = new PartialReadError(); e.partial = slice; e.stack = err.stack; e.message = err.message; e.cause = err.cause; throw err; } throw err; } }
// Handle last byte, if any. // const i = slice.byteLength - 1; // if (i >= 0) { // this.lastByte = slice[i]; // this.lastCharSize = -1 // }
return slice; }
/** `peek()` returns the next `n` bytes without advancing the reader. The * bytes stop being valid at the next read call. * * When the end of the underlying stream is reached, but there are unread * bytes left in the buffer, those bytes are returned. If there are no bytes * left in the buffer, it returns `null`. * * If an error is encountered before `n` bytes are available, `peek()` throws * an error with the `partial` property set to a slice of the buffer that * contains the bytes that were available before the error occurred. */ async peek(n: number): Promise<Uint8Array | null> { if (n < 0) { throw Error("negative count"); }
let avail = this.#w - this.#r; while (avail < n && avail < this.#buf.byteLength && !this.#eof) { try { await this.#fill(); } catch (err) { if (err instanceof PartialReadError) { err.partial = this.#buf.subarray(this.#r, this.#w); } else if (err instanceof Error) { const e = new PartialReadError(); e.partial = this.#buf.subarray(this.#r, this.#w); e.stack = err.stack; e.message = err.message; e.cause = err.cause; throw err; } throw err; } avail = this.#w - this.#r; }
if (avail === 0 && this.#eof) { return null; } else if (avail < n && this.#eof) { return this.#buf.subarray(this.#r, this.#r + avail); } else if (avail < n) { throw new BufferFullError(this.#buf.subarray(this.#r, this.#w)); }
return this.#buf.subarray(this.#r, this.#r + n); }}
abstract class AbstractBufBase { buf: Uint8Array; usedBufferBytes = 0; err: Error | null = null;
constructor(buf: Uint8Array) { this.buf = buf; }
/** Size returns the size of the underlying buffer in bytes. */ size(): number { return this.buf.byteLength; }
/** Returns how many bytes are unused in the buffer. */ available(): number { return this.buf.byteLength - this.usedBufferBytes; }
/** buffered returns the number of bytes that have been written into the * current buffer. */ buffered(): number { return this.usedBufferBytes; }}
/** BufWriter implements buffering for an deno.Writer object. * If an error occurs writing to a Writer, no more data will be * accepted and all subsequent writes, and flush(), will return the error. * After all data has been written, the client should call the * flush() method to guarantee all data has been forwarded to * the underlying deno.Writer. */export class BufWriter extends AbstractBufBase implements Writer { #writer: Writer;
/** return new BufWriter unless writer is BufWriter */ static create(writer: Writer, size: number = DEFAULT_BUF_SIZE): BufWriter { return writer instanceof BufWriter ? writer : new BufWriter(writer, size); }
constructor(writer: Writer, size: number = DEFAULT_BUF_SIZE) { if (size <= 0) { size = DEFAULT_BUF_SIZE; } const buf = new Uint8Array(size); super(buf); this.#writer = writer; }
/** Discards any unflushed buffered data, clears any error, and * resets buffer to write its output to w. */ reset(w: Writer): void { this.err = null; this.usedBufferBytes = 0; this.#writer = w; }
/** Flush writes any buffered data to the underlying io.Writer. */ async flush() { if (this.err !== null) throw this.err; if (this.usedBufferBytes === 0) return;
try { const p = this.buf.subarray(0, this.usedBufferBytes); let nwritten = 0; while (nwritten < p.length) { nwritten += await this.#writer.write(p.subarray(nwritten)); } } catch (e) { if (e instanceof Error) { this.err = e; } throw e; }
this.buf = new Uint8Array(this.buf.length); this.usedBufferBytes = 0; }
/** Writes the contents of `data` into the buffer. If the contents won't fully * fit into the buffer, those bytes that can are copied into the buffer, the * buffer is the flushed to the writer and the remaining bytes are copied into * the now empty buffer. * * @return the number of bytes written to the buffer. */ async write(data: Uint8Array): Promise<number> { if (this.err !== null) throw this.err; if (data.length === 0) return 0;
let totalBytesWritten = 0; let numBytesWritten = 0; while (data.byteLength > this.available()) { if (this.buffered() === 0) { // Large write, empty buffer. // Write directly from data to avoid copy. try { numBytesWritten = await this.#writer.write(data); } catch (e) { if (e instanceof Error) { this.err = e; } throw e; } } else { numBytesWritten = copy(data, this.buf, this.usedBufferBytes); this.usedBufferBytes += numBytesWritten; await this.flush(); } totalBytesWritten += numBytesWritten; data = data.subarray(numBytesWritten); }
numBytesWritten = copy(data, this.buf, this.usedBufferBytes); this.usedBufferBytes += numBytesWritten; totalBytesWritten += numBytesWritten; return totalBytesWritten; }}
/** BufWriterSync implements buffering for a deno.WriterSync object. * If an error occurs writing to a WriterSync, no more data will be * accepted and all subsequent writes, and flush(), will return the error. * After all data has been written, the client should call the * flush() method to guarantee all data has been forwarded to * the underlying deno.WriterSync. */export class BufWriterSync extends AbstractBufBase implements WriterSync { #writer: WriterSync;
/** return new BufWriterSync unless writer is BufWriterSync */ static create( writer: WriterSync, size: number = DEFAULT_BUF_SIZE, ): BufWriterSync { return writer instanceof BufWriterSync ? writer : new BufWriterSync(writer, size); }
constructor(writer: WriterSync, size: number = DEFAULT_BUF_SIZE) { if (size <= 0) { size = DEFAULT_BUF_SIZE; } const buf = new Uint8Array(size); super(buf); this.#writer = writer; }
/** Discards any unflushed buffered data, clears any error, and * resets buffer to write its output to w. */ reset(w: WriterSync): void { this.err = null; this.usedBufferBytes = 0; this.#writer = w; }
/** Flush writes any buffered data to the underlying io.WriterSync. */ flush(): void { if (this.err !== null) throw this.err; if (this.usedBufferBytes === 0) return;
try { const p = this.buf.subarray(0, this.usedBufferBytes); let nwritten = 0; while (nwritten < p.length) { nwritten += this.#writer.writeSync(p.subarray(nwritten)); } } catch (e) { if (e instanceof Error) { this.err = e; } throw e; }
this.buf = new Uint8Array(this.buf.length); this.usedBufferBytes = 0; }
/** Writes the contents of `data` into the buffer. If the contents won't fully * fit into the buffer, those bytes that can are copied into the buffer, the * buffer is the flushed to the writer and the remaining bytes are copied into * the now empty buffer. * * @return the number of bytes written to the buffer. */ writeSync(data: Uint8Array): number { if (this.err !== null) throw this.err; if (data.length === 0) return 0;
let totalBytesWritten = 0; let numBytesWritten = 0; while (data.byteLength > this.available()) { if (this.buffered() === 0) { // Large write, empty buffer. // Write directly from data to avoid copy. try { numBytesWritten = this.#writer.writeSync(data); } catch (e) { if (e instanceof Error) { this.err = e; } throw e; } } else { numBytesWritten = copy(data, this.buf, this.usedBufferBytes); this.usedBufferBytes += numBytesWritten; this.flush(); } totalBytesWritten += numBytesWritten; data = data.subarray(numBytesWritten); }
numBytesWritten = copy(data, this.buf, this.usedBufferBytes); this.usedBufferBytes += numBytesWritten; totalBytesWritten += numBytesWritten; return totalBytesWritten; }}
/** Generate longest proper prefix which is also suffix array. */function createLPS(pat: Uint8Array): Uint8Array { const lps = new Uint8Array(pat.length); lps[0] = 0; let prefixEnd = 0; let i = 1; while (i < lps.length) { if (pat[i] == pat[prefixEnd]) { prefixEnd++; lps[i] = prefixEnd; i++; } else if (prefixEnd === 0) { lps[i] = 0; i++; } else { prefixEnd = lps[prefixEnd - 1]; } } return lps;}
/** Read delimited bytes from a Reader. */export async function* readDelim( reader: Reader, delim: Uint8Array,): AsyncIterableIterator<Uint8Array> { // Avoid unicode problems const delimLen = delim.length; const delimLPS = createLPS(delim); const chunks = new BytesList(); const bufSize = Math.max(1024, delimLen + 1);
// Modified KMP let inspectIndex = 0; let matchIndex = 0; while (true) { const inspectArr = new Uint8Array(bufSize); const result = await reader.read(inspectArr); if (result === null) { // Yield last chunk. yield chunks.concat(); return; } else if (result < 0) { // Discard all remaining and silently fail. return; } chunks.add(inspectArr, 0, result); let localIndex = 0; while (inspectIndex < chunks.size()) { if (inspectArr[localIndex] === delim[matchIndex]) { inspectIndex++; localIndex++; matchIndex++; if (matchIndex === delimLen) { // Full match const matchEnd = inspectIndex - delimLen; const readyBytes = chunks.slice(0, matchEnd); yield readyBytes; // Reset match, different from KMP. chunks.shift(inspectIndex); inspectIndex = 0; matchIndex = 0; } } else { if (matchIndex === 0) { inspectIndex++; localIndex++; } else { matchIndex = delimLPS[matchIndex - 1]; } } } }}
/** Read delimited strings from a Reader. */export async function* readStringDelim( reader: Reader, delim: string, decoderOpts?: { encoding?: string; fatal?: boolean; ignoreBOM?: boolean; },): AsyncIterableIterator<string> { const encoder = new TextEncoder(); const decoder = new TextDecoder(decoderOpts?.encoding, decoderOpts); for await (const chunk of readDelim(reader, encoder.encode(delim))) { yield decoder.decode(chunk); }}
/** Read strings line-by-line from a Reader. */export async function* readLines( reader: Reader, decoderOpts?: { encoding?: string; fatal?: boolean; ignoreBOM?: boolean; },): AsyncIterableIterator<string> { const bufReader = new BufReader(reader); let chunks: Uint8Array[] = []; const decoder = new TextDecoder(decoderOpts?.encoding, decoderOpts); while (true) { const res = await bufReader.readLine(); if (!res) { if (chunks.length > 0) { yield decoder.decode(concat(...chunks)); } break; } chunks.push(res.line); if (!res.more) { yield decoder.decode(concat(...chunks)); chunks = []; } }}