import { Buffer } from "../buffer.ts";import { notImplemented } from "../_utils.ts";import { HandleWrap } from "./handle_wrap.ts";import { AsyncWrap, providerType } from "./async_wrap.ts";import { codeMap } from "./uv.ts";import { writeAll } from "../../streams/conversion.ts";
enum StreamBaseStateFields { kReadBytesOrError, kArrayBufferOffset, kBytesWritten, kLastWriteWasAsync, kNumStreamBaseStateFields,}
export const kReadBytesOrError = StreamBaseStateFields.kReadBytesOrError;export const kArrayBufferOffset = StreamBaseStateFields.kArrayBufferOffset;export const kBytesWritten = StreamBaseStateFields.kBytesWritten;export const kLastWriteWasAsync = StreamBaseStateFields.kLastWriteWasAsync;export const kNumStreamBaseStateFields = StreamBaseStateFields.kNumStreamBaseStateFields;
export const streamBaseState = new Uint8Array(5);
streamBaseState[kLastWriteWasAsync] = 1;
export class WriteWrap<H extends HandleWrap> extends AsyncWrap { handle!: H; oncomplete!: (status: number) => void; async!: boolean; bytes!: number; buffer!: unknown; callback!: unknown; _chunks!: unknown[];
constructor() { super(providerType.WRITEWRAP); }}
export class ShutdownWrap<H extends HandleWrap> extends AsyncWrap { handle!: H; oncomplete!: (status: number) => void; callback!: () => void;
constructor() { super(providerType.SHUTDOWNWRAP); }}
export const kStreamBaseField = Symbol("kStreamBaseField");
const SUGGESTED_SIZE = 64 * 1024;
export class LibuvStreamWrap extends HandleWrap { [kStreamBaseField]?: Deno.Reader & Deno.Writer & Deno.Closer;
reading!: boolean; #reading = false; #currentReads: Set<Promise<void>> = new Set(); #currentWrites: Set<Promise<void>> = new Set(); destroyed = false; writeQueueSize = 0; bytesRead = 0; bytesWritten = 0;
onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined;
constructor( provider: providerType, stream?: Deno.Reader & Deno.Writer & Deno.Closer, ) { super(provider); this.#attachToObject(stream); }
readStart(): number { if (!this.#reading) { this.#reading = true; const readPromise = this.#read(); this.#currentReads.add(readPromise); readPromise.then( () => this.#currentReads.delete(readPromise), () => this.#currentReads.delete(readPromise), ); }
return 0; }
readStop(): number { this.#reading = false;
return 0; }
shutdown(req: ShutdownWrap<LibuvStreamWrap>): number { (async () => { const status = await this._onClose();
try { req.oncomplete(status); } catch { } })();
return 0; }
useUserBuffer(_userBuf: unknown): number { notImplemented(); }
writeBuffer(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array): number { const currentWrite = this.#write(req, data); this.#currentWrites.add(currentWrite); currentWrite.then( () => this.#currentWrites.delete(currentWrite), () => this.#currentWrites.delete(currentWrite), );
return 0; }
writev( _req: WriteWrap<LibuvStreamWrap>, _chunks: any, _allBuffers: boolean, ): number { notImplemented(); }
writeAsciiString(req: WriteWrap<LibuvStreamWrap>, data: string): number { const buffer = new TextEncoder().encode(data);
return this.writeBuffer(req, buffer); }
writeUtf8String(req: WriteWrap<LibuvStreamWrap>, data: string): number { const buffer = new TextEncoder().encode(data);
return this.writeBuffer(req, buffer); }
writeUcs2String(_req: WriteWrap<LibuvStreamWrap>, _data: string): number { notImplemented(); }
writeLatin1String(req: WriteWrap<LibuvStreamWrap>, data: string): number { const buffer = Buffer.from(data, "latin1"); return this.writeBuffer(req, buffer); }
override async _onClose(): Promise<number> { let status = 0; this.#reading = false;
try { this[kStreamBaseField]?.close(); } catch { status = codeMap.get("ENOTCONN")!; }
await Promise.allSettled(this.#currentWrites); await Promise.allSettled(this.#currentReads);
return status; }
#attachToObject(stream?: Deno.Reader & Deno.Writer & Deno.Closer): void { this[kStreamBaseField] = stream; }
async #read(): Promise<void> { let buf = new Uint8Array(SUGGESTED_SIZE);
let nread: number | null; try { nread = await this[kStreamBaseField]!.read(buf); } catch (e) { if ( e instanceof Deno.errors.Interrupted || e instanceof Deno.errors.BadResource ) { nread = codeMap.get("EOF")!; } else { nread = codeMap.get("UNKNOWN")!; }
buf = new Uint8Array(0); }
nread ??= codeMap.get("EOF")!;
streamBaseState[kReadBytesOrError] = nread;
if (nread > 0) { this.bytesRead += nread; }
buf = buf.slice(0, nread);
streamBaseState[kArrayBufferOffset] = 0;
try { this.onread!(buf, nread); } catch { }
if (nread >= 0 && this.#reading) { const readPromise = this.#read(); this.#currentReads.add(readPromise); readPromise.then( () => this.#currentReads.delete(readPromise), () => this.#currentReads.delete(readPromise), ); } }
async #write(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array) { const { byteLength } = data;
try { await writeAll(this[kStreamBaseField]!, data); } catch { const status = codeMap.get("UNKNOWN")!;
try { req.oncomplete(status); } catch { }
return; }
streamBaseState[kBytesWritten] = byteLength; this.bytesWritten += byteLength;
try { req.oncomplete(0); } catch { }
return; }}