import { Buffer } from "../buffer.ts";import { notImplemented } from "../_utils.ts";import { HandleWrap } from "./handle_wrap.ts";import { AsyncWrap, providerType } from "./async_wrap.ts";import { codeMap } from "./uv.ts";import { writeAll } from "../../streams/write_all.ts";
/**
 * Slot indices into `streamBaseState`.
 *
 * The last member is not a slot itself; it equals the number of slots
 * declared before it.
 */
enum StreamBaseStateFields {
  kReadBytesOrError,
  kArrayBufferOffset,
  kBytesWritten,
  kLastWriteWasAsync,
  kNumStreamBaseStateFields,
}

export const kReadBytesOrError = StreamBaseStateFields.kReadBytesOrError;
export const kArrayBufferOffset = StreamBaseStateFields.kArrayBufferOffset;
export const kBytesWritten = StreamBaseStateFields.kBytesWritten;
export const kLastWriteWasAsync = StreamBaseStateFields.kLastWriteWasAsync;
export const kNumStreamBaseStateFields =
  StreamBaseStateFields.kNumStreamBaseStateFields;

/**
 * Shared scratch state that stream handles use to publish the result of their
 * most recent read/write, indexed by the `k*` constants above.
 *
 * The slots must be signed and wider than 8 bits: the values stored here
 * include read sizes of up to 64 KiB, error codes taken from `codeMap`, and
 * the byte length of arbitrary writes. An 8-bit unsigned array would silently
 * wrap all of those modulo 256, so a 32-bit signed array is used instead.
 * The length (5) is kept as it was.
 */
export const streamBaseState = new Int32Array(5);

// The async-write flag is set once at load time.
streamBaseState[kLastWriteWasAsync] = 1;
/**
 * Request object passed to the write methods of a stream handle.
 *
 * The fields are only declared here; they are populated by whoever creates
 * and dispatches the request. `oncomplete` is the completion callback and
 * receives a numeric status.
 */
export class WriteWrap<H extends HandleWrap> extends AsyncWrap {
  /** Handle this request is associated with. */
  handle!: H;

  /** Completion callback, called with a numeric status. */
  oncomplete!: (status: number) => void;

  async!: boolean;
  bytes!: number;
  buffer!: unknown;
  callback!: unknown;
  _chunks!: unknown[];

  constructor() {
    // Registers the request under the WRITEWRAP async provider type.
    super(providerType.WRITEWRAP);
  }
}
/**
 * Request object passed to a stream handle's `shutdown` method.
 *
 * The fields are only declared here; they are populated by whoever creates
 * and dispatches the request. `oncomplete` is the completion callback and
 * receives a numeric status.
 */
export class ShutdownWrap<H extends HandleWrap> extends AsyncWrap {
  /** Handle this request is associated with. */
  handle!: H;

  /** Completion callback, called with a numeric status. */
  oncomplete!: (status: number) => void;

  callback!: () => void;

  constructor() {
    // Registers the request under the SHUTDOWNWRAP async provider type.
    super(providerType.SHUTDOWNWRAP);
  }
}
/**
 * Symbol key under which a stream handle stores the underlying stream
 * resource it reads from, writes to and closes.
 */
export const kStreamBaseField = Symbol("kStreamBaseField");

/** Size in bytes (64 KiB) of the scratch buffer allocated for each read. */
const SUGGESTED_SIZE = 64 * 1024;
/**
 * Stream handle that adapts a Deno reader/writer/closer to the
 * callback-and-status-code interface used by its callers.
 *
 * Reads are delivered through `onread`; writes and shutdowns report their
 * outcome through the `oncomplete` callback of the request object. The public
 * methods return `0` synchronously; the actual result arrives via callbacks.
 */
export class LibuvStreamWrap extends HandleWrap {
  /** Underlying stream resource; `undefined` when none was supplied. */
  [kStreamBaseField]?: Deno.Reader & Deno.Writer & Deno.Closer;

