import { notImplemented } from "./_utils.ts";import { EventEmitter } from "./events.ts";import { isIP, isIPv4, isIPv6, normalizedArgsSymbol } from "./internal/net.ts";import { Duplex } from "./stream.ts";import { asyncIdSymbol, defaultTriggerAsyncIdScope, newAsyncId, ownerSymbol,} from "./internal/async_hooks.ts";import { ERR_INVALID_ADDRESS_FAMILY, ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, ERR_INVALID_FD_TYPE, ERR_INVALID_IP_ADDRESS, ERR_MISSING_ARGS, ERR_SERVER_ALREADY_LISTEN, ERR_SERVER_NOT_RUNNING, ERR_SOCKET_CLOSED, errnoException, exceptionWithHostPort, genericNodeError, uvExceptionWithHostPort,} from "./internal/errors.ts";import type { ErrnoException } from "./internal/errors.ts";import { Encodings } from "./_utils.ts";import { isUint8Array } from "./internal/util/types.ts";import { kAfterAsyncWrite, kBuffer, kBufferCb, kBufferGen, kHandle, kUpdateTimer, onStreamRead, setStreamTimeout, writeGeneric, writevGeneric,} from "./internal/stream_base_commons.ts";import { kTimeout } from "./internal/timers.mjs";import { nextTick } from "./_next_tick.ts";import { DTRACE_NET_SERVER_CONNECTION, DTRACE_NET_STREAM_END,} from "./internal/dtrace.ts";import { Buffer } from "./buffer.ts";import type { LookupOneOptions } from "./internal/dns/utils.ts";import { validateAbortSignal, validateFunction, validateInt32, validateNumber, validatePort, validateString,} from "./internal/validators.mjs";import { constants as TCPConstants, TCP, TCPConnectWrap,} from "./internal_binding/tcp_wrap.ts";import { constants as PipeConstants, Pipe, PipeConnectWrap,} from "./internal_binding/pipe_wrap.ts";import { ShutdownWrap } from "./internal_binding/stream_wrap.ts";import { assert } from "../_util/assert.ts";import { isWindows } from "../_util/os.ts";import { ADDRCONFIG, lookup as dnsLookup } from "./dns.ts";import { codeMap } from "./internal_binding/uv.ts";import { guessHandleType } from "./internal_binding/util.ts";import { debuglog } from "./internal/util/debuglog.ts";import type { 
DuplexOptions } from "./_stream.d.ts";import type { BufferEncoding } from "./_global.d.ts";import type { Abortable } from "./_events.d.ts";
// Module logger for the "net" section. The callback handed to `debuglog`
// rebinds `debug` to whatever function it receives.
let debug = debuglog("net", (replacement) => {
  debug = replacement;
});

// Symbol keys for per-socket bookkeeping fields.
const kLastWriteQueueSize = Symbol("lastWriteQueueSize");
const kSetNoDelay = Symbol("kSetNoDelay");
const kBytesRead = Symbol("kBytesRead");
const kBytesWritten = Symbol("kBytesWritten");

// Wildcard addresses used when the caller does not name a local address.
const DEFAULT_IPV4_ADDR = "0.0.0.0";
const DEFAULT_IPV6_ADDR = "::";
/** Native stream handle a socket can wrap: either a TCP or a pipe handle. */
type Handle = TCP | Pipe;
/**
 * Handle-related socket construction options: an existing `handle` to adopt,
 * `pauseOnCreate` to stop reading right after construction, and `manualStart`
 * to skip the initial `read(0)` the constructor otherwise issues.
 */
interface HandleOptions { pauseOnCreate?: boolean; manualStart?: boolean; handle?: Handle;}
/**
 * User-supplied read buffer configuration: `buffer` is either a `Uint8Array`
 * or a function producing one, and `callback` is stored by the socket
 * alongside it.
 */
interface OnReadOptions { buffer: Uint8Array | (() => Uint8Array); callback(bytesWritten: number, buf: Uint8Array): boolean;}
/** Options shared by every connect flavour. */
interface ConnectOptions { onread?: OnReadOptions;}
/**
 * Options accepted by the `Socket` constructor. Passing `fd` is currently
 * rejected by the constructor as not implemented; `readable: false` prevents
 * the constructor from starting reads on an adopted handle.
 */
interface SocketOptions extends ConnectOptions, HandleOptions, DuplexOptions { fd?: number; allowHalfOpen?: boolean; readable?: boolean; writable?: boolean; signal?: AbortSignal;}
/** TCP options for the module-level `connect()`; a truthy `timeout` is applied via `setTimeout`. */
interface TcpNetConnectOptions extends TcpSocketConnectOptions, SocketOptions { timeout?: number;}
/** Pipe options for the module-level `connect()`; a truthy `timeout` is applied via `setTimeout`. */
interface IpcNetConnectOptions extends IpcSocketConnectOptions, SocketOptions { timeout?: number;}
/** Any options object accepted by the module-level `connect()`. */
type NetConnectOptions = TcpNetConnectOptions | IpcNetConnectOptions;
/** Address record filled in by a handle's `getsockname` / `getpeername`. */
interface AddressInfo { address: string; family?: string; port: number;}
/** Signature of a host resolver; invoked as `lookup(host, dnsOptions, callback)`. */
type LookupFunction = ( hostname: string, options: LookupOneOptions, callback: ( err: ErrnoException | null, address: string, family: number, ) => void,) => void;
/**
 * Options for `Socket.prototype.connect` over TCP. `host` falls back to
 * "localhost", `localAddress` must be an IP literal, and `lookup` replaces
 * the default DNS lookup when provided.
 */
interface TcpSocketConnectOptions extends ConnectOptions { port: number; host?: string; localAddress?: string; localPort?: number; hints?: number; family?: number; lookup?: LookupFunction;}
/** Options for `Socket.prototype.connect` over a pipe; `path` must be a string. */
interface IpcSocketConnectOptions extends ConnectOptions { path: string;}
/** Any options object accepted by `Socket.prototype.connect`. */
type SocketConnectOptions = TcpSocketConnectOptions | IpcSocketConnectOptions;
/**
 * Picks the async id for a handle: the handle's own id when it exposes a
 * `getAsyncId` function, otherwise a freshly allocated one.
 */
function _getNewAsyncId(handle?: Handle): number {
  if (handle && typeof handle.getAsyncId === "function") {
    return handle.getAsyncId();
  }

  return newAsyncId();
}
/**
 * Pair produced by `_normalizeArgs`: slot 0 holds the options object, slot 1
 * the trailing callback (or `null`). The `normalizedArgsSymbol` flag marks the
 * array as already normalized so `Socket.prototype.connect` can skip
 * re-normalizing it.
 */
interface NormalizedArgs { 0: Partial<NetConnectOptions | ListenOptions>; 1: ConnectionListener | null; [normalizedArgsSymbol]?: boolean;}
/** Read callback that ignores its input; installed on a handle once it has been closed. */
const _noop = (_arrayBuffer: Uint8Array, _nread: number): undefined =>
  undefined;
/**
 * Coerces `x` with `Number()` and returns the result when it is
 * non-negative; otherwise (negative or NaN) returns `false`.
 */
function _toNumber(x: unknown): number | false {
  const numeric = Number(x);

  if (numeric >= 0) {
    return numeric;
  }

  return false;
}
/**
 * A value is treated as a pipe name when it is a string that `_toNumber`
 * rejects, i.e. it cannot be read as a non-negative number.
 */
function _isPipeName(s: unknown): s is string {
  if (typeof s !== "string") {
    return false;
  }

  return _toNumber(s) === false;
}
/**
 * Creates a pipe or TCP handle matching the type guessed for `fd`.
 *
 * @param fd File descriptor; validated as an int32 with a minimum of 0.
 * @param isServer Selects the SERVER rather than SOCKET handle flavour.
 * @throws ERR_INVALID_FD_TYPE when the guessed type is neither "PIPE" nor "TCP".
 */
