Skip to main content
Module

std/node/net.ts

Deno standard library
Go to Latest
File
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.// Copyright Joyent, Inc. and other Node contributors.//// Permission is hereby granted, free of charge, to any person obtaining a// copy of this software and associated documentation files (the// "Software"), to deal in the Software without restriction, including// without limitation the rights to use, copy, modify, merge, publish,// distribute, sublicense, and/or sell copies of the Software, and to permit// persons to whom the Software is furnished to do so, subject to the// following conditions://// The above copyright notice and this permission notice shall be included// in all copies or substantial portions of the Software.//// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE// USE OR OTHER DEALINGS IN THE SOFTWARE.
import { notImplemented } from "./_utils.ts";import { EventEmitter } from "./events.ts";import { isIP, isIPv4, isIPv6, normalizedArgsSymbol } from "./internal/net.ts";import { Duplex } from "./stream.ts";import { asyncIdSymbol, defaultTriggerAsyncIdScope, newAsyncId, ownerSymbol,} from "./internal/async_hooks.ts";import { ERR_INVALID_ADDRESS_FAMILY, ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, ERR_INVALID_FD_TYPE, ERR_INVALID_IP_ADDRESS, ERR_MISSING_ARGS, ERR_SERVER_ALREADY_LISTEN, ERR_SERVER_NOT_RUNNING, ERR_SOCKET_CLOSED, errnoException, exceptionWithHostPort, genericNodeError, uvExceptionWithHostPort,} from "./internal/errors.ts";import type { ErrnoException } from "./internal/errors.ts";import { Encodings } from "./_utils.ts";import { isUint8Array } from "./internal/util/types.ts";import { kAfterAsyncWrite, kBuffer, kBufferCb, kBufferGen, kHandle, kUpdateTimer, onStreamRead, setStreamTimeout, writeGeneric, writevGeneric,} from "./internal/stream_base_commons.ts";import { kTimeout } from "./internal/timers.mjs";import { nextTick } from "./_next_tick.ts";import { DTRACE_NET_SERVER_CONNECTION, DTRACE_NET_STREAM_END,} from "./internal/dtrace.ts";import { Buffer } from "./buffer.ts";import type { LookupOneOptions } from "./internal/dns/utils.ts";import { validateAbortSignal, validateFunction, validateInt32, validateNumber, validatePort, validateString,} from "./internal/validators.mjs";import { constants as TCPConstants, TCP, TCPConnectWrap,} from "./internal_binding/tcp_wrap.ts";import { constants as PipeConstants, Pipe, PipeConnectWrap,} from "./internal_binding/pipe_wrap.ts";import { ShutdownWrap } from "./internal_binding/stream_wrap.ts";import { assert } from "../_util/asserts.ts";import { isWindows } from "../_util/os.ts";import { ADDRCONFIG, lookup as dnsLookup } from "./dns.ts";import { codeMap } from "./internal_binding/uv.ts";import { guessHandleType } from "./internal_binding/util.ts";import { debuglog } from "./internal/util/debuglog.ts";import type { DuplexOptions } from "./_stream.d.ts";import type { BufferEncoding } from "./_global.d.ts";import type { Abortable } from "./_events.d.ts";import { channel } from "./diagnostics_channel.ts";
let debug = debuglog("net", (fn) => { debug = fn;});
const kLastWriteQueueSize = Symbol("lastWriteQueueSize");const kSetNoDelay = Symbol("kSetNoDelay");const kBytesRead = Symbol("kBytesRead");const kBytesWritten = Symbol("kBytesWritten");
const DEFAULT_IPV4_ADDR = "0.0.0.0";const DEFAULT_IPV6_ADDR = "::";
type Handle = TCP | Pipe;
interface HandleOptions { pauseOnCreate?: boolean; manualStart?: boolean; handle?: Handle;}
interface OnReadOptions { buffer: Uint8Array | (() => Uint8Array); /** * This function is called for every chunk of incoming data. * * Two arguments are passed to it: the number of bytes written to buffer and * a reference to buffer. * * Return `false` from this function to implicitly `pause()` the socket. */ callback(bytesWritten: number, buf: Uint8Array): boolean;}
interface ConnectOptions { /** * If specified, incoming data is stored in a single buffer and passed to the * supplied callback when data arrives on the socket. * * Note: this will cause the streaming functionality to not provide any data, * however events like `"error"`, `"end"`, and `"close"` will still be * emitted as normal and methods like `pause()` and `resume()` will also * behave as expected. */ onread?: OnReadOptions;}
interface SocketOptions extends ConnectOptions, HandleOptions, DuplexOptions { /** * If specified, wrap around an existing socket with the given file * descriptor, otherwise a new socket will be created. */ fd?: number; /** * If set to `false`, then the socket will automatically end the writable * side when the readable side ends. See `net.createServer()` and the `"end"` * event for details. Default: `false`. */ allowHalfOpen?: boolean; /** * Allow reads on the socket when an fd is passed, otherwise ignored. * Default: `false`. */ readable?: boolean; /** * Allow writes on the socket when an fd is passed, otherwise ignored. * Default: `false`. */ writable?: boolean; /** An Abort signal that may be used to destroy the socket. */ signal?: AbortSignal;}
interface TcpNetConnectOptions extends TcpSocketConnectOptions, SocketOptions { timeout?: number;}
interface IpcNetConnectOptions extends IpcSocketConnectOptions, SocketOptions { timeout?: number;}
type NetConnectOptions = TcpNetConnectOptions | IpcNetConnectOptions;
interface AddressInfo { address: string; family?: string; port: number;}
type LookupFunction = ( hostname: string, options: LookupOneOptions, callback: ( err: ErrnoException | null, address: string, family: number, ) => void,) => void;
interface TcpSocketConnectOptions extends ConnectOptions { port: number; host?: string; localAddress?: string; localPort?: number; hints?: number; family?: number; lookup?: LookupFunction;}
interface IpcSocketConnectOptions extends ConnectOptions { path: string;}
type SocketConnectOptions = TcpSocketConnectOptions | IpcSocketConnectOptions;
function _getNewAsyncId(handle?: Handle): number { return !handle || typeof handle.getAsyncId !== "function" ? newAsyncId() : handle.getAsyncId();}
interface NormalizedArgs { 0: Partial<NetConnectOptions | ListenOptions>; 1: ConnectionListener | null; [normalizedArgsSymbol]?: boolean;}
const _noop = (_arrayBuffer: Uint8Array, _nread: number): undefined => { return;};
const netClientSocketChannel = channel("net.client.socket");const netServerSocketChannel = channel("net.server.socket");
function _toNumber(x: unknown): number | false { return (x = Number(x)) >= 0 ? (x as number) : false;}
function _isPipeName(s: unknown): s is string { return typeof s === "string" && _toNumber(s) === false;}
function _createHandle(fd: number, isServer: boolean): Handle { validateInt32(fd, "fd", 0);
const type = guessHandleType(fd);
if (type === "PIPE") { return new Pipe(isServer ? PipeConstants.SERVER : PipeConstants.SOCKET); }
if (type === "TCP") { return new TCP(isServer ? TCPConstants.SERVER : TCPConstants.SOCKET); }
throw new ERR_INVALID_FD_TYPE(type);}
// Returns an array [options, cb], where options is an object,// cb is either a function or null.// Used to normalize arguments of `Socket.prototype.connect()` and// `Server.prototype.listen()`. Possible combinations of parameters:// - (options[...][, cb])// - (path[...][, cb])// - ([port][, host][...][, cb])// For `Socket.prototype.connect()`, the [...] part is ignored// For `Server.prototype.listen()`, the [...] part is [, backlog]// but will not be handled here (handled in listen())export function _normalizeArgs(args: unknown[]): NormalizedArgs { let arr: NormalizedArgs;
if (args.length === 0) { arr = [{}, null]; arr[normalizedArgsSymbol] = true;
return arr; }
const arg0 = args[0] as Partial<NetConnectOptions> | number | string; let options: Partial<SocketConnectOptions> = {};
if (typeof arg0 === "object" && arg0 !== null) { // (options[...][, cb]) options = arg0; } else if (_isPipeName(arg0)) { // (path[...][, cb]) (options as IpcSocketConnectOptions).path = arg0; } else { // ([port][, host][...][, cb]) (options as TcpSocketConnectOptions).port = arg0;
if (args.length > 1 && typeof args[1] === "string") { (options as TcpSocketConnectOptions).host = args[1]; } }
const cb = args[args.length - 1];
if (!_isConnectionListener(cb)) { arr = [options, null]; } else { arr = [options, cb]; }
arr[normalizedArgsSymbol] = true;
return arr;}
function _isTCPConnectWrap( req: TCPConnectWrap | PipeConnectWrap,): req is TCPConnectWrap { return "localAddress" in req && "localPort" in req;}
function _afterConnect( status: number, // deno-lint-ignore no-explicit-any handle: any, req: PipeConnectWrap | TCPConnectWrap, readable: boolean, writable: boolean,) { let socket = handle[ownerSymbol];
if (socket.constructor.name === "ReusedHandle") { socket = socket.handle; }
// Callback may come after call to destroy if (socket.destroyed) { return; }
debug("afterConnect");
assert(socket.connecting);
socket.connecting = false; socket._sockname = null;
if (status === 0) { if (socket.readable && !readable) { socket.push(null); socket.read(); }
if (socket.writable && !writable) { socket.end(); }
socket._unrefTimer();
socket.emit("connect"); socket.emit("ready");
// Start the first read, or get an immediate EOF. // this doesn't actually consume any bytes, because len=0. if (readable && !socket.isPaused()) { socket.read(0); } } else { socket.connecting = false; let details;
if (_isTCPConnectWrap(req)) { details = req.localAddress + ":" + req.localPort; }
const ex = exceptionWithHostPort( status, "connect", req.address, (req as TCPConnectWrap).port, details, );
if (_isTCPConnectWrap(req)) { ex.localAddress = req.localAddress; ex.localPort = req.localPort; }
socket.destroy(ex); }}
function _checkBindError(err: number, port: number, handle: TCP) { // EADDRINUSE may not be reported until we call `listen()` or `connect()`. // To complicate matters, a failed `bind()` followed by `listen()` or `connect()` // will implicitly bind to a random port. Ergo, check that the socket is // bound to the expected port before calling `listen()` or `connect()`. if (err === 0 && port > 0 && handle.getsockname) { const out: AddressInfo | Record<string, never> = {}; err = handle.getsockname(out);
if (err === 0 && port !== out.port) { err = codeMap.get("EADDRINUSE")!; } }
return err;}
function _isPipe( options: Partial<SocketConnectOptions>,): options is IpcSocketConnectOptions { return "path" in options && !!options.path;}
function _connectErrorNT(socket: Socket, err: Error) { socket.destroy(err);}
function _internalConnect( socket: Socket, address: string, port: number, addressType: number, localAddress: string, localPort: number, flags: number,) { assert(socket.connecting);
let err;
if (localAddress || localPort) { if (addressType === 4) { localAddress = localAddress || DEFAULT_IPV4_ADDR; err = (socket._handle as TCP).bind(localAddress, localPort); } else { // addressType === 6 localAddress = localAddress || DEFAULT_IPV6_ADDR; err = (socket._handle as TCP).bind6(localAddress, localPort, flags); }
debug( "binding to localAddress: %s and localPort: %d (addressType: %d)", localAddress, localPort, addressType, );
err = _checkBindError(err, localPort, socket._handle as TCP);
if (err) { const ex = exceptionWithHostPort(err, "bind", localAddress, localPort); socket.destroy(ex);
return; } }
if (addressType === 6 || addressType === 4) { const req = new TCPConnectWrap(); req.oncomplete = _afterConnect; req.address = address; req.port = port; req.localAddress = localAddress; req.localPort = localPort;
if (addressType === 4) { err = (socket._handle as TCP).connect(req, address, port); } else { err = (socket._handle as TCP).connect6(req, address, port); } } else { const req = new PipeConnectWrap(); req.oncomplete = _afterConnect; req.address = address;
err = (socket._handle as Pipe).connect(req, address); }
if (err) { let details = "";
const sockname = socket._getsockname();
if (sockname) { details = `${sockname.address}:${sockname.port}`; }
const ex = exceptionWithHostPort(err, "connect", address, port, details); socket.destroy(ex); }}
// Provide a better error message when we call end() as a result// of the other side sending a FIN. The standard "write after end"// is overly vague, and makes it seem like the user's code is to blame.function _writeAfterFIN( this: Socket, // deno-lint-ignore no-explicit-any chunk: any, encoding?: | BufferEncoding | null | ((error: Error | null | undefined) => void), cb?: (error: Error | null | undefined) => void,): boolean { if (!this.writableEnded) { return Duplex.prototype.write.call( this, chunk, encoding as BufferEncoding | null, // @ts-expect-error Using `call` seem to be interfering with the overload for write cb, ); }
if (typeof encoding === "function") { cb = encoding; encoding = null; }
const err = genericNodeError( "This socket has been ended by the other party", { code: "EPIPE" }, );
if (typeof cb === "function") { defaultTriggerAsyncIdScope(this[asyncIdSymbol], nextTick, cb, err); }
if (this._server) { nextTick(() => this.destroy(err)); } else { this.destroy(err); }
return false;}
function _tryReadStart(socket: Socket) { // Not already reading, start the flow. debug("Socket._handle.readStart"); socket._handle!.reading = true; const err = socket._handle!.readStart();
if (err) { socket.destroy(errnoException(err, "read")); }}
// Called when the "end" event is emitted.function _onReadableStreamEnd(this: Socket) { if (!this.allowHalfOpen) { this.write = _writeAfterFIN; }}
// Called when creating new Socket, or when re-using a closed Socketfunction _initSocketHandle(socket: Socket) { socket._undestroy(); socket._sockname = undefined;
// Handle creation may be deferred to bind() or connect() time. if (socket._handle) { // deno-lint-ignore no-explicit-any (socket._handle as any)[ownerSymbol] = socket; socket._handle.onread = onStreamRead; socket[asyncIdSymbol] = _getNewAsyncId(socket._handle);
let userBuf = socket[kBuffer];
if (userBuf) { const bufGen = socket[kBufferGen];
if (bufGen !== null) { userBuf = bufGen();
if (!isUint8Array(userBuf)) { return; }
socket[kBuffer] = userBuf; }
socket._handle.useUserBuffer(userBuf); } }}
function _lookupAndConnect( self: Socket, options: TcpSocketConnectOptions,) { const { localAddress, localPort } = options; const host = options.host || "localhost"; let { port } = options;
if (localAddress && !isIP(localAddress)) { throw new ERR_INVALID_IP_ADDRESS(localAddress); }
if (localPort) { validateNumber(localPort, "options.localPort"); }
if (typeof port !== "undefined") { if (typeof port !== "number" && typeof port !== "string") { throw new ERR_INVALID_ARG_TYPE( "options.port", ["number", "string"], port, ); }
validatePort(port); }
port |= 0;
// If host is an IP, skip performing a lookup const addressType = isIP(host); if (addressType) { defaultTriggerAsyncIdScope(self[asyncIdSymbol], nextTick, () => { if (self.connecting) { defaultTriggerAsyncIdScope( self[asyncIdSymbol], _internalConnect, self, host, port, addressType, localAddress, localPort, ); } });
return; }
if (options.lookup !== undefined) { validateFunction(options.lookup, "options.lookup"); }
const dnsOpts = { family: options.family, hints: options.hints || 0, };
if ( !isWindows && dnsOpts.family !== 4 && dnsOpts.family !== 6 && dnsOpts.hints === 0 ) { dnsOpts.hints = ADDRCONFIG; }
debug("connect: find host", host); debug("connect: dns options", dnsOpts); self._host = host; const lookup = options.lookup || dnsLookup;
defaultTriggerAsyncIdScope(self[asyncIdSymbol], function () { lookup( host, dnsOpts, function emitLookup( err: ErrnoException | null, ip: string, addressType: number, ) { self.emit("lookup", err, ip, addressType, host);
// It's possible we were destroyed while looking this up. // XXX it would be great if we could cancel the promise returned by // the look up. if (!self.connecting) { return; }
if (err) { // net.createConnection() creates a net.Socket object and immediately // calls net.Socket.connect() on it (that's us). There are no event // listeners registered yet so defer the error event to the next tick. nextTick(_connectErrorNT, self, err); } else if (!isIP(ip)) { err = new ERR_INVALID_IP_ADDRESS(ip);
nextTick(_connectErrorNT, self, err); } else if (addressType !== 4 && addressType !== 6) { err = new ERR_INVALID_ADDRESS_FAMILY( `${addressType}`, options.host!, options.port, );
nextTick(_connectErrorNT, self, err); } else { self._unrefTimer();
defaultTriggerAsyncIdScope( self[asyncIdSymbol], _internalConnect, self, ip, port, addressType, localAddress, localPort, ); } }, ); });}
function _afterShutdown(this: ShutdownWrap<TCP>) { // deno-lint-ignore no-explicit-any const self: any = this.handle[ownerSymbol];
debug("afterShutdown destroyed=%j", self.destroyed, self._readableState);
this.callback();}
function _emitCloseNT(s: Socket | Server) { debug("SERVER: emit close"); s.emit("close");}
/** * This class is an abstraction of a TCP socket or a streaming `IPC` endpoint * (uses named pipes on Windows, and Unix domain sockets otherwise). It is also * an `EventEmitter`. * * A `net.Socket` can be created by the user and used directly to interact with * a server. For example, it is returned by `createConnection`, * so the user can use it to talk to the server. * * It can also be created by Node.js and passed to the user when a connection * is received. For example, it is passed to the listeners of a `"connection"` event emitted on a `Server`, so the user can use * it to interact with the client. */export class Socket extends Duplex { // Problem with this is that users can supply their own handle, that may not // have `handle.getAsyncId()`. In this case an `[asyncIdSymbol]` should // probably be supplied by `async_hooks`. [asyncIdSymbol] = -1;
[kHandle]: Handle | null = null; [kSetNoDelay] = false; [kLastWriteQueueSize] = 0; // deno-lint-ignore no-explicit-any [kTimeout]: any = null; [kBuffer]: Uint8Array | boolean | null = null; [kBufferCb]: OnReadOptions["callback"] | null = null; [kBufferGen]: (() => Uint8Array) | null = null;
// Used after `.destroy()` [kBytesRead] = 0; [kBytesWritten] = 0;
// Reserved properties server = null; // deno-lint-ignore no-explicit-any _server: any = null;
_peername?: AddressInfo | Record<string, never>; _sockname?: AddressInfo | Record<string, never>; _pendingData: Uint8Array | string | null = null; _pendingEncoding = ""; _host: string | null = null; // deno-lint-ignore no-explicit-any _parent: any = null;
constructor(options: SocketOptions | number) { if (typeof options === "number") { // Legacy interface. options = { fd: options }; } else { options = { ...options }; }
// Default to *not* allowing half open sockets. options.allowHalfOpen = Boolean(options.allowHalfOpen); // For backwards compat do not emit close on destroy. options.emitClose = false; options.autoDestroy = true; // Handle strings directly. options.decodeStrings = false;
super(options);
if (options.handle) { this._handle = options.handle; this[asyncIdSymbol] = _getNewAsyncId(this._handle); } else if (options.fd !== undefined) { // REF: https://github.com/denoland/deno/issues/6529 notImplemented("net.Socket.prototype.constructor with fd option"); }
const onread = options.onread;
if ( onread !== null && typeof onread === "object" && (isUint8Array(onread.buffer) || typeof onread.buffer === "function") && typeof onread.callback === "function" ) { if (typeof onread.buffer === "function") { this[kBuffer] = true; this[kBufferGen] = onread.buffer; } else { this[kBuffer] = onread.buffer; }
this[kBufferCb] = onread.callback; }
this.on("end", _onReadableStreamEnd);
_initSocketHandle(this);
// If we have a handle, then start the flow of data into the // buffer. If not, then this will happen when we connect. if (this._handle && options.readable !== false) { if (options.pauseOnCreate) { // Stop the handle from reading and pause the stream this._handle.reading = false; this._handle.readStop(); // @ts-expect-error This property shouldn't be modified this.readableFlowing = false; } else if (!options.manualStart) { this.read(0); } } }
/** * Initiate a connection on a given socket. * * Possible signatures: * * - `socket.connect(options[, connectListener])` * - `socket.connect(path[, connectListener])` for `IPC` connections. * - `socket.connect(port[, host][, connectListener])` for TCP connections. * - Returns: `net.Socket` The socket itself. * * This function is asynchronous. When the connection is established, the `"connect"` event will be emitted. If there is a problem connecting, * instead of a `"connect"` event, an `"error"` event will be emitted with * the error passed to the `"error"` listener. * The last parameter `connectListener`, if supplied, will be added as a listener * for the `"connect"` event **once**. * * This function should only be used for reconnecting a socket after `"close"` has been emitted or otherwise it may lead to undefined * behavior. */ connect( options: SocketConnectOptions | NormalizedArgs, connectionListener?: ConnectionListener, ): this; connect( port: number, host: string, connectionListener?: ConnectionListener, ): this; connect(port: number, connectionListener?: ConnectionListener): this; connect(path: string, connectionListener?: ConnectionListener): this; connect(...args: unknown[]): this { let normalized: NormalizedArgs;
// If passed an array, it's treated as an array of arguments that have // already been normalized (so we don't normalize more than once). This has // been solved before in https://github.com/nodejs/node/pull/12342, but was // reverted as it had unintended side effects. if ( Array.isArray(args[0]) && (args[0] as unknown as NormalizedArgs)[normalizedArgsSymbol] ) { normalized = args[0] as unknown as NormalizedArgs; } else { normalized = _normalizeArgs(args); }
const options = normalized[0]; const cb = normalized[1];
// `options.port === null` will be checked later. if ( (options as TcpSocketConnectOptions).port === undefined && (options as IpcSocketConnectOptions).path == null ) { throw new ERR_MISSING_ARGS(["options", "port", "path"]); }
if (this.write !== Socket.prototype.write) { this.write = Socket.prototype.write; }
if (this.destroyed) { this._handle = null; this._peername = undefined; this._sockname = undefined; }
const { path } = options as IpcNetConnectOptions; const pipe = _isPipe(options); debug("pipe", pipe, path);
if (!this._handle) { this._handle = pipe ? new Pipe(PipeConstants.SOCKET) : new TCP(TCPConstants.SOCKET);
_initSocketHandle(this); }
if (cb !== null) { this.once("connect", cb); }
this._unrefTimer();
this.connecting = true;
if (pipe) { validateString(path, "options.path"); defaultTriggerAsyncIdScope( this[asyncIdSymbol], _internalConnect, this, path, ); } else { _lookupAndConnect(this, options as TcpSocketConnectOptions); }
return this; }
/** * Pauses the reading of data. That is, `"data"` events will not be emitted. * Useful to throttle back an upload. * * @return The socket itself. */ override pause(): this { if ( this[kBuffer] && !this.connecting && this._handle && this._handle.reading ) { this._handle.reading = false;
if (!this.destroyed) { const err = this._handle.readStop();
if (err) { this.destroy(errnoException(err, "read")); } } }
return Duplex.prototype.pause.call(this) as unknown as this; }
/** * Resumes reading after a call to `socket.pause()`. * * @return The socket itself. */ override resume(): this { if ( this[kBuffer] && !this.connecting && this._handle && !this._handle.reading ) { _tryReadStart(this); }
return Duplex.prototype.resume.call(this) as this; }
/** * Sets the socket to timeout after `timeout` milliseconds of inactivity on * the socket. By default `net.Socket` do not have a timeout. * * When an idle timeout is triggered the socket will receive a `"timeout"` event but the connection will not be severed. The user must manually call `socket.end()` or `socket.destroy()` to * end the connection. * * ```ts * import { createRequire } from "https://deno.land/std@$STD_VERSION/node/module.ts"; * * const require = createRequire(import.meta.url); * const net = require("net"); * * const socket = new net.Socket(); * socket.setTimeout(3000); * socket.on("timeout", () => { * console.log("socket timeout"); * socket.end(); * }); * ``` * * If `timeout` is `0`, then the existing idle timeout is disabled. * * The optional `callback` parameter will be added as a one-time listener for the `"timeout"` event. * @return The socket itself. */ setTimeout = setStreamTimeout;
/** * Enable/disable the use of Nagle's algorithm. * * When a TCP connection is created, it will have Nagle's algorithm enabled. * * Nagle's algorithm delays data before it is sent via the network. It attempts * to optimize throughput at the expense of latency. * * Passing `true` for `noDelay` or not passing an argument will disable Nagle's * algorithm for the socket. Passing `false` for `noDelay` will enable Nagle's * algorithm. * * @param noDelay * @return The socket itself. */ setNoDelay(noDelay?: boolean): this { if (!this._handle) { this.once( "connect", noDelay ? this.setNoDelay : () => this.setNoDelay(noDelay), );
return this; }
// Backwards compatibility: assume true when `noDelay` is omitted const newValue = noDelay === undefined ? true : !!noDelay;
if ( "setNoDelay" in this._handle && this._handle.setNoDelay && newValue !== this[kSetNoDelay] ) { this[kSetNoDelay] = newValue; this._handle.setNoDelay(newValue); }
return this; }
/** * Enable/disable keep-alive functionality, and optionally set the initial * delay before the first keepalive probe is sent on an idle socket. * * Set `initialDelay` (in milliseconds) to set the delay between the last * data packet received and the first keepalive probe. Setting `0` for`initialDelay` will leave the value unchanged from the default * (or previous) setting. * * Enabling the keep-alive functionality will set the following socket options: * * - `SO_KEEPALIVE=1` * - `TCP_KEEPIDLE=initialDelay` * - `TCP_KEEPCNT=10` * - `TCP_KEEPINTVL=1` * * @param enable * @param initialDelay * @return The socket itself. */ setKeepAlive(enable: boolean, initialDelay?: number): this { if (!this._handle) { this.once("connect", () => this.setKeepAlive(enable, initialDelay));
return this; }
if ("setKeepAlive" in this._handle) { this._handle.setKeepAlive(enable, ~~(initialDelay! / 1000)); }
return this; }
/** * Returns the bound `address`, the address `family` name and `port` of the * socket as reported by the operating system:`{ port: 12346, family: "IPv4", address: "127.0.0.1" }` */ address(): AddressInfo | Record<string, never> { return this._getsockname(); }
/** * Calling `unref()` on a socket will allow the program to exit if this is the only * active socket in the event system. If the socket is already `unref`ed calling`unref()` again will have no effect. * * @return The socket itself. */ unref(): this { if (!this._handle) { this.once("connect", this.unref);
return this; }
if (typeof this._handle.unref === "function") { this._handle.unref(); }
return this; }
/** * Opposite of `unref()`, calling `ref()` on a previously `unref`ed socket will_not_ let the program exit if it's the only socket left (the default behavior). * If the socket is `ref`ed calling `ref` again will have no effect. * * @return The socket itself. */ ref(): this { if (!this._handle) { this.once("connect", this.ref);
return this; }
if (typeof this._handle.ref === "function") { this._handle.ref(); }
return this; }
/** * This property shows the number of characters buffered for writing. The buffer * may contain strings whose length after encoding is not yet known. So this number * is only an approximation of the number of bytes in the buffer. * * `net.Socket` has the property that `socket.write()` always works. This is to * help users get up and running quickly. The computer cannot always keep up * with the amount of data that is written to a socket. The network connection * simply might be too slow. Node.js will internally queue up the data written to a * socket and send it out over the wire when it is possible. * * The consequence of this internal buffering is that memory may grow. * Users who experience large or growing `bufferSize` should attempt to * "throttle" the data flows in their program with `socket.pause()` and `socket.resume()`. * * @deprecated Use `writableLength` instead. */ get bufferSize(): number { if (this._handle) { return this.writableLength; }
return 0; }
/** * The amount of received bytes. */ get bytesRead(): number { return this._handle ? this._handle.bytesRead : this[kBytesRead]; }
/** * The amount of bytes sent. */ get bytesWritten(): number | undefined { let bytes = this._bytesDispatched; const data = this._pendingData; const encoding = this._pendingEncoding; const writableBuffer = this.writableBuffer;
if (!writableBuffer) { return undefined; }
for (const el of writableBuffer) { bytes += el!.chunk instanceof Buffer ? el!.chunk.length : Buffer.byteLength(el!.chunk, el!.encoding); }
if (Array.isArray(data)) { // Was a writev, iterate over chunks to get total length for (let i = 0; i < data.length; i++) { const chunk = data[i];
// deno-lint-ignore no-explicit-any if ((data as any).allBuffers || chunk instanceof Buffer) { bytes += chunk.length; } else { bytes += Buffer.byteLength(chunk.chunk, chunk.encoding); } } } else if (data) { // Writes are either a string or a Buffer. if (typeof data !== "string") { bytes += (data as Buffer).length; } else { bytes += Buffer.byteLength(data, encoding); } }
return bytes; }
/** * If `true`,`socket.connect(options[, connectListener])` was * called and has not yet finished. It will stay `true` until the socket becomes * connected, then it is set to `false` and the `"connect"` event is emitted. Note * that the `socket.connect(options[, connectListener])` callback is a listener for the `"connect"` event. */ connecting = false;
/** * The string representation of the local IP address the remote client is * connecting on. For example, in a server listening on `"0.0.0.0"`, if a client * connects on `"192.168.1.1"`, the value of `socket.localAddress` would be`"192.168.1.1"`. */ get localAddress(): string { return this._getsockname().address; }
/** * The numeric representation of the local port. For example, `80` or `21`. */ get localPort(): number { return this._getsockname().port; }
/** * The string representation of the local IP family. `"IPv4"` or `"IPv6"`. */ get localFamily(): string | undefined { return this._getsockname().family; }
/** * The string representation of the remote IP address. For example,`"74.125.127.100"` or `"2001:4860:a005::68"`. Value may be `undefined` if * the socket is destroyed (for example, if the client disconnected). */ get remoteAddress(): string | undefined { return this._getpeername().address; }
/** * The string representation of the remote IP family. `"IPv4"` or `"IPv6"`. */ get remoteFamily(): string | undefined { const { family } = this._getpeername();
return family ? `IPv${family}` : family; }
/** * The numeric representation of the remote port. For example, `80` or `21`. */ get remotePort(): number | undefined { return this._getpeername().port; }
get pending(): boolean { return !this._handle || this.connecting; }
get readyState(): string { if (this.connecting) { return "opening"; } else if (this.readable && this.writable) { return "open"; } else if (this.readable && !this.writable) { return "readOnly"; } else if (!this.readable && this.writable) { return "writeOnly"; } return "closed"; }
/** * Half-closes the socket. i.e., it sends a FIN packet. It is possible the * server will still send some data. * * See `writable.end()` for further details. * * @param encoding Only used when data is `string`. * @param cb Optional callback for when the socket is finished. * @return The socket itself. */ override end(cb?: () => void): this; override end(buffer: Uint8Array | string, cb?: () => void): this; override end( data: Uint8Array | string, encoding?: Encodings, cb?: () => void, ): this; override end( data?: Uint8Array | string | (() => void), encoding?: Encodings | (() => void), cb?: () => void, ): this { Duplex.prototype.end.call(this, data, encoding as Encodings, cb); DTRACE_NET_STREAM_END(this);
return this; }
/** * @param size Optional argument to specify how much data to read. */ override read( size?: number, ): string | Uint8Array | Buffer | null | undefined { if ( this[kBuffer] && !this.connecting && this._handle && !this._handle.reading ) { _tryReadStart(this); }
return Duplex.prototype.read.call(this, size); }
destroySoon() { if (this.writable) { this.end(); }
if (this.writableFinished) { this.destroy(); } else { this.once("finish", this.destroy); } }
_unrefTimer() { // deno-lint-ignore no-this-alias for (let s = this; s !== null; s = s._parent) { if (s[kTimeout]) { s[kTimeout].refresh(); } } }
// The user has called .end(), and all the bytes have been // sent out to the other side. // deno-lint-ignore no-explicit-any override _final(cb: any): any { // If still connecting - defer handling `_final` until 'connect' will happen if (this.pending) { debug("_final: not yet connected"); return this.once("connect", () => this._final(cb)); }
if (!this._handle) { return cb(); }
debug("_final: not ended, call shutdown()");
const req = new ShutdownWrap<Handle>(); req.oncomplete = _afterShutdown; req.handle = this._handle; req.callback = cb; const err = this._handle.shutdown(req);
if (err === 1 || err === codeMap.get("ENOTCONN")) { // synchronous finish return cb(); } else if (err !== 0) { return cb(errnoException(err, "shutdown")); } }
_onTimeout() { const handle = this._handle; const lastWriteQueueSize = this[kLastWriteQueueSize];
if (lastWriteQueueSize > 0 && handle) { // `lastWriteQueueSize !== writeQueueSize` means there is // an active write in progress, so we suppress the timeout. const { writeQueueSize } = handle;
if (lastWriteQueueSize !== writeQueueSize) { this[kLastWriteQueueSize] = writeQueueSize; this._unrefTimer();
return; } }
debug("_onTimeout"); this.emit("timeout"); }
override _read(size?: number) { debug("_read"); if (this.connecting || !this._handle) { debug("_read wait for connection"); this.once("connect", () => this._read(size)); } else if (!this._handle.reading) { _tryReadStart(this); } }
override _destroy(exception: Error | null, cb: (err: Error | null) => void) { debug("destroy"); this.connecting = false;
// deno-lint-ignore no-this-alias for (let s = this; s !== null; s = s._parent) { clearTimeout(s[kTimeout]); }
debug("close"); if (this._handle) { debug("close handle"); const isException = exception ? true : false; // `bytesRead` and `kBytesWritten` should be accessible after `.destroy()` this[kBytesRead] = this._handle.bytesRead; this[kBytesWritten] = this._handle.bytesWritten;
this._handle.close(() => { this._handle!.onread = _noop; this._handle = null; this._sockname = undefined;
debug("emit close"); this.emit("close", isException); }); cb(exception); } else { cb(exception); nextTick(_emitCloseNT, this); }
if (this._server) { debug("has server"); this._server._connections--;
if (this._server._emitCloseIfDrained) { this._server._emitCloseIfDrained(); } } }
_getpeername(): AddressInfo | Record<string, never> { if (!this._handle || !("getpeername" in this._handle) || this.connecting) { return this._peername || {}; } else if (!this._peername) { this._peername = {}; this._handle.getpeername(this._peername); }
return this._peername; }
_getsockname(): AddressInfo | Record<string, never> { if (!this._handle || !("getsockname" in this._handle)) { return {}; } else if (!this._sockname) { this._sockname = {}; this._handle.getsockname(this._sockname); }
return this._sockname; }
_writeGeneric( writev: boolean, // deno-lint-ignore no-explicit-any data: any, encoding: string, cb: (error?: Error | null) => void, ) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while // waiting for this one to be done. if (this.connecting) { this._pendingData = data; this._pendingEncoding = encoding; this.once("connect", function connect(this: Socket) { this._writeGeneric(writev, data, encoding, cb); });
return; }
this._pendingData = null; this._pendingEncoding = "";
if (!this._handle) { cb(new ERR_SOCKET_CLOSED());
return false; }
this._unrefTimer();
let req;
if (writev) { req = writevGeneric(this, data, cb); } else { req = writeGeneric(this, data, encoding, cb); } if (req.async) { this[kLastWriteQueueSize] = req.bytes; } }
// @ts-ignore Duplex defining as a property when want a method. _writev( // deno-lint-ignore no-explicit-any chunks: Array<{ chunk: any; encoding: string }>, cb: (error?: Error | null) => void, ) { this._writeGeneric(true, chunks, "", cb); }
override _write( // deno-lint-ignore no-explicit-any data: any, encoding: string, cb: (error?: Error | null) => void, ) { this._writeGeneric(false, data, encoding, cb); }
[kAfterAsyncWrite]() { this[kLastWriteQueueSize] = 0; }
get [kUpdateTimer]() { return this._unrefTimer; }
get _connecting(): boolean { return this.connecting; }
// Legacy alias. Having this is probably being overly cautious, but it doesn't // really hurt anyone either. This can probably be removed safely if desired. get _bytesDispatched(): number { return this._handle ? this._handle.bytesWritten : this[kBytesWritten]; }
get _handle(): Handle | null { return this[kHandle]; }
set _handle(v: Handle | null) { this[kHandle] = v; }}
export const Stream = Socket;
// Target API://// let s = net.connect({port: 80, host: 'google.com'}, function() {// ...// });//// There are various forms://// connect(options, [cb])// connect(port, [host], [cb])// connect(path, [cb]);//export function connect( options: NetConnectOptions, connectionListener?: () => void,): Socket;export function connect( port: number, host?: string, connectionListener?: () => void,): Socket;export function connect(path: string, connectionListener?: () => void): Socket;export function connect(...args: unknown[]) { const normalized = _normalizeArgs(args); const options = normalized[0] as Partial<NetConnectOptions>; debug("createConnection", normalized); const socket = new Socket(options);
if (netClientSocketChannel.hasSubscribers) { netClientSocketChannel.publish({ socket, }); }
if (options.timeout) { socket.setTimeout(options.timeout); }
return socket.connect(normalized);}
export const createConnection = connect;
export interface ListenOptions extends Abortable { fd?: number; port?: number | undefined; host?: string | undefined; backlog?: number | undefined; path?: string | undefined; exclusive?: boolean | undefined; readableAll?: boolean | undefined; writableAll?: boolean | undefined; /** * Default: `false` */ ipv6Only?: boolean | undefined;}
type ConnectionListener = (socket: Socket) => void;
interface ServerOptions { /** * Indicates whether half-opened TCP connections are allowed. * Default: false */ allowHalfOpen?: boolean | undefined; /** * Indicates whether the socket should be paused on incoming connections. * Default: false */ pauseOnConnect?: boolean | undefined;}
function _isServerSocketOptions( options: unknown,): options is null | undefined | ServerOptions { return ( options === null || typeof options === "undefined" || typeof options === "object" );}
function _isConnectionListener( connectionListener: unknown,): connectionListener is ConnectionListener { return typeof connectionListener === "function";}
function _getFlags(ipv6Only?: boolean): number { return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0;}
function _listenInCluster( server: Server, address: string | null, port: number | null, addressType: number | null, backlog: number, fd?: number | null, exclusive?: boolean, flags?: number,) { exclusive = !!exclusive;
// TODO(cmorten): here we deviate somewhat from the Node implementation which // makes use of the https://nodejs.org/api/cluster.html module to run servers // across a "cluster" of Node processes to take advantage of multi-core // systems. // // Though Deno has has a Worker capability from which we could simulate this, // for now we assert that we are _always_ on the primary process. const isPrimary = true;
if (isPrimary || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags);
return; }}
function _lookupAndListen( server: Server, port: number, address: string, backlog: number, exclusive: boolean, flags: number,) { dnsLookup(address, function doListen(err, ip, addressType) { if (err) { server.emit("error", err); } else { addressType = ip ? addressType : 4;
_listenInCluster( server, ip, port, addressType, backlog, null, exclusive, flags, ); } });}
function _addAbortSignalOption(server: Server, options: ListenOptions) { if (options?.signal === undefined) { return; }
validateAbortSignal(options.signal, "options.signal");
const { signal } = options;
const onAborted = () => { server.close(); };
if (signal.aborted) { nextTick(onAborted); } else { signal.addEventListener("abort", onAborted); server.once("close", () => signal.removeEventListener("abort", onAborted)); }}
// Returns handle if it can be created, or error code if it can'texport function _createServerHandle( address: string | null, port: number | null, addressType: number | null, fd?: number | null, flags?: number,): Handle | number { let err = 0; // Assign handle in listen, and clean up if bind or listen fails let handle; let isTCP = false;
if (typeof fd === "number" && fd >= 0) { try { handle = _createHandle(fd, true); } catch (e) { // Not a fd we can listen on. This will trigger an error. debug("listen invalid fd=%d:", fd, (e as Error).message);
return codeMap.get("EINVAL")!; }
err = handle.open(fd);
if (err) { return err; }
assert(!address && !port); } else if (port === -1 && addressType === -1) { handle = new Pipe(PipeConstants.SERVER);
if (isWindows) { const instances = Number.parseInt( Deno.env.get("NODE_PENDING_PIPE_INSTANCES") ?? "", );
if (!Number.isNaN(instances)) { handle.setPendingInstances!(instances); } } } else { handle = new TCP(TCPConstants.SERVER); isTCP = true; }
if (address || port || isTCP) { debug("bind to", address || "any");
if (!address) { // TODO: differs from Node which tries to bind to IPv6 first when no // address is provided. // // Forcing IPv4 as a workaround for Deno not aligning with Node on // implicit binding on Windows. // // REF: https://github.com/denoland/deno/issues/10762
// Try binding to ipv6 first // err = (handle as TCP).bind6(DEFAULT_IPV6_ADDR, port ?? 0, flags ?? 0);
// if (err) { // handle.close();
// Fallback to ipv4 return _createServerHandle(DEFAULT_IPV4_ADDR, port, 4, null, flags); // } } else if (addressType === 6) { err = (handle as TCP).bind6(address, port ?? 0, flags ?? 0); } else { err = (handle as TCP).bind(address, port ?? 0); } }
if (err) { handle.close();
return err; }
return handle;}
function _emitErrorNT(server: Server, err: Error) { server.emit("error", err);}
function _emitListeningNT(server: Server) { // Ensure handle hasn't closed if (server._handle) { server.emit("listening"); }}
// deno-lint-ignore no-explicit-anyfunction _onconnection(this: any, err: number, clientHandle?: Handle) { // deno-lint-ignore no-this-alias const handle = this; const self = handle[ownerSymbol];
debug("onconnection");
if (err) { self.emit("error", errnoException(err, "accept"));
return; }
if (self.maxConnections && self._connections >= self.maxConnections) { clientHandle!.close();
return; }
const socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect, readable: true, writable: true, });
// TODO: implement noDelay and setKeepAlive
self._connections++; socket.server = self; socket._server = self;
DTRACE_NET_SERVER_CONNECTION(socket); self.emit("connection", socket);
if (netServerSocketChannel.hasSubscribers) { netServerSocketChannel.publish({ socket, }); }}
function _setupListenHandle( this: Server, address: string | null, port: number | null, addressType: number | null, backlog: number, fd?: number | null, flags?: number,) { debug("setupListenHandle", address, port, addressType, backlog, fd);
// If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug("setupListenHandle: have a handle already"); } else { debug("setupListenHandle: create a handle");
let rval = null;
// Try to bind to the unspecified IPv6 address, see if IPv6 is available if (!address && typeof fd !== "number") { // TODO: differs from Node which tries to bind to IPv6 first when no // address is provided. // // Forcing IPv4 as a workaround for Deno not aligning with Node on // implicit binding on Windows. // // REF: https://github.com/denoland/deno/issues/10762 // rval = _createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);
// if (typeof rval === "number") { // rval = null; address = DEFAULT_IPV4_ADDR; addressType = 4; // } else { // address = DEFAULT_IPV6_ADDR; // addressType = 6; // } }
if (rval === null) { rval = _createServerHandle(address, port, addressType, fd, flags); }
if (typeof rval === "number") { const error = uvExceptionWithHostPort(rval, "listen", address, port); nextTick(_emitErrorNT, this, error);
return; }
this._handle = rval; }
this[asyncIdSymbol] = _getNewAsyncId(this._handle); this._handle.onconnection = _onconnection; this._handle[ownerSymbol] = this;
// Use a backlog of 512 entries. We pass 511 to the listen() call because // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); // which will thus give us a backlog of 512 entries. const err = this._handle.listen(backlog || 511);
if (err) { const ex = uvExceptionWithHostPort(err, "listen", address, port); this._handle.close(); this._handle = null;
defaultTriggerAsyncIdScope( this[asyncIdSymbol], nextTick, _emitErrorNT, this, ex, );
return; }
// Generate connection key, this should be unique to the connection this._connectionKey = addressType + ":" + address + ":" + port;
// Unref the handle if the server was unref'ed prior to listening if (this._unref) { this.unref(); }
defaultTriggerAsyncIdScope( this[asyncIdSymbol], nextTick, _emitListeningNT, this, );}
/** This class is used to create a TCP or IPC server. */export class Server extends EventEmitter { [asyncIdSymbol] = -1;
allowHalfOpen = false; pauseOnConnect = false;
// deno-lint-ignore no-explicit-any _handle: any = null; _connections = 0; _usingWorkers = false; // deno-lint-ignore no-explicit-any _workers: any[] = []; _unref = false; _pipeName?: string; _connectionKey?: string;
/** * `net.Server` is an `EventEmitter` with the following events: * * - `"close"` - Emitted when the server closes. If connections exist, this * event is not emitted until all connections are ended. * - `"connection"` - Emitted when a new connection is made. `socket` is an * instance of `net.Socket`. * - `"error"` - Emitted when an error occurs. Unlike `net.Socket`, the * `"close"` event will not be emitted directly following this event unless * `server.close()` is manually called. See the example in discussion of * `server.listen()`. * - `"listening"` - Emitted when the server has been bound after calling * `server.listen()`. */ constructor(connectionListener?: ConnectionListener); constructor(options?: ServerOptions, connectionListener?: ConnectionListener); constructor( options?: ServerOptions | ConnectionListener, connectionListener?: ConnectionListener, ) { super();
if (_isConnectionListener(options)) { this.on("connection", options); } else if (_isServerSocketOptions(options)) { this.allowHalfOpen = options?.allowHalfOpen || false; this.pauseOnConnect = !!options?.pauseOnConnect;
if (_isConnectionListener(connectionListener)) { this.on("connection", connectionListener); } } else { throw new ERR_INVALID_ARG_TYPE("options", "Object", options); } }
/** * Start a server listening for connections. A `net.Server` can be a TCP or * an `IPC` server depending on what it listens to. * * Possible signatures: * * - `server.listen(handle[, backlog][, callback])` * - `server.listen(options[, callback])` * - `server.listen(path[, backlog][, callback])` for `IPC` servers * - `server.listen([port[, host[, backlog]]][, callback])` for TCP servers * * This function is asynchronous. When the server starts listening, the `'listening'` event will be emitted. The last parameter `callback`will be added as a listener for the `'listening'` * event. * * All `listen()` methods can take a `backlog` parameter to specify the maximum * length of the queue of pending connections. The actual length will be determined * by the OS through sysctl settings such as `tcp_max_syn_backlog` and `somaxconn` on Linux. The default value of this parameter is 511 (not 512). * * All `Socket` are set to `SO_REUSEADDR` (see [`socket(7)`](https://man7.org/linux/man-pages/man7/socket.7.html) for * details). * * The `server.listen()` method can be called again if and only if there was an * error during the first `server.listen()` call or `server.close()` has been * called. Otherwise, an `ERR_SERVER_ALREADY_LISTEN` error will be thrown. * * One of the most common errors raised when listening is `EADDRINUSE`. * This happens when another server is already listening on the requested`port`/`path`/`handle`. One way to handle this would be to retry * after a certain amount of time: * * ```ts * import { createRequire } from "https://deno.land/std@$STD_VERSION/node/module.ts"; * * const require = createRequire(import.meta.url); * const net = require("net"); * * const PORT = 3000; * const HOST = "127.0.0.1"; * const server = new net.Server(); * * server.on("error", (e: Error & { code: string; }) => { * if (e.code === "EADDRINUSE") { * console.log("Address in use, retrying..."); * setTimeout(() => { * server.close(); * server.listen(PORT, HOST); * }, 1000); * } * }); * ``` */ listen( port?: number, hostname?: string, backlog?: number, listeningListener?: () => void, ): this; listen( port?: number, hostname?: string, listeningListener?: () => void, ): this; listen(port?: number, backlog?: number, listeningListener?: () => void): this; listen(port?: number, listeningListener?: () => void): this; listen(path: string, backlog?: number, listeningListener?: () => void): this; listen(path: string, listeningListener?: () => void): this; listen(options: ListenOptions, listeningListener?: () => void): this; // deno-lint-ignore no-explicit-any listen(handle: any, backlog?: number, listeningListener?: () => void): this; // deno-lint-ignore no-explicit-any listen(handle: any, listeningListener?: () => void): this; listen(...args: unknown[]): this { const normalized = _normalizeArgs(args); let options = normalized[0] as Partial<ListenOptions>; const cb = normalized[1];
if (this._handle) { throw new ERR_SERVER_ALREADY_LISTEN(); }
if (cb !== null) { this.once("listening", cb); }
const backlogFromArgs: number = // (handle, backlog) or (path, backlog) or (port, backlog) _toNumber(args.length > 1 && args[1]) || (_toNumber(args.length > 2 && args[2]) as number); // (port, host, backlog)
// deno-lint-ignore no-explicit-any options = (options as any)._handle || (options as any).handle || options; const flags = _getFlags(options.ipv6Only);
// (handle[, backlog][, cb]) where handle is an object with a handle if (options instanceof TCP) { this._handle = options; this[asyncIdSymbol] = this._handle.getAsyncId();
_listenInCluster(this, null, -1, -1, backlogFromArgs);
return this; }
_addAbortSignalOption(this, options);
// (handle[, backlog][, cb]) where handle is an object with a fd if (typeof options.fd === "number" && options.fd >= 0) { _listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
return this; }
// ([port][, host][, backlog][, cb]) where port is omitted, // that is, listen(), listen(null), listen(cb), or listen(null, cb) // or (options[, cb]) where options.port is explicitly set as undefined or // null, bind to an arbitrary unused port if ( args.length === 0 || typeof args[0] === "function" || (typeof options.port === "undefined" && "port" in options) || options.port === null ) { options.port = 0; }
// ([port][, host][, backlog][, cb]) where port is specified // or (options[, cb]) where options.port is specified // or if options.port is normalized as 0 before let backlog;
if (typeof options.port === "number" || typeof options.port === "string") { validatePort(options.port, "options.port"); backlog = options.backlog || backlogFromArgs;
// start TCP server listening on host:port if (options.host) { _lookupAndListen( this, options.port | 0, options.host, backlog, !!options.exclusive, flags, ); } else { // Undefined host, listens on unspecified address // Default addressType 4 will be used to search for primary server _listenInCluster( this, null, options.port | 0, 4, backlog, undefined, options.exclusive, ); }
return this; }
// (path[, backlog][, cb]) or (options[, cb]) // where path or options.path is a UNIX domain socket or Windows pipe if (options.path && _isPipeName(options.path)) { const pipeName = (this._pipeName = options.path); backlog = options.backlog || backlogFromArgs;
_listenInCluster( this, pipeName, -1, -1, backlog, undefined, options.exclusive, );
if (!this._handle) { // Failed and an error shall be emitted in the next tick. // Therefore, we directly return. return this; }
let mode = 0;
if (options.readableAll === true) { mode |= PipeConstants.UV_READABLE; }
if (options.writableAll === true) { mode |= PipeConstants.UV_WRITABLE; }
if (mode !== 0) { const err = this._handle.fchmod(mode);
if (err) { this._handle.close(); this._handle = null;
throw errnoException(err, "uv_pipe_chmod"); } }
return this; }
if (!("port" in options || "path" in options)) { throw new ERR_INVALID_ARG_VALUE( "options", options, 'must have the property "port" or "path"', ); }
throw new ERR_INVALID_ARG_VALUE("options", options); }
/** * Stops the server from accepting new connections and keeps existing * connections. This function is asynchronous, the server is finally closed * when all connections are ended and the server emits a `"close"` event. * The optional `callback` will be called once the `"close"` event occurs. Unlike * that event, it will be called with an `Error` as its only argument if the server * was not open when it was closed. * * @param cb Called when the server is closed. */ close(cb?: (err?: Error) => void): this { if (typeof cb === "function") { if (!this._handle) { this.once("close", function close() { cb(new ERR_SERVER_NOT_RUNNING()); }); } else { this.once("close", cb); } }
if (this._handle) { (this._handle as TCP).close(); this._handle = null; }
if (this._usingWorkers) { let left = this._workers.length; const onWorkerClose = () => { if (--left !== 0) { return; }
this._connections = 0; this._emitCloseIfDrained(); };
// Increment connections to be sure that, even if all sockets will be closed // during polling of workers, `close` event will be emitted only once. this._connections++;
// Poll workers for (let n = 0; n < this._workers.length; n++) { this._workers[n].close(onWorkerClose); } } else { this._emitCloseIfDrained(); }
return this; }
/** * Returns the bound `address`, the address `family` name, and `port` of the server * as reported by the operating system if listening on an IP socket * (useful to find which port was assigned when getting an OS-assigned address):`{ port: 12346, family: "IPv4", address: "127.0.0.1" }`. * * For a server listening on a pipe or Unix domain socket, the name is returned * as a string. * * ```ts * import { createRequire } from "https://deno.land/std@$STD_VERSION/node/module.ts"; * import { Socket } from "https://deno.land/std@$STD_VERSION/node/net.ts"; * * const require = createRequire(import.meta.url); * const net = require("net"); * * const server = net.createServer((socket: Socket) => { * socket.end("goodbye\n"); * }).on("error", (err: Error) => { * // Handle errors here. * throw err; * }); * * // Grab an arbitrary unused port. * server.listen(() => { * console.log("opened server on", server.address()); * }); * ``` * * `server.address()` returns `null` before the `"listening"` event has been * emitted or after calling `server.close()`. */ address(): AddressInfo | string | null { if (this._handle && this._handle.getsockname) { const out = {}; const err = this._handle.getsockname(out);
if (err) { throw errnoException(err, "address"); }
return out as AddressInfo; } else if (this._pipeName) { return this._pipeName; }
return null; }
/** * Asynchronously get the number of concurrent connections on the server. Works * when sockets were sent to forks. * * Callback should take two arguments `err` and `count`. */ getConnections(cb: (err: Error | null, count: number) => void): this { // deno-lint-ignore no-this-alias const server = this;
function end(err: Error | null, connections?: number) { defaultTriggerAsyncIdScope( server[asyncIdSymbol], nextTick, cb, err, connections, ); }
if (!this._usingWorkers) { end(null, this._connections);
return this; }
// Poll workers let left = this._workers.length; let total = this._connections;
function oncount(err: Error, count: number) { if (err) { left = -1;
return end(err); }
total += count;
if (--left === 0) { return end(null, total); } }
for (let n = 0; n < this._workers.length; n++) { this._workers[n].getConnections(oncount); }
return this; }
/** * Calling `unref()` on a server will allow the program to exit if this is the only * active server in the event system. If the server is already `unref`ed calling `unref()` again will have no effect. */ unref(): this { this._unref = true;
if (this._handle) { this._handle.unref(); }
return this; }
/** * Opposite of `unref()`, calling `ref()` on a previously `unref`ed server will _not_ let the program exit if it's the only server left (the default behavior). * If the server is `ref`ed calling `ref()` again will have no effect. */ ref(): this { this._unref = false;
if (this._handle) { this._handle.ref(); }
return this; }
/** * Indicates whether or not the server is listening for connections. */ get listening(): boolean { return !!this._handle; }
_listen2 = _setupListenHandle;
_emitCloseIfDrained() { debug("SERVER _emitCloseIfDrained"); if (this._handle || this._connections) { debug( `SERVER handle? ${!!this._handle} connections? ${this._connections}`, ); return; }
// We use setTimeout instead of nextTick here to avoid EADDRINUSE error // when the same port listened immediately after the 'close' event. // ref: https://github.com/denoland/deno_std/issues/2788 defaultTriggerAsyncIdScope( this[asyncIdSymbol], setTimeout, _emitCloseNT, 0, this, ); }
_setupWorker(socketList: EventEmitter) { this._usingWorkers = true; this._workers.push(socketList);
// deno-lint-ignore no-explicit-any socketList.once("exit", (socketList: any) => { const index = this._workers.indexOf(socketList); this._workers.splice(index, 1); }); }
[EventEmitter.captureRejectionSymbol]( err: Error, event: string, sock: Socket, ) { switch (event) { case "connection": { sock.destroy(err); break; } default: { this.emit("error", err); } } }}
/** * Creates a new TCP or IPC server. * * Accepts an `options` object with properties `allowHalfOpen` (default `false`) * and `pauseOnConnect` (default `false`). * * If `allowHalfOpen` is set to `false`, then the socket will * automatically end the writable side when the readable side ends. * * If `allowHalfOpen` is set to `true`, when the other end of the socket * signals the end of transmission, the server will only send back the end of * transmission when `socket.end()` is explicitly called. For example, in the * context of TCP, when a FIN packed is received, a FIN packed is sent back * only when `socket.end()` is explicitly called. Until then the connection is * half-closed (non-readable but still writable). See `"end"` event and RFC 1122 * (section 4.2.2.13) for more information. * * `pauseOnConnect` indicates whether the socket should be paused on incoming * connections. * * If `pauseOnConnect` is set to `true`, then the socket associated with each * incoming connection will be paused, and no data will be read from its * handle. This allows connections to be passed between processes without any * data being read by the original process. To begin reading data from a paused * socket, call `socket.resume()`. * * The server can be a TCP server or an IPC server, depending on what it * `listen()` to. * * Here is an example of an TCP echo server which listens for connections on * port 8124: * * ```ts * import { createRequire } from "https://deno.land/std@$STD_VERSION/node/module.ts"; * import { Socket } from "https://deno.land/std@$STD_VERSION/node/net.ts"; * * const require = createRequire(import.meta.url); * const net = require("net"); * * const server = net.createServer((c: Socket) => { * // "connection" listener. * console.log("client connected"); * c.on("end", () => { * console.log("client disconnected"); * }); * c.write("hello\r\n"); * c.pipe(c); * }); * * server.on("error", (err: Error) => { * throw err; * }); * * server.listen(8124, () => { * console.log("server bound"); * }); * ``` * * Test this by using `telnet`: * * ```console * $ telnet localhost 8124 * ``` * * @param options Socket options. * @param connectionListener Automatically set as a listener for the `"connection"` event. * @return A `net.Server`. */export function createServer( options?: ServerOptions, connectionListener?: ConnectionListener,): Server { return new Server(options, connectionListener);}
export { isIP, isIPv4, isIPv6 };
export default { _createServerHandle, _normalizeArgs, isIP, isIPv4, isIPv6, connect, createConnection, createServer, Server, Socket, Stream,};