  reading!: boolean;
  /** True while the internal read loop is allowed to keep going. */
  #reading = false;
  destroyed = false;
  writeQueueSize = 0;
  /** Running total of bytes successfully read. */
  bytesRead = 0;
  /** Running total of bytes successfully written. */
  bytesWritten = 0;

  /**
   * Read callback. Receives the bytes that were read (empty when nothing was
   * read) and either the byte count or a status code from `codeMap`.
   */
  onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined;

  /**
   * @param provider Async provider type forwarded to `HandleWrap`.
   * @param stream Optional underlying stream to attach.
   */
  constructor(
    provider: providerType,
    stream?: Deno.Reader & Deno.Writer & Deno.Closer,
  ) {
    super(provider);
    this.#attachToObject(stream);
  }

  /**
   * Starts the read loop if it is not already running.
   * @returns Always `0`.
   */
  readStart(): number {
    if (!this.#reading) {
      this.#reading = true;
      this.#read();
    }

    return 0;
  }

  /**
   * Asks the read loop to stop; the loop checks the flag after each read
   * completes, so a read already in flight is still delivered.
   * @returns Always `0`.
   */
  readStop(): number {
    this.#reading = false;

    return 0;
  }

  /**
   * Closes the underlying stream via `_onClose` and reports the resulting
   * status to `req.oncomplete`.
   * @param req Shutdown request whose `oncomplete` is invoked.
   * @returns Always `0`.
   */
  shutdown(req: ShutdownWrap<LibuvStreamWrap>): number {
    const status = this._onClose();

    try {
      req.oncomplete(status);
    } catch {
      // Best effort: exceptions thrown by the callback are ignored.
    }

    return 0;
  }

  /** Not implemented. */
  useUserBuffer(_userBuf: unknown): number {
    notImplemented("LibuvStreamWrap.prototype.useUserBuffer");
  }

  /**
   * Queues `data` to be written in full to the underlying stream.
   * @param req Write request whose `oncomplete` receives the outcome.
   * @param data Bytes to write.
   * @returns Always `0`; the write itself completes asynchronously.
   */
  writeBuffer(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array): number {
    this.#write(req, data);

    return 0;
  }

  /**
   * Writes several chunks as one concatenated buffer.
   * @param req Write request whose `oncomplete` receives the outcome.
   * @param chunks When `allBuffers` is true, a list of buffers. Otherwise a
   *   flat list of `[chunk, encoding, chunk, encoding, ...]` pairs where each
   *   chunk is either a buffer or a string.
   * @param allBuffers Whether every entry of `chunks` is a buffer.
   * @returns Always `0`.
   */
  writev(
    req: WriteWrap<LibuvStreamWrap>,
    chunks: Buffer[] | (string | Buffer)[],
    allBuffers: boolean,
  ): number {
    const count = allBuffers ? chunks.length : chunks.length >> 1;
    const buffers: Buffer[] = new Array(count);

    if (!allBuffers) {
      for (let i = 0; i < count; i++) {
        const chunk = chunks[i * 2];

        if (Buffer.isBuffer(chunk)) {
          // Already a buffer: use it as is. Skipping the string path avoids
          // re-creating the buffer with its paired entry used as an encoding.
          buffers[i] = chunk;
          continue;
        }

        // String chunk: encode it with the encoding stored right after it.
        const encoding = chunks[i * 2 + 1] as string;
        buffers[i] = Buffer.from(chunk as string, encoding);
      }
    } else {
      for (let i = 0; i < count; i++) {
        buffers[i] = chunks[i] as Buffer;
      }
    }

    return this.writeBuffer(req, Buffer.concat(buffers));
  }

  /**
   * Encodes `data` with `TextEncoder` and writes it.
   * @returns Always `0`.
   */
  writeAsciiString(req: WriteWrap<LibuvStreamWrap>, data: string): number {
    const buffer = new TextEncoder().encode(data);

    return this.writeBuffer(req, buffer);
  }