function _createHandle(fd: number, isServer: boolean): Handle {
  validateInt32(fd, "fd", 0);

  const handleType = guessHandleType(fd);

  switch (handleType) {
    case "PIPE":
      return new Pipe(
        isServer ? PipeConstants.SERVER : PipeConstants.SOCKET,
      );
    case "TCP":
      return new TCP(isServer ? TCPConstants.SERVER : TCPConstants.SOCKET);
    default:
      throw new ERR_INVALID_FD_TYPE(handleType);
  }
}
/**
 * Converts the variadic argument forms accepted by `connect()` / `listen()`
 * into an `[options, callback]` pair tagged with `normalizedArgsSymbol`.
 *
 * Recognized shapes:
 * - `(options[, cb])`: the object is used as the options as-is.
 * - `(path[, cb])`: a pipe-name string becomes `options.path`.
 * - `(port[, host][, cb])`: anything else becomes `options.port`, with a
 *   string second argument taken as `options.host`.
 *
 * The callback slot is the last argument when it is a function, else `null`.
 */
export function _normalizeArgs(args: unknown[]): NormalizedArgs {
  if (args.length === 0) {
    const empty: NormalizedArgs = [{}, null];
    empty[normalizedArgsSymbol] = true;

    return empty;
  }

  const first = args[0] as Partial<NetConnectOptions> | number | string;
  let options: Partial<SocketConnectOptions> = {};

  if (typeof first === "object" && first !== null) {
    options = first;
  } else if (_isPipeName(first)) {
    (options as IpcSocketConnectOptions).path = first;
  } else {
    (options as TcpSocketConnectOptions).port = first;

    const maybeHost = args[1];

    if (args.length > 1 && typeof maybeHost === "string") {
      (options as TcpSocketConnectOptions).host = maybeHost;
    }
  }

  const last = args[args.length - 1];
  const normalized: NormalizedArgs = [
    options,
    _isConnectionListener(last) ? last : null,
  ];
  normalized[normalizedArgsSymbol] = true;

  return normalized;
}
/**
 * Tells a TCP connect request from a pipe one by the presence of both the
 * `localAddress` and `localPort` properties.
 */
function _isTCPConnectWrap(
  req: TCPConnectWrap | PipeConnectWrap,
): req is TCPConnectWrap {
  return ["localAddress", "localPort"].every((key) => key in req);
}
/**
 * Completion callback installed on connect requests.
 *
 * Resolves the owning socket from the handle (unwrapping a "ReusedHandle"
 * owner), ignores already-destroyed sockets, and then either announces the
 * connection ("connect" then "ready") or destroys the socket with an error
 * describing the failed attempt.
 *
 * @param status 0 on success, otherwise the error code of the attempt.
 * @param handle Handle whose `ownerSymbol` slot references the socket.
 * @param req The connect request that completed.
 * @param readable Whether the connection can be read from.
 * @param writable Whether the connection can be written to.
 */
function _afterConnect(
  status: number,
  handle: any,
  req: PipeConnectWrap | TCPConnectWrap,
  readable: boolean,
  writable: boolean,
) {
  let socket = handle[ownerSymbol];

  if (socket.constructor.name === "ReusedHandle") {
    socket = socket.handle;
  }

  // The callback may fire after the socket has been torn down.
  if (socket.destroyed) {
    return;
  }

  debug("afterConnect");

  assert(socket.connecting);

  socket.connecting = false;
  socket._sockname = null;

  if (status !== 0) {
    socket.connecting = false;

    let details;

    if (_isTCPConnectWrap(req)) {
      details = req.localAddress + ":" + req.localPort;
    }

    const failure = exceptionWithHostPort(
      status,
      "connect",
      req.address,
      (req as TCPConnectWrap).port,
      details,
    );

    if (_isTCPConnectWrap(req)) {
      failure.localAddress = req.localAddress;
      failure.localPort = req.localPort;
    }

    socket.destroy(failure);

    return;
  }

  // Close whichever side of the stream the connection does not support.
  if (socket.readable && !readable) {
    socket.push(null);
    socket.read();
  }

  if (socket.writable && !writable) {
    socket.end();
  }

  socket._unrefTimer();

  socket.emit("connect");
  socket.emit("ready");

  // Kick off reading unless the user explicitly paused the stream.
  if (readable && !socket.isPaused()) {
    socket.read(0);
  }
}
/**
 * Double-checks a bind that reported success on an explicit port: asks the
 * handle for its socket name and reports EADDRINUSE when the bound port is
 * not the requested one.
 *
 * @param err Result of the preceding bind call.
 * @param port Requested local port.
 * @param handle Handle that was bound.
 * @returns `err` unchanged when no check applies, otherwise the
 *   `getsockname` status or the EADDRINUSE code.
 */
function _checkBindError(err: number, port: number, handle: TCP) {
  const needsCheck = err === 0 && port > 0 && handle.getsockname;

  if (!needsCheck) {
    return err;
  }

  const bound: AddressInfo | Record<string, never> = {};
  const status = handle.getsockname(bound);

  if (status === 0 && port !== bound.port) {
    return codeMap.get("EADDRINUSE")!;
  }

  return status;
}
/** True when the options carry a truthy `path`, i.e. describe a pipe connection. */
function _isPipe(
  options: Partial<SocketConnectOptions>,
): options is IpcSocketConnectOptions {
  if (!("path" in options)) {
    return false;
  }

  return Boolean(options.path);
}
/**
 * Destroys `socket` with `err`. Kept as a named function so it can be
 * scheduled through `nextTick` with its arguments.
 */
function _connectErrorNT(socket: Socket, err: Error): void {
  socket.destroy(err);
}
/**
 * Issues the connect request on the socket's handle.
 *
 * When a local address or port is requested the handle is bound first
 * (falling back to the wildcard address of the chosen family); a bind failure
 * destroys the socket. Address types 4 and 6 connect over TCP, anything else
 * is treated as a pipe path. A synchronous connect error destroys the socket.
 *
 * @param socket Socket that must currently be in the connecting state.
 * @param address Remote IP address, or the pipe path.
 * @param port Remote port (unused for pipes).
 * @param addressType 4 or 6 for TCP; other values select the pipe path.
 * @param localAddress Optional local address to bind to.
 * @param localPort Optional local port to bind to.
 * @param flags Flags forwarded to `bind6`.
 */
function _internalConnect(
  socket: Socket,
  address: string,
  port: number,
  addressType: number,
  localAddress: string,
  localPort: number,
  flags: number,
) {
  assert(socket.connecting);

  let err;

  if (localAddress || localPort) {
    const wantsIPv4 = addressType === 4;

    localAddress = localAddress ||
      (wantsIPv4 ? DEFAULT_IPV4_ADDR : DEFAULT_IPV6_ADDR);

    err = wantsIPv4
      ? (socket._handle as TCP).bind(localAddress, localPort)
      : (socket._handle as TCP).bind6(localAddress, localPort, flags);

    debug(
      "binding to localAddress: %s and localPort: %d (addressType: %d)",
      localAddress,
      localPort,
      addressType,
    );

    err = _checkBindError(err, localPort, socket._handle as TCP);

    if (err) {
      socket.destroy(
        exceptionWithHostPort(err, "bind", localAddress, localPort),
      );

      return;
    }
  }

  const overTcp = addressType === 6 || addressType === 4;

  if (overTcp) {
    const tcpReq = new TCPConnectWrap();
    tcpReq.oncomplete = _afterConnect;
    tcpReq.address = address;
    tcpReq.port = port;
    tcpReq.localAddress = localAddress;
    tcpReq.localPort = localPort;

    err = addressType === 4
      ? (socket._handle as TCP).connect(tcpReq, address, port)
      : (socket._handle as TCP).connect6(tcpReq, address, port);
  } else {
    const pipeReq = new PipeConnectWrap();
    pipeReq.oncomplete = _afterConnect;
    pipeReq.address = address;

    err = (socket._handle as Pipe).connect(pipeReq, address);
  }

  if (!err) {
    return;
  }

  // Include the local endpoint in the error when it is known.
  const sockname = socket._getsockname();
  const details = sockname ? `${sockname.address}:${sockname.port}` : "";

  socket.destroy(
    exceptionWithHostPort(err, "connect", address, port, details),
  );
}
/**
 * Replacement for `write()` installed after the readable side ended on a
 * socket that does not allow half-open connections.
 *
 * While the writable side has not ended it defers to `Duplex.prototype.write`.
 * Afterwards it reports an EPIPE error to the callback (on the next tick) and
 * destroys the socket: deferred to the next tick for server-side sockets,
 * immediately otherwise.
 *
 * @returns The result of the regular write, or `false` once writing is refused.
 */
