import { notImplemented } from "../_utils.ts";import { unreachable } from "../../testing/asserts.ts";import { ConnectionWrap } from "./connection_wrap.ts";import { AsyncWrap, providerType } from "./async_wrap.ts";import { LibuvStreamWrap } from "./stream_wrap.ts";import { codeMap } from "./uv.ts";import { delay } from "../../async/mod.ts";import { kStreamBaseField } from "./stream_wrap.ts";import { ceilPowOf2, INITIAL_ACCEPT_BACKOFF_DELAY, MAX_ACCEPT_BACKOFF_DELAY,} from "./_listen.ts";import { isWindows } from "../../_util/os.ts";import { fs } from "./constants.ts";
/**
 * Kinds of pipe handle that can be requested from the `Pipe` constructor.
 * The constructor uses this value to pick the async provider type and to
 * decide whether the handle is flagged as an IPC channel.
 */
export enum socketType {
  SOCKET = 0,
  SERVER = 1,
  IPC = 2,
}
/**
 * Stream handle backed by a Unix domain socket.
 *
 * A `Pipe` can either connect to a socket path (`connect`) or be bound to a
 * path and accept incoming connections (`bind` + `listen`). Methods report
 * failures by returning a status code looked up in `codeMap` rather than by
 * throwing; `0` means success.
 */
export class Pipe extends ConnectionWrap {
  override reading = false;
  ipc: boolean;

  // Backlog value used by `listen` on Windows; adjustable through
  // `setPendingInstances`.
  #pendingInstances = 4;

  // Socket path: set by `bind`, or taken from the connection/listener once
  // one exists. Cleared on close.
  #address?: string;

  #backlog?: number;
  #listener!: Deno.Listener;
  // Number of connections accepted so far (reset on close).
  #connections = 0;

  #closed = false;
  // Current retry delay of the accept loop; `undefined` when not backing off.
  #acceptBackoffDelay?: number;

  /**
   * @param type One of the `socketType` values; selects the provider type and
   *   whether the handle is an IPC channel. Any other value hits
   *   `unreachable()`.
   * @param conn Optional already-established connection to wrap.
   */
  constructor(type: number, conn?: Deno.UnixConn) {
    let provider: providerType;
    let ipc: boolean;

    switch (type) {
      case socketType.SOCKET: {
        provider = providerType.PIPEWRAP;
        ipc = false;

        break;
      }
      case socketType.SERVER: {
        provider = providerType.PIPESERVERWRAP;
        ipc = false;

        break;
      }
      case socketType.IPC: {
        provider = providerType.PIPEWRAP;
        ipc = true;

        break;
      }
      default: {
        unreachable();
      }
    }

    super(provider, conn);

    this.ipc = ipc;

    // A non-server handle wrapping an existing connection remembers that
    // connection's local socket path.
    if (conn && provider === providerType.PIPEWRAP) {
      const localAddr = conn.localAddr as Deno.UnixAddr;
      this.#address = localAddr.path;
    }
  }

  /** Not implemented: always calls `notImplemented`. */
  open(_fd: number): number {
    notImplemented("Pipe.prototype.open");
  }

  /**
   * Records the socket path to use for a later `listen` call. Nothing is
   * created on disk here.
   *
   * @param name Socket path.
   * @returns Always `0`.
   */
  bind(name: string) {
    this.#address = name;

    return 0;
  }