  /**
   * Encodes `data` as UTF-8 and writes it.
   * @returns Always `0`.
   */
  writeUtf8String(req: WriteWrap<LibuvStreamWrap>, data: string): number {
    const buffer = new TextEncoder().encode(data);

    return this.writeBuffer(req, buffer);
  }

  /** Not implemented. */
  writeUcs2String(_req: WriteWrap<LibuvStreamWrap>, _data: string): number {
    notImplemented("LibuvStreamWrap.prototype.writeUcs2String");
  }

  /**
   * Encodes `data` as latin1 and writes it.
   * @returns Always `0`.
   */
  writeLatin1String(req: WriteWrap<LibuvStreamWrap>, data: string): number {
    const buffer = Buffer.from(data, "latin1");
    return this.writeBuffer(req, buffer);
  }

  /**
   * Stops reading and closes the underlying stream, if any.
   * @returns `0` on success, or the `ENOTCONN` code when closing throws.
   */
  override _onClose(): number {
    let status = 0;
    this.#reading = false;

    try {
      this[kStreamBaseField]?.close();
    } catch {
      status = codeMap.get("ENOTCONN")!;
    }

    return status;
  }

  /** Stores the underlying stream on this handle. */
  #attachToObject(stream?: Deno.Reader & Deno.Writer & Deno.Closer) {
    this[kStreamBaseField] = stream;
  }

  /**
   * Performs one read, hands the result to `onread`, then schedules the next
   * read while reading is still enabled and no error/EOF occurred.
   */
  async #read() {
    let buf = new Uint8Array(SUGGESTED_SIZE);
    let nread: number | null;

    try {
      nread = await this[kStreamBaseField]!.read(buf);
    } catch (e) {
      // Translate the thrown error into a status code from `codeMap`.
      if (
        e instanceof Deno.errors.Interrupted ||
        e instanceof Deno.errors.BadResource
      ) {
        nread = codeMap.get("EOF")!;
      } else if (
        e instanceof Deno.errors.ConnectionReset ||
        e instanceof Deno.errors.ConnectionAborted
      ) {
        nread = codeMap.get("ECONNRESET")!;
      } else {
        nread = codeMap.get("UNKNOWN")!;
      }

      buf = new Uint8Array(0);
    }

    // A `null` result from the reader is reported as EOF.
    nread ??= codeMap.get("EOF")!;

    streamBaseState[kReadBytesOrError] = nread;

    if (nread > 0) {
      this.bytesRead += nread;
      // Trim the scratch buffer down to the bytes actually read.
      buf = buf.slice(0, nread);
    } else {
      // Nothing was read (zero bytes, EOF or error). Do not slice with a
      // non-positive `nread`: a negative end index counts from the end of the
      // array and would hand the callback a large slice of untouched scratch
      // memory instead of an empty buffer.
      buf = new Uint8Array(0);
    }

    streamBaseState[kArrayBufferOffset] = 0;

    try {
      this.onread!(buf, nread);
    } catch {
      // Best effort: exceptions thrown by the callback are ignored.
    }

    if (nread >= 0 && this.#reading) {
      this.#read();
    }
  }

  /**
   * Writes all of `data` to the underlying stream and reports the outcome to
   * `req.oncomplete`: `0` on success, otherwise `EBADF` (bad resource or
   * broken pipe) or `UNKNOWN`.
   */
  async #write(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array) {
    const { byteLength } = data;

    try {
      await writeAll(this[kStreamBaseField]!, data);
    } catch (e) {
      let status: number;

      if (
        e instanceof Deno.errors.BadResource ||
        e instanceof Deno.errors.BrokenPipe
      ) {
        status = codeMap.get("EBADF")!;
      } else {
        status = codeMap.get("UNKNOWN")!;
      }

      try {
        req.oncomplete(status);
      } catch {
        // Best effort: exceptions thrown by the callback are ignored.
      }

      return;
    }

    streamBaseState[kBytesWritten] = byteLength;
    this.bytesWritten += byteLength;

    try {
      req.oncomplete(0);
    } catch {
      // Best effort: exceptions thrown by the callback are ignored.
    }

    return;
  }
}