function _writeAfterFIN(
  this: Socket,
  chunk: any,
  encoding?:
    | BufferEncoding
    | null
    | ((error: Error | null | undefined) => void),
  cb?: (error: Error | null | undefined) => void,
): boolean {
  if (!this.writableEnded) {
    return Duplex.prototype.write.call(
      this,
      chunk,
      encoding as BufferEncoding | null,
      cb,
    );
  }

  // Support the `(chunk, cb)` call shape.
  const done = typeof encoding === "function" ? encoding : cb;

  const err = genericNodeError(
    "This socket has been ended by the other party",
    { code: "EPIPE" },
  );

  if (typeof done === "function") {
    defaultTriggerAsyncIdScope(this[asyncIdSymbol], nextTick, done, err);
  }

  if (this._server) {
    nextTick(() => this.destroy(err));
  } else {
    this.destroy(err);
  }

  return false;
}
/**
 * Marks the socket's handle as reading and starts reading on it; a non-zero
 * result from `readStart` destroys the socket with a "read" errno exception.
 */
function _tryReadStart(socket: Socket): void {
  debug("Socket._handle.readStart");

  const handle = socket._handle!;
  handle.reading = true;

  const status = handle.readStart();

  if (status) {
    socket.destroy(errnoException(status, "read"));
  }
}
/**
 * "end" listener: unless half-open connections are allowed, swaps the
 * socket's `write` for `_writeAfterFIN`.
 */
function _onReadableStreamEnd(this: Socket): void {
  if (this.allowHalfOpen) {
    return;
  }

  this.write = _writeAfterFIN;
}
/**
 * (Re)initializes a socket around its current handle.
 *
 * Always un-destroys the socket and clears the cached socket name. When a
 * handle is present it is linked back to the socket, given the stream read
 * callback, and assigned an async id. If a user read buffer is configured,
 * the buffer (produced by the generator when there is one) is handed to the
 * handle; a generator returning something other than a `Uint8Array` aborts
 * that last step.
 */
function _initSocketHandle(socket: Socket): void {
  socket._undestroy();
  socket._sockname = undefined;

  const handle = socket._handle;

  if (!handle) {
    return;
  }

  (handle as any)[ownerSymbol] = socket;
  handle.onread = onStreamRead;
  socket[asyncIdSymbol] = _getNewAsyncId(handle);

  let userBuf = socket[kBuffer];

  if (!userBuf) {
    return;
  }

  const makeBuffer = socket[kBufferGen];

  if (makeBuffer !== null) {
    userBuf = makeBuffer();

    if (!isUint8Array(userBuf)) {
      return;
    }

    socket[kBuffer] = userBuf;
  }

  handle.useUserBuffer(userBuf);
}
/**
 * Validates TCP connect options, resolves the host when necessary, and then
 * hands over to `_internalConnect`.
 *
 * An IP-literal host skips DNS and connects on the next tick (provided the
 * socket is still connecting). Otherwise the configured or default lookup is
 * invoked; its outcome is emitted as a "lookup" event and any failure
 * (lookup error, non-IP result, unknown family) destroys the socket on the
 * next tick.
 *
 * @throws ERR_INVALID_IP_ADDRESS when `localAddress` is not an IP literal.
 * @throws ERR_INVALID_ARG_TYPE when `port` is neither a number nor a string.
 */
function _lookupAndConnect(
  self: Socket,
  options: TcpSocketConnectOptions,
): void {
  const { localAddress, localPort, port: requestedPort } = options;
  const host = options.host || "localhost";

  if (localAddress && !isIP(localAddress)) {
    throw new ERR_INVALID_IP_ADDRESS(localAddress);
  }

  if (localPort) {
    validateNumber(localPort, "options.localPort");
  }

  if (typeof requestedPort !== "undefined") {
    if (
      typeof requestedPort !== "number" && typeof requestedPort !== "string"
    ) {
      throw new ERR_INVALID_ARG_TYPE(
        "options.port",
        ["number", "string"],
        requestedPort,
      );
    }

    validatePort(requestedPort);
  }

  // Coerce to an integer; an absent port becomes 0.
  const port = requestedPort | 0;

  const hostFamily = isIP(host);

  if (hostFamily) {
    // Host is already an IP literal: no lookup needed.
    defaultTriggerAsyncIdScope(self[asyncIdSymbol], nextTick, () => {
      if (!self.connecting) {
        return;
      }

      defaultTriggerAsyncIdScope(
        self[asyncIdSymbol],
        _internalConnect,
        self,
        host,
        port,
        hostFamily,
        localAddress,
        localPort,
      );
    });

    return;
  }

  if (options.lookup !== undefined) {
    validateFunction(options.lookup, "options.lookup");
  }

  const requestedHints = options.hints || 0;
  const familyUnspecified = options.family !== 4 && options.family !== 6;
  const dnsOpts = {
    family: options.family,
    // Off Windows, default to ADDRCONFIG when neither family nor hints were given.
    hints: !isWindows && familyUnspecified && requestedHints === 0
      ? ADDRCONFIG
      : requestedHints,
  };

  debug("connect: find host", host);
  debug("connect: dns options", dnsOpts);
  self._host = host;

  const lookup = options.lookup || dnsLookup;

  defaultTriggerAsyncIdScope(self[asyncIdSymbol], function () {
    lookup(
      host,
      dnsOpts,
      function emitLookup(
        err: ErrnoException | null,
        ip: string,
        addressType: number,
      ) {
        self.emit("lookup", err, ip, addressType, host);

        // The socket may have been destroyed while the lookup was in flight.
        if (!self.connecting) {
          return;
        }

        if (err) {
          nextTick(_connectErrorNT, self, err);

          return;
        }

        if (!isIP(ip)) {
          nextTick(_connectErrorNT, self, new ERR_INVALID_IP_ADDRESS(ip));

          return;
        }

        if (addressType !== 4 && addressType !== 6) {
          nextTick(
            _connectErrorNT,
            self,
            new ERR_INVALID_ADDRESS_FAMILY(
              `${addressType}`,
              options.host!,
              options.port,
            ),
          );

          return;
        }

        self._unrefTimer();

        defaultTriggerAsyncIdScope(
          self[asyncIdSymbol],
          _internalConnect,
          self,
          ip,
          port,
          addressType,
          localAddress,
          localPort,
        );
      },
    );
  });
}
/**
 * Completion callback for shutdown requests: logs the owning socket's state
 * and invokes the callback stored on the request.
 */
function _afterShutdown(this: ShutdownWrap<TCP>) {
  const owner: any = this.handle[ownerSymbol];

  debug(
    "afterShutdown destroyed=%j",
    owner.destroyed,
    owner._readableState,
  );

  this.callback();
}
/** Emits "close" on the given socket or server; meant to be scheduled via `nextTick`. */
function _emitCloseNT(s: Socket | Server): void {
  debug("SERVER: emit close");
  s.emit("close");
}
/**
 * Duplex stream backed by a TCP or pipe handle.
 *
 * A socket is either created around an existing handle (`options.handle`) or
 * starts without one and obtains it in `connect()`.
 */
export class Socket extends Duplex {
  [asyncIdSymbol] = -1;

  [kHandle]: Handle | null = null;
  [kSetNoDelay] = false;
  [kLastWriteQueueSize] = 0;
  [kTimeout]: any = null;
  // `true` is a placeholder meaning "a buffer generator is configured".
  [kBuffer]: Uint8Array | boolean | null = null;
  [kBufferCb]: OnReadOptions["callback"] | null = null;
  [kBufferGen]: (() => Uint8Array) | null = null;

