import { assert } from "../_util/assert.ts";import { BytesList } from "../bytes/bytes_list.ts";import { concat, copy } from "../bytes/mod.ts";import type { Reader, ReaderSync, Writer, WriterSync } from "./types.d.ts";
/**
 * Free space (32 KiB) that `Buffer.readFrom()` / `Buffer.readFromSync()`
 * require before reading straight into the internal buffer; it is also the
 * size of the temporary read buffer those methods allocate.
 */
const MIN_READ = 2 ** 15;
/** Largest size the internal storage of a `Buffer` is allowed to grow to. */
const MAX_SIZE = 0xfffffffe;
/**
 * A growable byte buffer with synchronous and asynchronous read/write methods.
 *
 * The unread data is `#buf[#off .. #buf.byteLength)`. `#buf` is always a view
 * that starts at byte 0 of its backing `ArrayBuffer`; the backing buffer may be
 * larger than the view, and that slack is the spare capacity used to grow
 * without reallocating.
 */
export class Buffer {
  /** View over the backing buffer; its end is the write position. */
  #buf: Uint8Array;
  /** Read position inside `#buf`. */
  #off = 0;

  /**
   * @param ab Optional initial contents, passed straight to the `Uint8Array`
   * constructor. When omitted the buffer starts empty.
   */
  constructor(ab?: ArrayBufferLike | ArrayLike<number>) {
    this.#buf = ab === undefined ? new Uint8Array(0) : new Uint8Array(ab);
  }

  /**
   * Returns the unread portion of the buffer.
   *
   * @param options With `copy: false` the result is a view that shares memory
   * with the buffer; otherwise (the default) it is an independent copy.
   */
  bytes(options = { copy: true }): Uint8Array {
    if (options.copy === false) return this.#buf.subarray(this.#off);
    return this.#buf.slice(this.#off);
  }

  /** Returns `true` when there are no unread bytes. */
  empty(): boolean {
    return this.#buf.byteLength <= this.#off;
  }

  /** Number of unread bytes. */
  get length(): number {
    return this.#buf.byteLength - this.#off;
  }

  /** Size in bytes of the backing storage. */
  get capacity(): number {
    return this.#buf.buffer.byteLength;
  }

  /**
   * Discards all but the first `n` unread bytes, keeping the backing storage.
   *
   * @throws Error when `n` is negative or larger than `length`.
   */
  truncate(n: number): void {
    if (n === 0) {
      this.reset();
      return;
    }
    if (n < 0 || n > this.length) {
      throw Error("bytes.Buffer: truncation out of range");
    }
    this.#reslice(this.#off + n);
  }

  /** Empties the buffer while keeping the backing storage for reuse. */
  reset(): void {
    this.#reslice(0);
    this.#off = 0;
  }