  /**
   * Starts an asynchronous connection to the Unix socket at `address`.
   *
   * On success the handle's address and `req.address` are set to the
   * connection's local path, the connection is stored as the underlying
   * stream, and `afterConnect(req, 0)` is called. On failure
   * `afterConnect(req, code)` is called with ENOENT for `NotFound`, EACCES
   * for `PermissionDenied`, and ECONNREFUSED for anything else.
   *
   * @param req Connect request that is passed on to `afterConnect`.
   * @param address Socket path to connect to.
   * @returns Always `0`; the outcome is delivered through `afterConnect`.
   */
  connect(req: PipeConnectWrap, address: string) {
    if (isWindows) {
      notImplemented("Pipe.prototype.connect - Windows");
    }

    const connectOptions: Deno.UnixConnectOptions = {
      path: address,
      transport: "unix",
    };

    Deno.connect(connectOptions).then(
      (conn: Deno.UnixConn) => {
        const localAddr = conn.localAddr as Deno.UnixAddr;

        this.#address = req.address = localAddr.path;
        this[kStreamBaseField] = conn;

        try {
          this.afterConnect(req, 0);
        } catch {
          // Errors thrown by the completion callback are deliberately ignored.
        }
      },
      (e) => {
        let code: number;

        if (e instanceof Deno.errors.NotFound) {
          code = codeMap.get("ENOENT")!;
        } else if (e instanceof Deno.errors.PermissionDenied) {
          code = codeMap.get("EACCES")!;
        } else {
          code = codeMap.get("ECONNREFUSED")!;
        }

        try {
          this.afterConnect(req, code);
        } catch {
          // Errors thrown by the completion callback are deliberately ignored.
        }
      },
    );

    return 0;
  }

  /**
   * Starts listening on the path previously given to `bind` and kicks off the
   * accept loop.
   *
   * @param backlog Requested backlog; stored as `ceilPowOf2(backlog + 1)`.
   * @returns `0` on success, otherwise the status code for EADDRINUSE,
   *   EADDRNOTAVAIL, or UNKNOWN for any other error from `Deno.listen`.
   */
  listen(backlog: number): number {
    if (isWindows) {
      notImplemented("Pipe.prototype.listen - Windows");
    }

    this.#backlog = isWindows
      ? this.#pendingInstances
      : ceilPowOf2(backlog + 1);

    const listenOptions = {
      path: this.#address!,
      transport: "unix" as const,
    };

    let listener;

    try {
      listener = Deno.listen(listenOptions);
    } catch (e) {
      if (e instanceof Deno.errors.AddrInUse) {
        return codeMap.get("EADDRINUSE")!;
      } else if (e instanceof Deno.errors.AddrNotAvailable) {
        return codeMap.get("EADDRNOTAVAIL")!;
      }

      return codeMap.get("UNKNOWN")!;
    }

    const address = listener.addr as Deno.UnixAddr;
    this.#address = address.path;

    this.#listener = listener;
    // Fire-and-forget: the accept loop runs in the background.
    this.#accept();

    return 0;
  }

  /** Forwards `ref()` to the listener, if one has been created. */
  override ref() {
    if (this.#listener) {
      this.#listener.ref();
    }
  }

  /** Forwards `unref()` to the listener, if one has been created. */
  override unref() {
    if (this.#listener) {
      this.#listener.unref();
    }
  }

  /**
   * Sets the pending-instances count, which `listen` uses as the backlog on
   * Windows.
   *
   * @param instances New pending-instances count.
   */
  setPendingInstances(instances: number) {
    this.#pendingInstances = instances;
  }