  // Snapshots of the handle's counters, taken when the socket is destroyed.
  [kBytesRead] = 0;
  [kBytesWritten] = 0;

  server = null;
  _server: any = null;

  _peername?: AddressInfo | Record<string, never>;
  _sockname?: AddressInfo | Record<string, never>;
  _pendingData: Uint8Array | string | null = null;
  _pendingEncoding = "";
  _host: string | null = null;
  _parent: any = null;

  /**
   * @param options Socket options, or a bare number which is treated as
   *   `{ fd: number }`. The options object is shallow-copied before the
   *   stream defaults below are applied. Passing `fd` is reported as not
   *   implemented.
   */
  constructor(options: SocketOptions | number) {
    if (typeof options === "number") {
      options = { fd: options };
    } else {
      options = { ...options };
    }

    options.allowHalfOpen = Boolean(options.allowHalfOpen);
    options.emitClose = false;
    options.autoDestroy = true;
    options.decodeStrings = false;

    super(options);

    if (options.handle) {
      this._handle = options.handle;
      this[asyncIdSymbol] = _getNewAsyncId(this._handle);
    } else if (options.fd !== undefined) {
      notImplemented("net.Socket.prototype.constructor with fd option");
    }

    const onread = options.onread;

    if (
      onread !== null &&
      typeof onread === "object" &&
      (isUint8Array(onread.buffer) || typeof onread.buffer === "function") &&
      typeof onread.callback === "function"
    ) {
      if (typeof onread.buffer === "function") {
        this[kBuffer] = true;
        this[kBufferGen] = onread.buffer;
      } else {
        this[kBuffer] = onread.buffer;
      }

      this[kBufferCb] = onread.callback;
    }

    this.on("end", _onReadableStreamEnd);

    _initSocketHandle(this);

    // With a handle already in place, either start the data flow now or,
    // for `pauseOnCreate`, make sure the handle is not reading.
    if (this._handle && options.readable !== false) {
      if (options.pauseOnCreate) {
        this._handle.reading = false;
        this._handle.readStop();
        this.readableFlowing = false;
      } else if (!options.manualStart) {
        this.read(0);
      }
    }
  }

  /**
   * Starts a connection on this socket.
   *
   * Accepts an options object, `(port[, host])`, a pipe path, or an already
   * normalized argument array; a trailing function is registered as a
   * one-time "connect" listener.
   *
   * @throws ERR_MISSING_ARGS when neither `port` nor `path` is provided.
   */
  connect(
    options: SocketConnectOptions | NormalizedArgs,
    connectionListener?: ConnectionListener,
  ): this;
  connect(
    port: number,
    host: string,
    connectionListener?: ConnectionListener,
  ): this;
  connect(port: number, connectionListener?: ConnectionListener): this;
  connect(path: string, connectionListener?: ConnectionListener): this;
  connect(...args: unknown[]): this {
    let normalized: NormalizedArgs;

    if (
      Array.isArray(args[0]) &&
      (args[0] as unknown as NormalizedArgs)[normalizedArgsSymbol]
    ) {
      normalized = args[0] as unknown as NormalizedArgs;
    } else {
      normalized = _normalizeArgs(args);
    }

    const options = normalized[0];
    const cb = normalized[1];

    if (
      (options as TcpSocketConnectOptions).port === undefined &&
      (options as IpcSocketConnectOptions).path == null
    ) {
      throw new ERR_MISSING_ARGS(["options", "port", "path"]);
    }

    // Undo the `_writeAfterFIN` override a previous connection may have left.
    if (this.write !== Socket.prototype.write) {
      this.write = Socket.prototype.write;
    }

    // Reusing a destroyed socket: forget the old handle and cached names.
    if (this.destroyed) {
      this._handle = null;
      this._peername = undefined;
      this._sockname = undefined;
    }

    const { path } = options as IpcNetConnectOptions;
    const pipe = _isPipe(options);
    debug("pipe", pipe, path);

    if (!this._handle) {
      this._handle = pipe
        ? new Pipe(PipeConstants.SOCKET)
        : new TCP(TCPConstants.SOCKET);

      _initSocketHandle(this);
    }

    if (cb !== null) {
      this.once("connect", cb);
    }

    this._unrefTimer();

    this.connecting = true;

    if (pipe) {
      validateString(path, "options.path");
      defaultTriggerAsyncIdScope(
        this[asyncIdSymbol],
        _internalConnect,
        this,
        path,
      );
    } else {
      _lookupAndConnect(this, options as TcpSocketConnectOptions);
    }

    return this;
  }

  /**
   * Pauses the stream. When a user read buffer is configured and the handle
   * is currently reading, reading on the handle is stopped first.
   */
  override pause(): this {
    if (
      this[kBuffer] &&
      !this.connecting &&
      this._handle &&
      this._handle.reading
    ) {
      this._handle.reading = false;

      if (!this.destroyed) {
        const err = this._handle.readStop();

        if (err) {
          this.destroy(errnoException(err, "read"));
        }
      }
    }

    return Duplex.prototype.pause.call(this) as unknown as this;
  }

  /**
   * Resumes the stream. When a user read buffer is configured and the handle
   * is not reading, reading on the handle is started first.
   */
  override resume(): this {
    if (
      this[kBuffer] &&
      !this.connecting &&
      this._handle &&
      !this._handle.reading
    ) {
      _tryReadStart(this);
    }

    return Duplex.prototype.resume.call(this) as this;
  }

  setTimeout = setStreamTimeout;

  /**
   * Enables or disables the no-delay option on the handle (`undefined` means
   * enable). Without a handle the call is replayed on "connect". The handle
   * is only touched when the value actually changes.
   */
  setNoDelay(noDelay?: boolean): this {
    if (!this._handle) {
      this.once(
        "connect",
        noDelay ? this.setNoDelay : () => this.setNoDelay(noDelay),
      );

      return this;
    }

    const newValue = noDelay === undefined ? true : !!noDelay;

    if (
      "setNoDelay" in this._handle &&
      this._handle.setNoDelay &&
      newValue !== this[kSetNoDelay]
    ) {
      this[kSetNoDelay] = newValue;
      this._handle.setNoDelay(newValue);
    }

    return this;
  }

  /**
   * Forwards the keep-alive setting to the handle when it supports it.
   * `initialDelay` is divided by 1000 and truncated before being passed on.
   * Without a handle the call is replayed on "connect".
   */
  setKeepAlive(enable: boolean, initialDelay?: number): this {
    if (!this._handle) {
      this.once("connect", () => this.setKeepAlive(enable, initialDelay));

      return this;
    }

    if ("setKeepAlive" in this._handle) {
      this._handle.setKeepAlive(enable, ~~(initialDelay! / 1000));
    }

    return this;
  }

  /** Returns the local address record (empty object when unavailable). */
  address(): AddressInfo | Record<string, never> {
    return this._getsockname();
  }

  /** Calls `unref()` on the handle, or defers until "connect" when there is none. */
  unref(): this {
    if (!this._handle) {
      this.once("connect", this.unref);

      return this;
    }

    if (typeof this._handle.unref === "function") {
      this._handle.unref();
    }

    return this;
  }

  /** Calls `ref()` on the handle, or defers until "connect" when there is none. */
  ref(): this {
    if (!this._handle) {
      this.once("connect", this.ref);

      return this;
    }

    if (typeof this._handle.ref === "function") {
      this._handle.ref();
    }

    return this;
  }

  /** `writableLength` while a handle exists, otherwise 0. */
  get bufferSize(): number {
    if (this._handle) {
      return this.writableLength;
    }

    return 0;
  }

  /** Bytes read: live from the handle, or the snapshot taken at destroy time. */
  get bytesRead(): number {
    return this._handle ? this._handle.bytesRead : this[kBytesRead];
  }