  /**
   * Extends the view by `n` bytes if the backing buffer already has room.
   * Returns the previous end of the view, or -1 when there is not enough room.
   */
  #tryGrowByReslice(n: number) {
    const l = this.#buf.byteLength;
    if (n <= this.capacity - l) {
      this.#reslice(l + n);
      return l;
    }
    return -1;
  }

  /** Re-creates the view as the first `len` bytes of the backing buffer. */
  #reslice(len: number) {
    assert(len <= this.#buf.buffer.byteLength);
    this.#buf = new Uint8Array(this.#buf.buffer, 0, len);
  }

  /**
   * Copies unread bytes into `p` and advances the read position.
   *
   * @returns The number of bytes copied, or `null` when the buffer is empty
   * (`0` if `p` itself is empty). An empty buffer is reset as a side effect.
   */
  readSync(p: Uint8Array): number | null {
    if (this.empty()) {
      // Nothing left to read: recycle the storage.
      this.reset();
      if (p.byteLength === 0) {
        return 0;
      }
      return null;
    }
    const nread = copy(this.#buf.subarray(this.#off), p);
    this.#off += nread;
    return nread;
  }

  /** Promise-returning wrapper around `readSync()`. */
  read(p: Uint8Array): Promise<number | null> {
    const rr = this.readSync(p);
    return Promise.resolve(rr);
  }

  /** Appends `p` to the buffer, growing it as needed; returns the bytes copied. */
  writeSync(p: Uint8Array): number {
    const m = this.#grow(p.byteLength);
    return copy(p, this.#buf, m);
  }

  /** Promise-returning wrapper around `writeSync()`. */
  write(p: Uint8Array): Promise<number> {
    const n = this.writeSync(p);
    return Promise.resolve(n);
  }

  /**
   * Makes room for `n` more bytes and returns the index in `#buf` at which
   * they should be written.
   *
   * @throws Error when the required size would exceed `MAX_SIZE`.
   */
  #grow(n: number) {
    const m = this.length;
    // Everything has been read: start again from the front of the storage.
    if (m === 0 && this.#off !== 0) {
      this.reset();
    }
    // Fast path: the backing buffer already has the room.
    const i = this.#tryGrowByReslice(n);
    if (i >= 0) {
      return i;
    }
    const c = this.capacity;
    if (n <= Math.floor(c / 2) - m) {
      // Enough room once the already-read prefix is dropped: slide the unread
      // data down instead of allocating.
      copy(this.#buf.subarray(this.#off), this.#buf);
    } else if (c + n > MAX_SIZE) {
      throw new Error("The buffer cannot be grown beyond the maximum size.");
    } else {
      // Allocate a larger backing buffer and move the unread data into it.
      const buf = new Uint8Array(Math.min(2 * c + n, MAX_SIZE));
      copy(this.#buf.subarray(this.#off), buf);
      this.#buf = buf;
    }
    // In both remaining cases the unread data now starts at index 0.
    this.#off = 0;
    this.#reslice(Math.min(m + n, MAX_SIZE));
    return m;
  }

  /**
   * Ensures there is room for another `n` bytes without changing the contents.
   *
   * @throws Error when `n` is negative or the buffer cannot grow that far.
   */
  grow(n: number): void {
    if (n < 0) {
      throw Error("Buffer.grow: negative count");
    }
    const m = this.#grow(n);
    this.#reslice(m);
  }

  /**
   * Reads from `r` until it reports end of input (`null`) and appends
   * everything to the buffer.
   *
   * @returns The total number of bytes read.
   */
  async readFrom(r: Reader): Promise<number> {
    let n = 0;
    const tmp = new Uint8Array(MIN_READ);
    while (true) {
      // Free space and the write position are measured from the end of the
      // view (`#buf.byteLength`), not from `length`: the two differ whenever
      // part of the buffer has already been read (`#off > 0`).
      const end = this.#buf.byteLength;
      const shouldGrow = this.capacity - end < MIN_READ;
      // Read into the scratch buffer when room is short, otherwise directly
      // into the spare capacity after the current contents.
      const buf = shouldGrow ? tmp : new Uint8Array(this.#buf.buffer, end);

      const nread = await r.read(buf);
      if (nread === null) {
        return n;
      }

      // writeSync() grows the storage as needed.
      if (shouldGrow) this.writeSync(buf.subarray(0, nread));
      else this.#reslice(this.#buf.byteLength + nread);

      n += nread;
    }
  }

  /** Synchronous counterpart of `readFrom()`. */
  readFromSync(r: ReaderSync): number {
    let n = 0;
    const tmp = new Uint8Array(MIN_READ);
    while (true) {
      // See readFrom(): positions are relative to the end of the view.
      const end = this.#buf.byteLength;
      const shouldGrow = this.capacity - end < MIN_READ;
      const buf = shouldGrow ? tmp : new Uint8Array(this.#buf.buffer, end);

      const nread = r.readSync(buf);
      if (nread === null) {
        return n;
      }

      if (shouldGrow) this.writeSync(buf.subarray(0, nread));
      else this.#reslice(end + nread);

      n += nread;
    }
  }
}
/** Buffer size used by the buffered readers/writers when the caller gives none. */
const DEFAULT_BUF_SIZE = 4 * 1024;
/** Smallest internal buffer a `BufReader` will allocate. */
const MIN_BUF_SIZE = 16;
/** Zero-byte reads in a row that `BufReader` tolerates before reporting no progress. */
const MAX_CONSECUTIVE_EMPTY_READS = 100;
/** Byte value of carriage return (`\r`). */
const CR = 0x0d;
/** Byte value of line feed (`\n`). */
const LF = 0x0a;
/**
 * Error raised when buffered data does not fit in the internal buffer.
 * The bytes gathered so far are exposed on `partial`.
 */
export class BufferFullError extends Error {
  partial: Uint8Array;
  name: string;

  /** @param partial The bytes that had been buffered when the error occurred. */
  constructor(partial: Uint8Array) {
    super("Buffer full");
    this.partial = partial;
    this.name = "BufferFullError";
  }
}
/**
 * Error raised when the input ends before the requested data was fully read.
 * Code that catches it may attach the bytes read so far to `partial`.
 */
export class PartialReadError extends Error {
  name: string;
  partial?: Uint8Array;

  constructor() {
    super("Encountered UnexpectedEof, data only partially read");
    this.name = "PartialReadError";
  }
}
/**
 * Value produced by `BufReader.readLine()`.
 *
 * - `line`: the bytes of the line, or of a line fragment, without the
 *   trailing `\n` / `\r\n`.
 * - `more`: `true` when the line did not fit in the reader's buffer and input
 *   has not ended, so the rest of the line follows in later results; `false`
 *   otherwise.
 */
export interface ReadLineResult { line: Uint8Array; more: boolean;}
/**
 * Adds buffering on top of a `Reader`.
 *
 * Buffered, not yet consumed bytes are `buf[r .. w)`.
 */
export class BufReader implements Reader {
  private buf!: Uint8Array;
  private rd!: Reader;
  /** Read position in `buf`. */
  private r = 0;
  /** Write position in `buf`. */
  private w = 0;
  /** Set once the underlying reader has returned `null`. */
  private eof = false;

  /** Returns `r` unchanged when it already is a `BufReader`, otherwise wraps it. */
  static create(r: Reader, size: number = DEFAULT_BUF_SIZE): BufReader {
    return r instanceof BufReader ? r : new BufReader(r, size);
  }

  /**
   * @param rd Underlying reader.
   * @param size Requested buffer size; values below `MIN_BUF_SIZE` are raised
   * to `MIN_BUF_SIZE`.
   */
  constructor(rd: Reader, size: number = DEFAULT_BUF_SIZE) {
    if (size < MIN_BUF_SIZE) {
      size = MIN_BUF_SIZE;
    }
    this._reset(new Uint8Array(size), rd);
  }

  /** Size in bytes of the internal buffer. */
  size(): number {
    return this.buf.byteLength;
  }

  /** Number of bytes that can currently be read from the internal buffer. */
  buffered(): number {
    return this.w - this.r;
  }

  /**
   * Reads a new chunk from the underlying reader into the internal buffer.
   *
   * @throws Error when the buffer is already full, or when the underlying
   * reader returns 0 bytes `MAX_CONSECUTIVE_EMPTY_READS` times in a row.
   */
  private async _fill(): Promise<void> {
    // Slide the unread data to the front to maximise the free space.
    if (this.r > 0) {
      this.buf.copyWithin(0, this.r, this.w);
      this.w -= this.r;
      this.r = 0;
    }

    if (this.w >= this.buf.byteLength) {
      throw Error("bufio: tried to fill full buffer");
    }

    // Try a limited number of times so a reader that keeps returning 0 cannot
    // stall us forever.
    for (let i = MAX_CONSECUTIVE_EMPTY_READS; i > 0; i--) {
      const rr = await this.rd.read(this.buf.subarray(this.w));
      if (rr === null) {
        this.eof = true;
        return;
      }
      assert(rr >= 0, "negative read");
      this.w += rr;
      if (rr > 0) {
        return;
      }
    }

    throw new Error(
      `No progress after ${MAX_CONSECUTIVE_EMPTY_READS} read() calls`,
    );
  }

  /**
   * Discards any buffered data, clears the end-of-input flag and switches to
   * reading from `r`. The internal buffer itself is reused.
   */
  reset(r: Reader): void {
    this._reset(this.buf, r);
  }

  private _reset(buf: Uint8Array, rd: Reader): void {
    this.buf = buf;
    this.rd = rd;
    // Drop whatever was buffered from the previous reader so it cannot be
    // served as if it came from the new one.
    this.r = 0;
    this.w = 0;
    this.eof = false;
  }

  /**
   * Reads up to `p.byteLength` bytes into `p`, using at most one read on the
   * underlying reader.
   *
   * @returns The number of bytes placed in `p`, or `null` at end of input.
   */
  async read(p: Uint8Array): Promise<number | null> {
    let rr: number | null = p.byteLength;
    if (p.byteLength === 0) return rr;

    if (this.r === this.w) {
      if (p.byteLength >= this.buf.byteLength) {
        // Large read with an empty buffer: read directly into `p` and skip
        // the intermediate copy.
        const direct = await this.rd.read(p);
        const nread = direct ?? 0;
        assert(nread >= 0, "negative read");
        return direct;
      }

      // One read only; _fill() is not used here because it may loop.
      this.r = 0;
      this.w = 0;
      rr = await this.rd.read(this.buf);
      if (rr === 0 || rr === null) return rr;
      assert(rr >= 0, "negative read");
      this.w += rr;
    }

    // Hand over as much buffered data as fits.
    const copied = copy(this.buf.subarray(this.r, this.w), p, 0);
    this.r += copied;
    return copied;
  }

  /**
   * Reads exactly `p.length` bytes into `p`.
   *
   * @returns `p`, or `null` when the input ended before any byte was read.
   * @throws PartialReadError when the input ends after some but not all bytes
   * were read; its `partial` property holds the bytes read so far. Any other
   * error from the underlying reader is rethrown unchanged.
   */
  async readFull(p: Uint8Array): Promise<Uint8Array | null> {
    let bytesRead = 0;
    while (bytesRead < p.length) {
      try {
        const rr = await this.read(p.subarray(bytesRead));
        if (rr === null) {
          if (bytesRead === 0) {
            return null;
          } else {
            throw new PartialReadError();
          }
        }
        bytesRead += rr;
      } catch (err) {
        if (err instanceof PartialReadError) {
          err.partial = p.subarray(0, bytesRead);
        }
        throw err;
      }
    }
    return p;
  }

  /** Reads and returns the next byte, or `null` at end of input. */
  async readByte(): Promise<number | null> {
    while (this.r === this.w) {
      if (this.eof) return null;
      await this._fill(); // buffer is empty
    }
    const c = this.buf[this.r];
    this.r++;
    return c;
  }

  /**
   * Reads up to and including the first occurrence of `delim` and returns the
   * decoded text, or `null` at end of input.
   *
   * @throws Error when `delim` is not exactly one character long.
   */
  async readString(delim: string): Promise<string | null> {
    if (delim.length !== 1) {
      throw new Error("Delimiter should be a single character");
    }
    const buffer = await this.readSlice(delim.charCodeAt(0));
    if (buffer === null) return null;
    return new TextDecoder().decode(buffer);
  }

  /**
   * Reads a single line without its trailing `\n` or `\r\n`.
   *
   * When the line is longer than the internal buffer, the part read so far is
   * returned with `more: true` and the remainder comes from later calls.
   *
   * @returns The line, or `null` at end of input.
   */
  async readLine(): Promise<ReadLineResult | null> {
    let line: Uint8Array | null = null;

    try {
      line = await this.readSlice(LF);
    } catch (err) {
      if (err instanceof Deno.errors.BadResource) {
        throw err;
      }
      let partial;
      if (err instanceof PartialReadError) {
        partial = err.partial;
        assert(
          partial instanceof Uint8Array,
          "bufio: caught error from `readSlice()` without `partial` property",
        );
      }

      // A full buffer is not fatal: return what is available and flag `more`.
      if (!(err instanceof BufferFullError)) {
        throw err;
      }

      partial = err.partial;

      // "\r\n" may straddle the buffer boundary: put the '\r' back so the next
      // call sees the complete "\r\n".
      if (
        !this.eof && partial &&
        partial.byteLength > 0 &&
        partial[partial.byteLength - 1] === CR
      ) {
        assert(this.r > 0, "bufio: tried to rewind past start of buffer");
        this.r--;
        partial = partial.subarray(0, partial.byteLength - 1);
      }

      if (partial) {
        return { line: partial, more: !this.eof };
      }
    }

    if (line === null) {
      return null;
    }

    if (line.byteLength === 0) {
      return { line, more: false };
    }

    if (line[line.byteLength - 1] === LF) {
      let drop = 1;
      if (line.byteLength > 1 && line[line.byteLength - 2] === CR) {
        drop = 2;
      }
      line = line.subarray(0, line.byteLength - drop);
    }
    return { line, more: false };
  }

  /**
   * Reads up to and including the first occurrence of the byte `delim`.
   *
   * The result is a view into the internal buffer. At end of input the
   * remaining bytes are returned without a delimiter, or `null` when there are
   * none.
   *
   * @throws BufferFullError when the internal buffer fills up before `delim`
   * is found; `partial` then holds the whole buffer contents.
   */
  async readSlice(delim: number): Promise<Uint8Array | null> {
    let s = 0; // offset from `r` at which to resume the search
    let slice: Uint8Array | undefined;

    while (true) {
      // Search the buffered data.
      let i = this.buf.subarray(this.r + s, this.w).indexOf(delim);
      if (i >= 0) {
        i += s;
        slice = this.buf.subarray(this.r, this.r + i + 1);
        this.r += i + 1;
        break;
      }

      // End of input: hand out whatever is left.
      if (this.eof) {
        if (this.r === this.w) {
          return null;
        }
        slice = this.buf.subarray(this.r, this.w);
        this.r = this.w;
        break;
      }

      // Buffer full: give the old buffer to the caller and continue on a copy,
      // so later reads cannot overwrite the bytes the caller now holds.
      if (this.buffered() >= this.buf.byteLength) {
        this.r = this.w;
        const oldbuf = this.buf;
        const newbuf = this.buf.slice(0);
        this.buf = newbuf;
        throw new BufferFullError(oldbuf);
      }

      s = this.w - this.r; // do not rescan bytes that were already searched

      try {
        await this._fill();
      } catch (err) {
        if (err instanceof PartialReadError) {
          err.partial = slice;
        }
        throw err;
      }
    }

    return slice;
  }

  /**
   * Returns the next `n` bytes without consuming them, as a view into the
   * internal buffer.
   *
   * @returns The bytes; fewer than `n` when the input ends first, or `null`
   * when no bytes are available at end of input.
   * @throws Error when `n` is negative.
   * @throws BufferFullError when `n` bytes cannot be buffered; `partial` holds
   * the bytes that are available.
   */
  async peek(n: number): Promise<Uint8Array | null> {
    if (n < 0) {
      throw Error("negative count");
    }

    let avail = this.w - this.r;
    while (avail < n && avail < this.buf.byteLength && !this.eof) {
      try {
        await this._fill();
      } catch (err) {
        if (err instanceof PartialReadError) {
          err.partial = this.buf.subarray(this.r, this.w);
        }
        throw err;
      }
      avail = this.w - this.r;
    }

    if (avail === 0 && this.eof) {
      return null;
    } else if (avail < n && this.eof) {
      return this.buf.subarray(this.r, this.r + avail);
    } else if (avail < n) {
      throw new BufferFullError(this.buf.subarray(this.r, this.w));
    }

    return this.buf.subarray(this.r, this.r + n);
  }
}
/**
 * State and size accounting shared by the buffered writers: the buffer itself,
 * how many of its bytes are in use, and the last error recorded.
 */
abstract class AbstractBufBase {
  buf!: Uint8Array;
  usedBufferBytes = 0;
  err: Error | null = null;

  /** Total size in bytes of the buffer. */
  size(): number {
    const { byteLength } = this.buf;
    return byteLength;
  }

  /** Bytes of the buffer that are still unused. */
  available(): number {
    const total = this.buf.byteLength;
    const used = this.usedBufferBytes;
    return total - used;
  }

  /** Bytes that have been placed in the buffer. */
  buffered(): number {
    const used = this.usedBufferBytes;
    return used;
  }
}
/**
 * Buffered wrapper around a `Writer`.
 *
 * Once a write to the underlying writer fails with an `Error`, that error is
 * kept in `err` and rethrown by every later `write()`/`flush()` until
 * `reset()` is called.
 */
export class BufWriter extends AbstractBufBase implements Writer {
  /** Returns `writer` unchanged when it already is a `BufWriter`, otherwise wraps it. */
  static create(writer: Writer, size: number = DEFAULT_BUF_SIZE): BufWriter {
    if (writer instanceof BufWriter) return writer;
    return new BufWriter(writer, size);
  }

  /**
   * @param writer Underlying writer.
   * @param size Buffer size; non-positive values fall back to `DEFAULT_BUF_SIZE`.
   */
  constructor(private writer: Writer, size: number = DEFAULT_BUF_SIZE) {
    super();
    this.buf = new Uint8Array(size <= 0 ? DEFAULT_BUF_SIZE : size);
  }

  /** Drops unflushed data, clears the stored error and switches to writer `w`. */
  reset(w: Writer): void {
    this.writer = w;
    this.usedBufferBytes = 0;
    this.err = null;
  }

  /** Writes all buffered data to the underlying writer. */
  async flush() {
    if (this.err !== null) throw this.err;
    if (this.usedBufferBytes === 0) return;

    try {
      const pending = this.buf.subarray(0, this.usedBufferBytes);
      // The underlying writer may accept fewer bytes than offered: keep going
      // until everything has been handed over.
      for (let sent = 0; sent < pending.length;) {
        sent += await this.writer.write(pending.subarray(sent));
      }
    } catch (e) {
      if (e instanceof Error) {
        this.err = e;
      }
      throw e;
    }

    // Continue with a fresh buffer of the same size.
    this.buf = new Uint8Array(this.buf.length);
    this.usedBufferBytes = 0;
  }

  /**
   * Buffers `data`, flushing to the underlying writer whenever the buffer
   * fills up.
   *
   * @returns The number of bytes accepted.
   */
  async write(data: Uint8Array): Promise<number> {
    if (this.err !== null) throw this.err;
    if (data.length === 0) return 0;

    let accepted = 0;
    while (data.byteLength > this.available()) {
      let step: number;
      if (this.buffered() === 0) {
        // Large write with an empty buffer: skip the buffer entirely.
        try {
          step = await this.writer.write(data);
        } catch (e) {
          if (e instanceof Error) {
            this.err = e;
          }
          throw e;
        }
      } else {
        // Top up the buffer, then push it out.
        step = copy(data, this.buf, this.usedBufferBytes);
        this.usedBufferBytes += step;
        await this.flush();
      }
      accepted += step;
      data = data.subarray(step);
    }

    // What is left fits in the buffer.
    const tail = copy(data, this.buf, this.usedBufferBytes);
    this.usedBufferBytes += tail;
    return accepted + tail;
  }
}
/**
 * Buffered wrapper around a `WriterSync`.
 *
 * Once a write to the underlying writer fails with an `Error`, that error is
 * kept in `err` and rethrown by every later `writeSync()`/`flush()` until
 * `reset()` is called.
 */
export class BufWriterSync extends AbstractBufBase implements WriterSync {
  /** Returns `writer` unchanged when it already is a `BufWriterSync`, otherwise wraps it. */
  static create(
    writer: WriterSync,
    size: number = DEFAULT_BUF_SIZE,
  ): BufWriterSync {
    if (writer instanceof BufWriterSync) return writer;
    return new BufWriterSync(writer, size);
  }

  /**
   * @param writer Underlying writer.
   * @param size Buffer size; non-positive values fall back to `DEFAULT_BUF_SIZE`.
   */
  constructor(private writer: WriterSync, size: number = DEFAULT_BUF_SIZE) {
    super();
    this.buf = new Uint8Array(size <= 0 ? DEFAULT_BUF_SIZE : size);
  }

  /** Drops unflushed data, clears the stored error and switches to writer `w`. */
  reset(w: WriterSync): void {
    this.writer = w;
    this.usedBufferBytes = 0;
    this.err = null;
  }

  /** Writes all buffered data to the underlying writer. */
  flush(): void {
    if (this.err !== null) throw this.err;
    if (this.usedBufferBytes === 0) return;

    try {
      const pending = this.buf.subarray(0, this.usedBufferBytes);
      // The underlying writer may accept fewer bytes than offered: keep going
      // until everything has been handed over.
      for (let sent = 0; sent < pending.length;) {
        sent += this.writer.writeSync(pending.subarray(sent));
      }
    } catch (e) {
      if (e instanceof Error) {
        this.err = e;
      }
      throw e;
    }

    // Continue with a fresh buffer of the same size.
    this.buf = new Uint8Array(this.buf.length);
    this.usedBufferBytes = 0;
  }

  /**
   * Buffers `data`, flushing to the underlying writer whenever the buffer
   * fills up.
   *
   * @returns The number of bytes accepted.
   */
  writeSync(data: Uint8Array): number {
    if (this.err !== null) throw this.err;
    if (data.length === 0) return 0;

    let accepted = 0;
    while (data.byteLength > this.available()) {
      let step: number;
      if (this.buffered() === 0) {
        // Large write with an empty buffer: skip the buffer entirely.
        try {
          step = this.writer.writeSync(data);
        } catch (e) {
          if (e instanceof Error) {
            this.err = e;
          }
          throw e;
        }
      } else {
        // Top up the buffer, then push it out.
        step = copy(data, this.buf, this.usedBufferBytes);
        this.usedBufferBytes += step;
        this.flush();
      }
      accepted += step;
      data = data.subarray(step);
    }

    // What is left fits in the buffer.
    const tail = copy(data, this.buf, this.usedBufferBytes);
    this.usedBufferBytes += tail;
    return accepted + tail;
  }
}
/**
 * Builds the "longest proper prefix that is also a suffix" table for `pat`:
 * entry `k` is the length of the longest proper prefix of `pat[0..k]` that is
 * also a suffix of it.
 *
 * NOTE(review): entries are stored in a `Uint8Array`, so a prefix length above
 * 255 is stored modulo 256.
 */
function createLPS(pat: Uint8Array): Uint8Array {
  const table = new Uint8Array(pat.length);
  let matched = 0; // length of the prefix currently being extended
  for (let pos = 1; pos < table.length; pos++) {
    // On a mismatch fall back to the next shorter candidate prefix.
    while (matched > 0 && pat[pos] !== pat[matched]) {
      matched = table[matched - 1];
    }
    if (pat[pos] === pat[matched]) {
      matched++;
    }
    table[pos] = matched;
  }
  return table;
}
/**
 * Reads `reader` and yields the byte sequences found between occurrences of
 * `delim`. The delimiter itself is not part of the yielded chunks. When the
 * input ends, whatever follows the last delimiter is yielded as the final
 * chunk.
 *
 * @param reader Source of bytes.
 * @param delim Delimiter byte sequence.
 */
export async function* readDelim(
  reader: Reader,
  delim: Uint8Array,
): AsyncIterableIterator<Uint8Array> {
  const delimLen = delim.length;
  const fallback = createLPS(delim);
  const pending = new BytesList();
  const readSize = Math.max(1024, delimLen + 1);

  let scanned = 0; // bytes of `pending` already examined
  let matched = 0; // bytes of `delim` matched so far
  for (;;) {
    const block = new Uint8Array(readSize);
    const nread = await reader.read(block);
    if (nread === null) {
      // End of input: emit the remainder.
      yield pending.concat();
      return;
    }
    if (nread < 0) {
      // Invalid read result: drop what is pending and stop.
      return;
    }
    pending.add(block, 0, nread);

    let cursor = 0; // position inside `block`
    while (scanned < pending.size()) {
      const hit = block[cursor] === delim[matched];
      if (!hit && matched > 0) {
        // Mismatch after a partial match: retry this byte against a shorter
        // prefix of the delimiter.
        matched = fallback[matched - 1];
        continue;
      }
      scanned++;
      cursor++;
      if (!hit) continue;

      matched++;
      if (matched === delimLen) {
        // Complete delimiter: emit everything before it, then discard the
        // emitted bytes together with the delimiter.
        yield pending.slice(0, scanned - delimLen);
        pending.shift(scanned);
        scanned = 0;
        matched = 0;
      }
    }
  }
}
/**
 * Reads `reader` and yields the decoded text found between occurrences of the
 * string `delim`.
 *
 * @param reader Source of bytes.
 * @param delim Delimiter; it is encoded with `TextEncoder` before matching.
 * @param decoderOpts Encoding label and options passed to `TextDecoder`.
 */
export async function* readStringDelim(
  reader: Reader,
  delim: string,
  decoderOpts?: {
    encoding?: string;
    fatal?: boolean;
    ignoreBOM?: boolean;
  },
): AsyncIterableIterator<string> {
  const delimBytes = new TextEncoder().encode(delim);
  const textDecoder = new TextDecoder(decoderOpts?.encoding, decoderOpts);
  for await (const piece of readDelim(reader, delimBytes)) {
    yield textDecoder.decode(piece);
  }
}
/**
 * Reads `reader` line by line and yields each line as decoded text, without
 * its line ending. Lines longer than the internal buffer of the `BufReader`
 * are reassembled before being yielded.
 *
 * @param reader Source of bytes.
 * @param decoderOpts Encoding label and options passed to `TextDecoder`.
 */
export async function* readLines(
  reader: Reader,
  decoderOpts?: {
    encoding?: string;
    fatal?: boolean;
    ignoreBOM?: boolean;
  },
): AsyncIterableIterator<string> {
  const bufReader = new BufReader(reader);
  let fragments: Uint8Array[] = [];
  const decoder = new TextDecoder(decoderOpts?.encoding, decoderOpts);

  for (
    let res = await bufReader.readLine();
    res;
    res = await bufReader.readLine()
  ) {
    fragments.push(res.line);
    // `more` means the line continues in the next result.
    if (res.more) continue;
    yield decoder.decode(concat(...fragments));
    fragments = [];
  }

  // Input ended in the middle of a long line: emit what was collected.
  if (fragments.length > 0) {
    yield decoder.decode(concat(...fragments));
  }
}