  /**
   * Changes the permissions of the socket file at the handle's address.
   *
   * @param mode `constants.UV_READABLE`, `constants.UV_WRITABLE`, or both
   *   OR-ed together. Readable grants read permission to user, group and
   *   others; writable grants write permission to the same three.
   * @returns `0` on success, the EINVAL status code for any other `mode`, or
   *   the UNKNOWN status code if `Deno.chmodSync` throws.
   */
  fchmod(mode: number): number {
    if (
      mode !== constants.UV_READABLE &&
      mode !== constants.UV_WRITABLE &&
      mode !== (constants.UV_WRITABLE | constants.UV_READABLE)
    ) {
      return codeMap.get("EINVAL")!;
    }

    let desiredMode = 0;

    if (mode & constants.UV_READABLE) {
      desiredMode |= fs.S_IRUSR | fs.S_IRGRP | fs.S_IROTH;
    }
    if (mode & constants.UV_WRITABLE) {
      desiredMode |= fs.S_IWUSR | fs.S_IWGRP | fs.S_IWOTH;
    }

    try {
      Deno.chmodSync(this.#address!, desiredMode);
    } catch {
      // Any failure (including a handle with no address) maps to UNKNOWN.
      return codeMap.get("UNKNOWN")!;
    }

    return 0;
  }

  /**
   * Waits before retrying `#accept`. The delay starts at
   * `INITIAL_ACCEPT_BACKOFF_DELAY`, doubles on each consecutive call and is
   * capped at `MAX_ACCEPT_BACKOFF_DELAY`.
   */
  async #acceptBackoff() {
    if (!this.#acceptBackoffDelay) {
      this.#acceptBackoffDelay = INITIAL_ACCEPT_BACKOFF_DELAY;
    } else {
      this.#acceptBackoffDelay *= 2;
    }

    if (this.#acceptBackoffDelay >= MAX_ACCEPT_BACKOFF_DELAY) {
      this.#acceptBackoffDelay = MAX_ACCEPT_BACKOFF_DELAY;
    }

    await delay(this.#acceptBackoffDelay);

    this.#accept();
  }

  /**
   * Accept loop: waits for one connection, wraps it in a new `Pipe`, reports
   * it through `onconnection`, then calls itself again. Stops once the handle
   * is closed.
   */
  async #accept(): Promise<void> {
    if (this.#closed) {
      return;
    }

    // Over the backlog limit: retry later instead of accepting now.
    if (this.#connections > this.#backlog!) {
      this.#acceptBackoff();

      return;
    }

    let connection: Deno.Conn;

    try {
      connection = await this.#listener.accept();
    } catch (e) {
      // The listener was closed while waiting; this is the normal shutdown
      // path, so stop quietly.
      if (e instanceof Deno.errors.BadResource && this.#closed) {
        return;
      }

      try {
        this.onconnection!(codeMap.get("UNKNOWN")!, undefined);
      } catch {
        // Errors thrown by the connection callback are deliberately ignored.
      }

      this.#acceptBackoff();

      return;
    }

    // A successful accept resets the backoff.
    this.#acceptBackoffDelay = undefined;

    const connectionHandle = new Pipe(socketType.SOCKET, connection);
    this.#connections++;

    try {
      this.onconnection!(0, connectionHandle);
    } catch {
      // Errors thrown by the connection callback are deliberately ignored.
    }

    return this.#accept();
  }

  /**
   * Marks the handle closed, resets its listening state, closes the listener
   * for server handles, then delegates to `LibuvStreamWrap`'s `_onClose`.
   *
   * @returns The value returned by `LibuvStreamWrap.prototype._onClose`.
   */
  override _onClose(): number {
    this.#closed = true;
    this.reading = false;

    this.#address = undefined;

    this.#backlog = undefined;
    this.#connections = 0;
    this.#acceptBackoffDelay = undefined;

    if (this.provider === providerType.PIPESERVERWRAP) {
      try {
        this.#listener.close();
      } catch {
        // Best effort: a failure to close the listener is ignored.
      }
    }

    return LibuvStreamWrap.prototype._onClose.call(this);
  }
}
/**
 * Request object handed to `Pipe.prototype.connect`. It is registered with
 * the `PIPECONNECTWRAP` provider type.
 */
export class PipeConnectWrap extends AsyncWrap {
  /**
   * Completion callback for the connect request.
   * NOTE(review): presumably invoked via `afterConnect`; the call site is not
   * visible in this file.
   */
  oncomplete!: (
    status: number,
    handle: ConnectionWrap,
    req: PipeConnectWrap,
    readable: boolean,
    writeable: boolean,
  ) => void;

  /** Local socket path, filled in by `Pipe.prototype.connect` on success. */
  address!: string;

  constructor() {
    super(providerType.PIPECONNECTWRAP);
  }
}
/**
 * Constants for this module: the `socketType` values re-exposed under the
 * same names, plus the permission flags accepted by `Pipe.prototype.fchmod`.
 */
export enum constants {
  SOCKET = socketType.SOCKET,
  SERVER = socketType.SERVER,
  IPC = socketType.IPC,
  // Bit flags; may be OR-ed together.
  UV_READABLE = 1 << 0,
  UV_WRITABLE = 1 << 1,
}