  /**
   * Bytes written: bytes already dispatched plus everything still queued in
   * the writable buffer and any data held back while connecting. Returns
   * `undefined` when the writable buffer is not available.
   */
  get bytesWritten(): number | undefined {
    let bytes = this._bytesDispatched;
    const data = this._pendingData;
    const encoding = this._pendingEncoding;
    const writableBuffer = this.writableBuffer;

    if (!writableBuffer) {
      return undefined;
    }

    for (const el of writableBuffer) {
      bytes += el!.chunk instanceof Buffer
        ? el!.chunk.length
        : Buffer.byteLength(el!.chunk, el!.encoding);
    }

    if (Array.isArray(data)) {
      // Pending data from a writev: sum the individual chunks.
      for (let i = 0; i < data.length; i++) {
        const chunk = data[i];

        if ((data as any).allBuffers || chunk instanceof Buffer) {
          bytes += chunk.length;
        } else {
          bytes += Buffer.byteLength(chunk.chunk, chunk.encoding);
        }
      }
    } else if (data) {
      if (typeof data !== "string") {
        bytes += (data as Buffer).length;
      } else {
        bytes += Buffer.byteLength(data, encoding);
      }
    }

    return bytes;
  }

  connecting = false;

  /** Local address taken from the socket name record. */
  get localAddress(): string {
    return this._getsockname().address;
  }

  /** Local port taken from the socket name record. */
  get localPort(): number {
    return this._getsockname().port;
  }

  /** Remote address taken from the peer name record, if known. */
  get remoteAddress(): string | undefined {
    return this._getpeername().address;
  }

  /**
   * Remote address family as `IPv<family>`, or `undefined` when the peer
   * name record carries no family (for example when there is no handle).
   */
  get remoteFamily(): string | undefined {
    const { family } = this._getpeername();

    // Avoid producing the bogus string "IPvundefined" for an unknown family.
    return family ? `IPv${family}` : family;
  }

  /** Remote port taken from the peer name record, if known. */
  get remotePort(): number | undefined {
    return this._getpeername().port;
  }

  /** True while there is no handle yet or a connection attempt is in progress. */
  get pending(): boolean {
    return !this._handle || this.connecting;
  }

  /** One of "opening", "open", "readOnly", "writeOnly" or "closed". */
  get readyState(): string {
    if (this.connecting) {
      return "opening";
    } else if (this.readable && this.writable) {
      return "open";
    } else if (this.readable && !this.writable) {
      return "readOnly";
    } else if (!this.readable && this.writable) {
      return "writeOnly";
    }
    return "closed";
  }

  /** Ends the writable side via `Duplex.prototype.end` and fires the stream-end trace hook. */
  override end(cb?: () => void): this;
  override end(buffer: Uint8Array | string, cb?: () => void): this;
  override end(
    data: Uint8Array | string,
    encoding?: Encodings,
    cb?: () => void,
  ): this;
  override end(
    data?: Uint8Array | string | (() => void),
    encoding?: Encodings | (() => void),
    cb?: () => void,
  ): this {
    Duplex.prototype.end.call(this, data, encoding as Encodings, cb);
    DTRACE_NET_STREAM_END(this);

    return this;
  }

  /**
   * Reads from the stream. When a user read buffer is configured and the
   * handle is not reading, reading on the handle is started first.
   */
  override read(
    size?: number,
  ): string | Uint8Array | Buffer | null | undefined {
    if (
      this[kBuffer] &&
      !this.connecting &&
      this._handle &&
      !this._handle.reading
    ) {
      _tryReadStart(this);
    }

    return Duplex.prototype.read.call(this, size);
  }

  /** Ends the writable side if still open, then destroys once writing has finished. */
  destroySoon(): void {
    if (this.writable) {
      this.end();
    }

    if (this.writableFinished) {
      this.destroy();
    } else {
      this.once("finish", this.destroy);
    }
  }

  /** Refreshes the timeout timer of this socket and of every `_parent` above it. */
  _unrefTimer() {
    for (let s = this; s !== null; s = s._parent) {
      if (s[kTimeout]) {
        s[kTimeout].refresh();
      }
    }
  }

  /**
   * Stream `_final` hook. Waits for "connect" while the socket is pending,
   * completes immediately without a handle, and otherwise issues a shutdown
   * request. A result of 1 or ENOTCONN completes synchronously; any other
   * non-zero result is reported as a "shutdown" error; on 0 the callback is
   * invoked from `_afterShutdown`.
   */
  override _final(cb: any): any {
    if (this.pending) {
      debug("_final: not yet connected");
      return this.once("connect", () => this._final(cb));
    }

    if (!this._handle) {
      return cb();
    }

    debug("_final: not ended, call shutdown()");

    const req = new ShutdownWrap<Handle>();
    req.oncomplete = _afterShutdown;
    req.handle = this._handle;
    req.callback = cb;
    const err = this._handle.shutdown(req);

    if (err === 1 || err === codeMap.get("ENOTCONN")) {
      return cb();
    } else if (err !== 0) {
      return cb(errnoException(err, "shutdown"));
    }
  }

  /**
   * Timer callback. If the handle's write queue size changed since the last
   * recorded value, the timer is refreshed instead of emitting "timeout".
   */
  _onTimeout() {
    const handle = this._handle;
    const lastWriteQueueSize = this[kLastWriteQueueSize];

    if (lastWriteQueueSize > 0 && handle) {
      const { writeQueueSize } = handle;

      if (lastWriteQueueSize !== writeQueueSize) {
        this[kLastWriteQueueSize] = writeQueueSize;
        this._unrefTimer();

        return;
      }
    }

    debug("_onTimeout");
    this.emit("timeout");
  }

  /** Stream `_read` hook: waits for "connect" if needed, otherwise starts reading on the handle. */
  override _read(size?: number): void {
    debug("_read");
    if (this.connecting || !this._handle) {
      debug("_read wait for connection");
      this.once("connect", () => this._read(size));
    } else if (!this._handle.reading) {
      _tryReadStart(this);
    }
  }

  /**
   * Stream `_destroy` hook. Clears timers up the `_parent` chain, snapshots
   * the byte counters, closes the handle (emitting "close" from its close
   * callback) or emits "close" on the next tick when there is no handle, and
   * updates the owning server's connection count.
   */
  override _destroy(exception: Error | null, cb: (err: Error | null) => void) {
    debug("destroy");
    this.connecting = false;

    for (let s = this; s !== null; s = s._parent) {
      clearTimeout(s[kTimeout]);
    }

    debug("close");
    if (this._handle) {
      debug("close handle");
      const isException = exception ? true : false;
      // Keep the counters readable after the handle is gone.
      this[kBytesRead] = this._handle.bytesRead;
      this[kBytesWritten] = this._handle.bytesWritten;

      this._handle.close(() => {
        this._handle!.onread = _noop;
        this._handle = null;
        this._sockname = undefined;

        cb(exception);

        debug("emit close");
        this.emit("close", isException);
      });
    } else {
      cb(exception);
      nextTick(_emitCloseNT, this);
    }

    if (this._server) {
      debug("has server");
      this._server._connections--;

      if (this._server._emitCloseIfDrained) {
        this._server._emitCloseIfDrained();
      }
    }
  }

  /**
   * Returns the peer name record, querying the handle once and caching the
   * result. Without a usable handle the cached record (or `{}`) is returned.
   */
  _getpeername(): AddressInfo | Record<string, never> {
    if (!this._handle || !("getpeername" in this._handle)) {
      return this._peername || {};
    } else if (!this._peername) {
      this._peername = {};
      this._handle.getpeername(this._peername);
    }

    return this._peername;
  }

  /**
   * Returns the socket name record, querying the handle once and caching the
   * result. Without a usable handle an empty object is returned.
   */
  _getsockname(): AddressInfo | Record<string, never> {
    if (!this._handle || !("getsockname" in this._handle)) {
      return {};
    } else if (!this._sockname) {
      this._sockname = {};
      this._handle.getsockname(this._sockname);
    }

    return this._sockname;
  }

  /**
   * Shared implementation of `_write` / `_writev`. While connecting, the data
   * is remembered and the write is replayed on "connect". Without a handle
   * the callback receives ERR_SOCKET_CLOSED.
   */
  _writeGeneric(
    writev: boolean,
    data: any,
    encoding: string,
    cb: (error?: Error | null) => void,
  ) {
    if (this.connecting) {
      this._pendingData = data;
      this._pendingEncoding = encoding;
      this.once("connect", function connect(this: Socket) {
        this._writeGeneric(writev, data, encoding, cb);
      });

      return;
    }

    this._pendingData = null;
    this._pendingEncoding = "";

    if (!this._handle) {
      cb(new ERR_SOCKET_CLOSED());

      return false;
    }

    this._unrefTimer();

    let req;

    if (writev) {
      req = writevGeneric(this, data, cb);
    } else {
      req = writeGeneric(this, data, encoding, cb);
    }
    if (req.async) {
      this[kLastWriteQueueSize] = req.bytes;
    }
  }

  /** Stream `_writev` hook; delegates to `_writeGeneric`. */
  _writev(
    chunks: Array<{ chunk: any; encoding: string }>,
    cb: (error?: Error | null) => void,
  ) {
    this._writeGeneric(true, chunks, "", cb);
  }

  /** Stream `_write` hook; delegates to `_writeGeneric`. */
  override _write(
    data: any,
    encoding: string,
    cb: (error?: Error | null) => void,
  ) {
    this._writeGeneric(false, data, encoding, cb);
  }

  /** Resets the recorded write queue size. */
  [kAfterAsyncWrite](): void {
    this[kLastWriteQueueSize] = 0;
  }

  get [kUpdateTimer]() {
    return this._unrefTimer;
  }

  /** Alias of `connecting`. */
  get _connecting(): boolean {
    return this.connecting;
  }

  /** Bytes handed to the handle: live value, or the snapshot taken at destroy time. */
  get _bytesDispatched(): number {
    return this._handle ? this._handle.bytesWritten : this[kBytesWritten];
  }

  get _handle(): Handle | null {
    return this[kHandle];
  }

  set _handle(v: Handle | null) {
    this[kHandle] = v;
  }
}
/** Alias of `Socket`, exported under the name `Stream`. */
export const Stream = Socket;
/**
 * Creates a new `Socket` from the given arguments and immediately starts
 * connecting it.
 *
 * Accepts an options object, `(port[, host])` or a pipe path, each optionally
 * followed by a listener. A truthy `timeout` option is applied to the socket
 * before connecting.
 *
 * @returns The socket on which `connect()` was invoked.
 */
export function connect(
  options: NetConnectOptions,
  connectionListener?: () => void,
): Socket;
export function connect(
  port: number,
  host?: string,
  connectionListener?: () => void,
): Socket;
export function connect(path: string, connectionListener?: () => void): Socket;
export function connect(...args: unknown[]) {
  const normalized = _normalizeArgs(args);
  const options = normalized[0] as Partial<NetConnectOptions>;
  debug("createConnection", normalized);

  const socket = new Socket(options);
  const { timeout } = options;

  if (timeout) {
    socket.setTimeout(timeout);
  }

  return socket.connect(normalized);
}
/** Alias of `connect`. */
export const createConnection = connect;
/**
 * Options accepted by `Server.prototype.listen`. Extends `Abortable`; when
 * its `signal` is set, aborting it closes the server.
 */
export interface ListenOptions extends Abortable { fd?: number; port?: number | undefined; host?: string | undefined; backlog?: number | undefined; path?: string | undefined; exclusive?: boolean | undefined; readableAll?: boolean | undefined; writableAll?: boolean | undefined; ipv6Only?: boolean | undefined;}
/** Callback invoked with the socket of a new connection. */
type ConnectionListener = (socket: Socket) => void;
/** Options accepted by the `Server` constructor. */
interface ServerOptions { allowHalfOpen?: boolean | undefined; pauseOnConnect?: boolean | undefined;}
/** Accepts `null`, `undefined`, or any value whose `typeof` is "object" as server options. */
function _isServerSocketOptions(
  options: unknown,
): options is null | undefined | ServerOptions {
  if (options === null || options === undefined) {
    return true;
  }

  return typeof options === "object";
}
/** Type guard: any function value counts as a connection listener. */
function _isConnectionListener(
  connectionListener: unknown,
): connectionListener is ConnectionListener {
  const isCallable = typeof connectionListener === "function";

  return isCallable;
}
/** Maps `ipv6Only === true` to the `UV_TCP_IPV6ONLY` flag; anything else yields 0. */
function _getFlags(ipv6Only?: boolean): number {
  if (ipv6Only === true) {
    return TCPConstants.UV_TCP_IPV6ONLY;
  }

  return 0;
}
/**
 * Starts listening on behalf of `server` by calling `server._listen2`.
 *
 * `isPrimary` is hard-wired to `true` here, so the call is always made in
 * the current process regardless of `exclusive`.
 */
function _listenInCluster(
  server: Server,
  address: string | null,
  port: number | null,
  addressType: number | null,
  backlog: number,
  fd?: number | null,
  exclusive?: boolean,
  flags?: number,
) {
  const isPrimary = true;
  const listenLocally = isPrimary || Boolean(exclusive);

  if (!listenLocally) {
    return;
  }

  server._listen2(address, port, addressType, backlog, fd, flags);
}
/**
 * Resolves `address` through DNS and then starts listening on the result.
 * A lookup failure is emitted as an "error" event on the server. When the
 * lookup yields no IP, address type 4 is assumed.
 */
function _lookupAndListen(
  server: Server,
  port: number,
  address: string,
  backlog: number,
  exclusive: boolean,
  flags: number,
) {
  dnsLookup(address, function doListen(err, ip, addressType) {
    if (err) {
      server.emit("error", err);

      return;
    }

    const resolvedType = ip ? addressType : 4;

    _listenInCluster(
      server,
      ip,
      port,
      resolvedType,
      backlog,
      null,
      exclusive,
      flags,
    );
  });
}
/**
 * Wires `options.signal` to the server: aborting the signal closes the
 * server. An already-aborted signal schedules the close on the next tick;
 * otherwise the abort listener is removed again once the server closes.
 * Does nothing when no signal is given.
 */
function _addAbortSignalOption(server: Server, options: ListenOptions) {
  if (options?.signal === undefined) {
    return;
  }

  validateAbortSignal(options.signal, "options.signal");

  const abortSignal = options.signal;
  const closeServer = () => {
    server.close();
  };

  if (abortSignal.aborted) {
    nextTick(closeServer);

    return;
  }

  abortSignal.addEventListener("abort", closeServer);
  server.once(
    "close",
    () => abortSignal.removeEventListener("abort", closeServer),
  );
}
/**
 * Creates (and, where applicable, binds) the handle a server listens on.
 *
 * - A non-negative numeric `fd` yields a handle opened on that descriptor.
 * - `port === -1 && addressType === -1` yields a pipe server handle.
 * - Anything else yields a TCP server handle.
 *
 * When no address is given for a handle that needs binding, the function
 * retries with the IPv4 wildcard address.
 *
 * @returns The ready handle, or a numeric error code on failure.
 */
export function _createServerHandle(
  address: string | null,
  port: number | null,
  addressType: number | null,
  fd?: number | null,
  flags?: number,
): Handle | number {
  let handle;
  let isTCP = false;

  if (typeof fd === "number" && fd >= 0) {
    try {
      handle = _createHandle(fd, true);
    } catch (e) {
      debug("listen invalid fd=%d:", fd, (e as Error).message);

      return codeMap.get("EINVAL")!;
    }

    const openErr = handle.open(fd);

    if (openErr) {
      return openErr;
    }

    assert(!address && !port);
  } else if (port === -1 && addressType === -1) {
    handle = new Pipe(PipeConstants.SERVER);

    if (isWindows) {
      const instances = Number.parseInt(
        Deno.env.get("NODE_PENDING_PIPE_INSTANCES") ?? "",
      );

      if (!Number.isNaN(instances)) {
        handle.setPendingInstances!(instances);
      }
    }
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  const needsBind = address || port || isTCP;

  if (!needsBind) {
    return handle;
  }

  debug("bind to", address || "any");

  if (!address) {
    return _createServerHandle(DEFAULT_IPV4_ADDR, port, 4, null, flags);
  }

  const bindErr = addressType === 6
    ? (handle as TCP).bind6(address, port ?? 0, flags ?? 0)
    : (handle as TCP).bind(address, port ?? 0);

  if (bindErr) {
    handle.close();

    return bindErr;
  }

  return handle;
}
/** Emits `err` as an "error" event on `server`; meant to be scheduled via `nextTick`. */
function _emitErrorNT(server: Server, err: Error): void {
  server.emit("error", err);
}
/** Emits "listening" on `server`, but only if it still has a handle. */
function _emitListeningNT(server: Server): void {
  if (!server._handle) {
    return;
  }

  server.emit("listening");
}
/**
 * Connection callback installed on a listening handle (`this` is the handle).
 *
 * Accept errors are emitted on the owning server. A connection arriving
 * while the server is at `maxConnections` has its handle closed. Otherwise
 * the client handle is wrapped in a `Socket`, counted, linked to the server
 * and announced through a "connection" event.
 */
function _onconnection(this: any, err: number, clientHandle?: Handle) {
  const server = this[ownerSymbol];

  debug("onconnection");

  if (err) {
    server.emit("error", errnoException(err, "accept"));

    return;
  }

  const atCapacity = server.maxConnections &&
    server._connections >= server.maxConnections;

  if (atCapacity) {
    clientHandle!.close();

    return;
  }

  const socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: server.allowHalfOpen,
    pauseOnCreate: server.pauseOnConnect,
    readable: true,
    writable: true,
  });

  server._connections++;
  socket.server = server;
  socket._server = server;

  DTRACE_NET_SERVER_CONNECTION(socket);
  server.emit("connection", socket);
}
/**
 * Makes sure the server has a handle and puts it into listening mode.
 *
 * A missing handle is created through `_createServerHandle`, defaulting to
 * the IPv4 wildcard address when neither an address nor a numeric fd is
 * given. Creation and listen failures are emitted as "error" on a later
 * tick; success schedules the "listening" event. The backlog defaults to
 * 511 when falsy.
 */
function _setupListenHandle(
  this: Server,
  address: string | null,
  port: number | null,
  addressType: number | null,
  backlog: number,
  fd?: number | null,
  flags?: number,
): void {
  debug("setupListenHandle", address, port, addressType, backlog, fd);

  if (this._handle) {
    debug("setupListenHandle: have a handle already");
  } else {
    debug("setupListenHandle: create a handle");

    if (!address && typeof fd !== "number") {
      address = DEFAULT_IPV4_ADDR;
      addressType = 4;
    }

    const created = _createServerHandle(
      address,
      port,
      addressType,
      fd,
      flags,
    );

    if (typeof created === "number") {
      const error = uvExceptionWithHostPort(created, "listen", address, port);
      nextTick(_emitErrorNT, this, error);

      return;
    }

    this._handle = created;
  }

  this[asyncIdSymbol] = _getNewAsyncId(this._handle);
  this._handle.onconnection = _onconnection;
  this._handle[ownerSymbol] = this;

  const listenErr = this._handle.listen(backlog || 511);

  if (listenErr) {
    const ex = uvExceptionWithHostPort(listenErr, "listen", address, port);
    this._handle.close();
    this._handle = null;

    defaultTriggerAsyncIdScope(
      this[asyncIdSymbol],
      nextTick,
      _emitErrorNT,
      this,
      ex,
    );

    return;
  }

  this._connectionKey = addressType + ":" + address + ":" + port;

  // Honour an `unref()` that was requested before listening started.
  if (this._unref) {
    this.unref();
  }

  defaultTriggerAsyncIdScope(
    this[asyncIdSymbol],
    nextTick,
    _emitListeningNT,
    this,
  );
}
export class Server extends EventEmitter { [asyncIdSymbol] = -1;
allowHalfOpen = false; pauseOnConnect = false;
_handle: any = null; _connections = 0; _usingWorkers = false; _workers: any[] = []; _unref = false; _pipeName?: string; _connectionKey?: string;
constructor(connectionListener?: ConnectionListener); constructor(options?: ServerOptions, connectionListener?: ConnectionListener); constructor( options?: ServerOptions | ConnectionListener, connectionListener?: ConnectionListener, ) { super();
if (_isConnectionListener(options)) { this.on("connection", options); } else if (_isServerSocketOptions(options)) { this.allowHalfOpen = options?.allowHalfOpen || false; this.pauseOnConnect = !!options?.pauseOnConnect;
if (_isConnectionListener(connectionListener)) { this.on("connection", connectionListener); } } else { throw new ERR_INVALID_ARG_TYPE("options", "Object", options); } }
/**
 * Starts the server listening. The overloads accept a port (optionally with
 * hostname and backlog), a pipe path, a `ListenOptions` object, or an
 * existing handle, each optionally followed by a listener for the
 * "listening" event.
 *
 * Returns `this` on every non-throwing path.
 *
 * Throws `ERR_SERVER_ALREADY_LISTEN` when a handle is already set,
 * `ERR_INVALID_ARG_VALUE` when the options carry neither a usable port nor a
 * usable path, and an errno exception when changing the pipe mode fails.
 */
listen( port?: number, hostname?: string, backlog?: number, listeningListener?: () => void, ): this; listen( port?: number, hostname?: string, listeningListener?: () => void, ): this; listen(port?: number, backlog?: number, listeningListener?: () => void): this; listen(port?: number, listeningListener?: () => void): this; listen(path: string, backlog?: number, listeningListener?: () => void): this; listen(path: string, listeningListener?: () => void): this; listen(options: ListenOptions, listeningListener?: () => void): this; listen(handle: any, backlog?: number, listeningListener?: () => void): this; listen(handle: any, listeningListener?: () => void): this; listen(...args: unknown[]): this { const normalized = _normalizeArgs(args); let options = normalized[0] as Partial<ListenOptions>; const cb = normalized[1];
// A server that already owns a handle cannot listen again.
if (this._handle) { throw new ERR_SERVER_ALREADY_LISTEN(); }
// Register the callback only after the check above, so a rejected call leaves no listener behind.
// NOTE(review): `cb` is presumably null when no callback was passed — confirm in _normalizeArgs.
if (cb !== null) { this.once("listening", cb); }
// Backlog given positionally: the second argument is tried first, then the third.
// NOTE(review): relies on _toNumber returning a falsy value for non-numeric input — confirm.
const backlogFromArgs: number = _toNumber(args.length > 1 && args[1]) || (_toNumber(args.length > 2 && args[2]) as number);
// Unwrap objects that carry a handle under `_handle` or `handle`; otherwise keep the options as-is.
options = (options as any)._handle || (options as any).handle || options; const flags = _getFlags(options.ipv6Only);
// Case 1: a TCP handle was supplied — adopt it directly and take over its async id.
if (options instanceof TCP) { this._handle = options; this[asyncIdSymbol] = this._handle.getAsyncId();
_listenInCluster(this, null, -1, -1, backlogFromArgs);
return this; }
_addAbortSignalOption(this, options);
// Case 2: a non-negative numeric file descriptor was supplied.
if (typeof options.fd === "number" && options.fd >= 0) { _listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
return this; }
// Normalize an omitted port to 0: no arguments, callback-only call, a `port` key whose value is undefined, or an explicit null.
if ( args.length === 0 || typeof args[0] === "function" || (typeof options.port === "undefined" && "port" in options) || options.port === null ) { options.port = 0; }
let backlog;
// Case 3: a port is given as a number or string; a backlog in the options wins over the positional one.
if (typeof options.port === "number" || typeof options.port === "string") { validatePort(options.port, "options.port"); backlog = options.backlog || backlogFromArgs;
// With a host, go through _lookupAndListen; without one, listen directly with address type 4.
if (options.host) { _lookupAndListen( this, options.port | 0, options.host, backlog, !!options.exclusive, flags, ); } else { _listenInCluster( this, null, options.port | 0, 4, backlog, undefined, options.exclusive, ); }
return this; }
// Case 4: a path accepted by _isPipeName; remember it in `_pipeName` (address() reads it back).
if (options.path && _isPipeName(options.path)) { const pipeName = (this._pipeName = options.path); backlog = options.backlog || backlogFromArgs;
_listenInCluster( this, pipeName, -1, -1, backlog, undefined, options.exclusive, );
// No handle after the call means there is nothing to chmod; return early.
if (!this._handle) { return this; }
// Build the permission mask requested through `readableAll` / `writableAll`.
let mode = 0;
if (options.readableAll === true) { mode |= PipeConstants.UV_READABLE; }
if (options.writableAll === true) { mode |= PipeConstants.UV_WRITABLE; }
if (mode !== 0) { const err = this._handle.fchmod(mode);
// A failed chmod tears the handle down again before reporting the error.
if (err) { this._handle.close(); this._handle = null;
throw errnoException(err, "uv_pipe_chmod"); } }
return this; }
// Nothing matched: give the more specific message when neither "port" nor "path" is present at all.
if (!("port" in options || "path" in options)) { throw new ERR_INVALID_ARG_VALUE( "options", options, 'must have the property "port" or "path"', ); }
throw new ERR_INVALID_ARG_VALUE("options", options); }
/**
 * Stops the server and arranges for "close" to be emitted once it is drained.
 *
 * @param cb Optional one-shot "close" listener. If the server holds no handle
 *   when `close` is called, the callback is instead invoked with an
 *   `ERR_SERVER_NOT_RUNNING` error.
 * @returns This server, for chaining.
 */
close(cb?: (err?: Error) => void): this {
  if (typeof cb === "function") {
    // Pick the listener first, then register it in one place.
    const onClose = this._handle
      ? cb
      : function close() {
        cb(new ERR_SERVER_NOT_RUNNING());
      };
    this.once("close", onClose);
  }

  if (this._handle) {
    (this._handle as TCP).close();
    this._handle = null;
  }

  if (!this._usingWorkers) {
    this._emitCloseIfDrained();
    return this;
  }

  let pending = this._workers.length;
  const onWorkerClose = () => {
    pending -= 1;
    if (pending === 0) {
      this._connections = 0;
      this._emitCloseIfDrained();
    }
  };

  // Hold the connection count above zero while workers are being closed:
  // _emitCloseIfDrained bails out while `_connections` is non-zero, so "close"
  // cannot fire before the last worker has reported back.
  this._connections++;

  for (const worker of this._workers) {
    worker.close(onWorkerClose);
  }

  return this;
}
/**
 * Reports where the server is bound.
 *
 * @returns The object filled in by the handle's `getsockname` when the handle
 *   provides one, otherwise the stored pipe name if set, otherwise `null`.
 * @throws An errno exception tagged "address" when `getsockname` reports an
 *   error.
 */
address(): AddressInfo | string | null {
  const handle = this._handle;

  if (handle && handle.getsockname) {
    const info = {};
    const status = handle.getsockname(info);

    if (status) {
      throw errnoException(status, "address");
    }

    return info as AddressInfo;
  }

  return this._pipeName ? this._pipeName : null;
}
/**
 * Reports the number of connections through `cb`.
 *
 * The callback is never called synchronously: it is scheduled with `nextTick`
 * inside `defaultTriggerAsyncIdScope` using this server's async id.
 *
 * @param cb Receives `(err, count)`.
 * @returns This server, for chaining.
 */
getConnections(cb: (err: Error | null, count: number) => void): this {
  const report = (err: Error | null, connections?: number): void => {
    defaultTriggerAsyncIdScope(
      this[asyncIdSymbol],
      nextTick,
      cb,
      err,
      connections,
    );
  };

  // Without workers the local counter is the whole answer.
  if (!this._usingWorkers) {
    report(null, this._connections);
    return this;
  }

  let pending = this._workers.length;
  let sum = this._connections;

  const onCount = (err: Error, count: number): void => {
    if (err) {
      // -1 keeps later successful counts from ever bringing the tally to zero.
      pending = -1;
      report(err);
      return;
    }

    sum += count;
    pending -= 1;

    if (pending === 0) {
      report(null, sum);
    }
  };

  for (const worker of this._workers) {
    worker.getConnections(onCount);
  }

  return this;
}
/**
 * Records that the server is unreferenced and, when a handle exists,
 * forwards the call to the handle's `unref()`.
 *
 * @returns This server, for chaining.
 */
unref(): this {
  this._unref = true;

  const handle = this._handle;
  if (handle) {
    handle.unref();
  }

  return this;
}
/**
 * Clears the unreferenced flag and, when a handle exists, forwards the call
 * to the handle's `ref()`.
 *
 * @returns This server, for chaining.
 */
ref(): this {
  this._unref = false;

  const handle = this._handle;
  if (handle) {
    handle.ref();
  }

  return this;
}
/** `true` while the server holds a handle, `false` otherwise. */
get listening(): boolean {
  return Boolean(this._handle);
}
// Instance field that exposes the module-level `_setupListenHandle` function under the name `_listen2`.
// NOTE(review): presumably kept under this name for compatibility with code that calls `server._listen2(...)` — confirm.
_listen2 = _setupListenHandle;
/**
 * Schedules `_emitCloseNT` for this server once it has neither a handle nor
 * any counted connections; otherwise only logs the current state.
 *
 * The scheduling goes through `nextTick` inside `defaultTriggerAsyncIdScope`
 * using this server's async id.
 */
_emitCloseIfDrained(): void {
  debug("SERVER _emitCloseIfDrained");

  const drained = !this._handle && !this._connections;

  if (drained) {
    defaultTriggerAsyncIdScope(
      this[asyncIdSymbol],
      nextTick,
      _emitCloseNT,
      this,
    );
    return;
  }

  // Still busy: report what is keeping the server open.
  debug(
    `SERVER handle? ${!!this._handle} connections? ${this._connections}`,
  );
}
/**
 * Registers a worker socket list with this server and switches the server
 * into worker mode (`_usingWorkers`).
 *
 * The list is removed from `_workers` again the first time it emits "exit".
 *
 * @param socketList Emitter representing the worker's sockets.
 */
_setupWorker(socketList: EventEmitter): void {
  this._usingWorkers = true;
  this._workers.push(socketList);

  // The "exit" payload names the list to drop; it gets its own name so it
  // does not shadow the `socketList` parameter.
  socketList.once("exit", (exitedList: any) => {
    const index = this._workers.indexOf(exitedList);

    // indexOf returns -1 for an unknown list, and splice(-1, 1) would then
    // remove the last — unrelated — worker. Only remove on a real match.
    if (index !== -1) {
      this._workers.splice(index, 1);
    }
  });
}
// Method keyed by `EventEmitter.captureRejectionSymbol`; receives an error, the event name and a socket.
// For the "connection" event the socket is destroyed with the error; for any other event the error is re-emitted as "error" on the server.
// NOTE(review): presumably invoked by EventEmitter when a listener's promise rejects — confirm against events.ts.
// The final `}` on this line closes the enclosing class.
[EventEmitter.captureRejectionSymbol]( err: Error, event: string, sock: Socket, ): void { switch (event) { case "connection": { sock.destroy(err); break; } default: { this.emit("error", err); } } }}
/**
 * Creates a new `Server`.
 *
 * @param options Passed through to the `Server` constructor as its first argument.
 * @param connectionListener Passed through to the `Server` constructor as its second argument.
 * @returns The newly constructed server.
 */
export function createServer(
  options?: ServerOptions,
  connectionListener?: ConnectionListener,
): Server {
  const server = new Server(options, connectionListener);
  return server;
}
// Re-export the IP helpers imported from ./internal/net.ts as named exports.
export { isIP, isIPv4, isIPv6 };
// Default export: a single object bundling the listed bindings, so the module can also be consumed as one namespace-like value.
export default { _createServerHandle, _normalizeArgs, isIP, isIPv4, isIPv6, connect, createConnection, createServer, Server, Socket, Stream,};