Skip to main content
Module

x/postgresjs/src/connection.js

Postgres.js - The Fastest full featured PostgreSQL client for Node.js and Deno
Very Popular
Go to Latest
File
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014
import { HmacSha256 } from 'https://deno.land/std@0.132.0/hash/sha256.ts'import { Buffer } from 'https://deno.land/std@0.132.0/node/buffer.ts'import process from 'https://deno.land/std@0.132.0/node/process.ts'import { setImmediate, clearImmediate } from '../polyfills.js'import { net } from '../polyfills.js'import { tls } from '../polyfills.js'import crypto from 'https://deno.land/std@0.132.0/node/crypto.ts'import Stream from 'https://deno.land/std@0.132.0/node/stream.ts'
import { stringify, handleValue, arrayParser, arraySerializer } from './types.js'import { Errors } from './errors.js'import Result from './result.js'import Queue from './queue.js'import { Query, CLOSE } from './query.js'import b from './bytes.js'
export default Connection
let uid = 1
const Sync = b().S().end() , Flush = b().H().end() , SSLRequest = b().i32(8).i32(80877103).end(8) , ExecuteUnnamed = Buffer.concat([b().E().str(b.N).i32(0).end(), Sync]) , DescribeUnnamed = b().D().str('S').str(b.N).end() , noop = () => { /* noop */ }
const retryRoutines = new Set([ 'FetchPreparedStatement', 'RevalidateCachedQuery', 'transformAssignedExpr'])
const errorFields = { 83 : 'severity_local', // S 86 : 'severity', // V 67 : 'code', // C 77 : 'message', // M 68 : 'detail', // D 72 : 'hint', // H 80 : 'position', // P 112 : 'internal_position', // p 113 : 'internal_query', // q 87 : 'where', // W 115 : 'schema_name', // s 116 : 'table_name', // t 99 : 'column_name', // c 100 : 'data type_name', // d 110 : 'constraint_name', // n 70 : 'file', // F 76 : 'line', // L 82 : 'routine' // R}
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) { const { ssl, max, user, host, port, database, parsers, transform, onnotice, onnotify, onparameter, max_pipeline, keep_alive, backoff, target_session_attrs } = options
const sent = Queue() , id = uid++ , backend = { pid: null, secret: null } , idleTimer = timer(end, options.idle_timeout) , lifeTimer = timer(end, options.max_lifetime) , connectTimer = timer(connectTimedOut, options.connect_timeout)
let socket = null , cancelMessage , result = new Result() , incoming = Buffer.alloc(0) , needsTypes = options.fetch_types , backendParameters = {} , statements = {} , statementId = Math.random().toString(36).slice(2) , statementCount = 1 , closedDate = 0 , remaining = 0 , hostIndex = 0 , retries = 0 , length = 0 , delay = 0 , rows = 0 , serverSignature = null , nextWriteTimer = null , terminated = false , incomings = null , results = null , initial = null , ending = null , stream = null , chunk = null , ended = null , nonce = null , query = null , final = null
const connection = { queue: queues.closed, idleTimer, connect(query) { initial = query reconnect() }, terminate, execute, cancel, end, count: 0, id }
queues.closed && queues.closed.push(connection)
return connection
async function createSocket() { let x try { x = options.socket ? (await Promise.resolve(options.socket(options))) : net.Socket() } catch (e) { error(e) return } x.on('error', error) x.on('close', closed) x.on('drain', drain) return x }
async function cancel({ pid, secret }, resolve, reject) { try { cancelMessage = b().i32(16).i32(80877102).i32(pid).i32(secret).end(16) await connect() socket.once('error', reject) socket.once('close', resolve) } catch (error) { reject(error) } }
function execute(q) { if (terminated) return queryError(q, Errors.connection('CONNECTION_DESTROYED', options))
if (q.cancelled) return
try { q.state = backend query ? sent.push(q) : (query = q, query.active = true)
build(q) return write(toBuffer(q)) && !q.describeFirst && sent.length < max_pipeline && (!q.options.onexecute || q.options.onexecute(connection)) } catch (error) { sent.length === 0 && write(Sync) errored(error) return true } }
function toBuffer(q) { if (q.parameters.length >= 65534) throw Errors.generic('MAX_PARAMETERS_EXCEEDED', 'Max number of parameters (65534) exceeded')
return q.options.simple ? b().Q().str(q.strings[0] + b.N).end() : q.describeFirst ? Buffer.concat([describe(q), Flush]) : q.prepare ? q.prepared ? prepared(q) : Buffer.concat([describe(q), prepared(q)]) : unnamed(q) }
function describe(q) { return Buffer.concat([ Parse(q.statement.string, q.parameters, q.statement.types, q.statement.name), Describe('S', q.statement.name) ]) }
function prepared(q) { return Buffer.concat([ Bind(q.parameters, q.statement.types, q.statement.name, q.cursorName), q.cursorFn ? Execute('', q.cursorRows) : ExecuteUnnamed ]) }
function unnamed(q) { return Buffer.concat([ Parse(q.statement.string, q.parameters, q.statement.types), DescribeUnnamed, prepared(q) ]) }
function build(q) { const parameters = [] , types = []
const string = stringify(q, q.strings[0], q.args[0], parameters, types, options)
!q.tagged && q.args.forEach(x => handleValue(x, parameters, types, options))
q.prepare = options.prepare && ('prepare' in q.options ? q.options.prepare : true) q.string = string q.signature = q.prepare && types + string q.onlyDescribe && (delete statements[q.signature]) q.parameters = q.parameters || parameters q.prepared = q.prepare && q.signature in statements q.describeFirst = q.onlyDescribe || (parameters.length && !q.prepared) q.statement = q.prepared ? statements[q.signature] : { string, types, name: q.prepare ? statementId + statementCount++ : '' }
typeof options.debug === 'function' && options.debug(id, string, parameters, types) }
function write(x, fn) { chunk = chunk ? Buffer.concat([chunk, x]) : Buffer.from(x) if (fn || chunk.length >= 1024) return nextWrite(fn) nextWriteTimer === null && (nextWriteTimer = setImmediate(nextWrite)) return true }
function nextWrite(fn) { const x = socket.write(chunk, fn) nextWriteTimer !== null && clearImmediate(nextWriteTimer) chunk = nextWriteTimer = null return x }
function connectTimedOut() { errored(Errors.connection('CONNECT_TIMEOUT', options, socket)) socket.destroy() }
async function secure() { write(SSLRequest) const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S
if (!canSSL && ssl === 'prefer') return connected()
socket.removeAllListeners() socket = tls.connect({ socket, ...(ssl === 'require' || ssl === 'allow' || ssl === 'prefer' ? { rejectUnauthorized: false } : ssl === 'verify-full' ? {} : typeof ssl === 'object' ? ssl : {} ) }) socket.on('secureConnect', connected) socket.on('error', error) socket.on('close', closed) socket.on('drain', drain) }
/* c8 ignore next 3 */ function drain() { !query && onopen(connection) }
function data(x) { if (incomings) { incomings.push(x) remaining -= x.length if (remaining >= 0) return }
incoming = incomings ? Buffer.concat(incomings, length - remaining) : incoming.length === 0 ? x : Buffer.concat([incoming, x], incoming.length + x.length)
while (incoming.length > 4) { length = incoming.readUInt32BE(1) if (length >= incoming.length) { remaining = length - incoming.length incomings = [incoming] break }
try { handle(incoming.slice(0, length + 1)) } catch (e) { query && (query.cursorFn || query.describeFirst) && write(Sync) errored(e) } incoming = incoming.slice(length + 1) remaining = 0 incomings = null } }
async function connect() { terminated = false backendParameters = {} socket || (socket = await createSocket())
if (!socket) return
connectTimer.start()
if (options.socket) return ssl ? secure() : connected()
socket.on('connect', ssl ? secure : connected)
if (options.path) return socket.connect(options.path)
socket.connect(port[hostIndex], host[hostIndex]) hostIndex = (hostIndex + 1) % port.length }
function reconnect() { setTimeout(connect, closedDate ? closedDate + delay - Number(process.hrtime.bigint() / 1000000n) : 0) }
function connected() { try { statements = {} needsTypes = options.fetch_types statementId = Math.random().toString(36).slice(2) statementCount = 1 lifeTimer.start() socket.on('data', data) keep_alive && socket.setKeepAlive(true) const s = StartupMessage() write(s) } catch (err) { error(err) } }
function error(err) { if (connection.queue === queues.connecting && options.host[retries + 1]) return
errored(err) while (sent.length) queryError(sent.shift(), err) }
function errored(err) { stream && (stream.destroy(err), stream = null) query && queryError(query, err) initial && (queryError(initial, err), initial = null) }
function queryError(query, err) { query.reject(Object.create(err, { stack: { value: err.stack + query.origin.replace(/.*\n/, '\n'), enumerable: options.debug }, query: { value: query.string, enumerable: options.debug }, parameters: { value: query.parameters, enumerable: options.debug }, args: { value: query.args, enumerable: options.debug }, types: { value: query.statement && query.statement.types, enumerable: options.debug } })) }
function end() { return ending || ( !connection.reserved && onend(connection), !connection.reserved && !initial && !query && sent.length === 0 ? (terminate(), new Promise(r => socket && socket.readyState !== 'closed' ? socket.once('close', r) : r())) : ending = new Promise(r => ended = r) ) }
function terminate() { terminated = true if (stream || query || initial || sent.length) error(Errors.connection('CONNECTION_DESTROYED', options))
clearImmediate(nextWriteTimer) if (socket) { socket.removeListener('data', data) socket.removeListener('connect', connected) socket.readyState === 'open' && socket.end(b().X().end()) } ended && (ended(), ending = ended = null) }
async function closed(hadError) { incoming = Buffer.alloc(0) remaining = 0 incomings = null clearImmediate(nextWriteTimer) socket.removeListener('data', data) socket.removeListener('connect', connected) idleTimer.cancel() lifeTimer.cancel() connectTimer.cancel()
if (socket.encrypted) { socket.removeAllListeners() socket = null }
if (initial) return reconnect()
!hadError && (query || sent.length) && error(Errors.connection('CONNECTION_CLOSED', options, socket)) closedDate = Number(process.hrtime.bigint() / 1000000n) hadError && options.shared.retries++ delay = (typeof backoff === 'function' ? backoff(options.shared.retries) : backoff) * 1000 onclose(connection) }
/* Handlers */ function handle(xs, x = xs[0]) { ( x === 68 ? DataRow : // D x === 100 ? CopyData : // d x === 65 ? NotificationResponse : // A x === 83 ? ParameterStatus : // S x === 90 ? ReadyForQuery : // Z x === 67 ? CommandComplete : // C x === 50 ? BindComplete : // 2 x === 49 ? ParseComplete : // 1 x === 116 ? ParameterDescription : // t x === 84 ? RowDescription : // T x === 82 ? Authentication : // R x === 110 ? NoData : // n x === 75 ? BackendKeyData : // K x === 69 ? ErrorResponse : // E x === 115 ? PortalSuspended : // s x === 51 ? CloseComplete : // 3 x === 71 ? CopyInResponse : // G x === 78 ? NoticeResponse : // N x === 72 ? CopyOutResponse : // H x === 99 ? CopyDone : // c x === 73 ? EmptyQueryResponse : // I x === 86 ? FunctionCallResponse : // V x === 118 ? NegotiateProtocolVersion : // v x === 87 ? CopyBothResponse : // W /* c8 ignore next */ UnknownMessage )(xs) }
function DataRow(x) { let index = 7 let length let column let value
const row = query.isRaw ? new Array(query.statement.columns.length) : {} for (let i = 0; i < query.statement.columns.length; i++) { column = query.statement.columns[i] length = x.readInt32BE(index) index += 4
value = length === -1 ? null : query.isRaw === true ? x.slice(index, index += length) : column.parser === undefined ? x.toString('utf8', index, index += length) : column.parser.array === true ? column.parser(x.toString('utf8', index + 1, index += length)) : column.parser(x.toString('utf8', index, index += length))
query.isRaw ? (row[i] = query.isRaw === true ? value : transform.value.from ? transform.value.from(value) : value) : (row[column.name] = transform.value.from ? transform.value.from(value) : value) }
query.forEachFn ? query.forEachFn(transform.row.from ? transform.row.from(row) : row, result) : (result[rows++] = transform.row.from ? transform.row.from(row) : row) }
function ParameterStatus(x) { const [k, v] = x.toString('utf8', 5, x.length - 1).split(b.N) backendParameters[k] = v if (options.parameters[k] !== v) { options.parameters[k] = v onparameter && onparameter(k, v) } }
function ReadyForQuery(x) { query && query.options.simple && query.resolve(results || result) query = results = null result = new Result() connectTimer.cancel()
if (initial) { if (target_session_attrs) { if (!backendParameters.in_hot_standby || !backendParameters.default_transaction_read_only) return fetchState() else if (tryNext(target_session_attrs, backendParameters)) return terminate() }
if (needsTypes) return fetchArrayTypes()
execute(initial) options.shared.retries = retries = initial = 0 return }
while (sent.length && (query = sent.shift()) && (query.active = true, query.cancelled)) Connection(options).cancel(query.state, query.cancelled.resolve, query.cancelled.reject)
if (query) return // Consider opening if able and sent.length < 50
connection.reserved ? x[5] === 73 // I ? ending ? terminate() : (connection.reserved = null, onopen(connection)) : connection.reserved() : ending ? terminate() : onopen(connection) }
function CommandComplete(x) { rows = 0
for (let i = x.length - 1; i > 0; i--) { if (x[i] === 32 && x[i + 1] < 58 && result.count === null) result.count = +x.toString('utf8', i + 1, x.length - 1) if (x[i - 1] >= 65) { result.command = x.toString('utf8', 5, i) result.state = backend break } }
final && (final(), final = null)
if (result.command === 'BEGIN' && max !== 1 && !connection.reserved) return errored(Errors.generic('UNSAFE_TRANSACTION', 'Only use sql.begin or max: 1'))
if (query.options.simple) return BindComplete()
if (query.cursorFn) { result.count && query.cursorFn(result) write(Sync) }
query.resolve(result) }
function ParseComplete() { query.parsing = false }
function BindComplete() { !result.statement && (result.statement = query.statement) result.columns = query.statement.columns }
function ParameterDescription(x) { const length = x.readUInt16BE(5)
for (let i = 0; i < length; ++i) !query.statement.types[i] && (query.statement.types[i] = x.readUInt32BE(7 + i * 4))
query.prepare && (statements[query.signature] = query.statement) query.describeFirst && !query.onlyDescribe && (write(prepared(query)), query.describeFirst = false) }
function RowDescription(x) { if (result.command) { results = results || [result] results.push(result = new Result()) result.count = null query.statement.columns = null }
const length = x.readUInt16BE(5) let index = 7 let start
query.statement.columns = Array(length)
for (let i = 0; i < length; ++i) { start = index while (x[index++] !== 0); const type = x.readUInt32BE(index + 6) query.statement.columns[i] = { name: transform.column.from ? transform.column.from(x.toString('utf8', start, index - 1)) : x.toString('utf8', start, index - 1), parser: parsers[type], type } index += 18 }
result.statement = query.statement if (query.onlyDescribe) return (query.resolve(query.statement), write(Sync)) }
async function Authentication(x, type = x.readUInt32BE(5)) { ( type === 3 ? AuthenticationCleartextPassword : type === 5 ? AuthenticationMD5Password : type === 10 ? SASL : type === 11 ? SASLContinue : type === 12 ? SASLFinal : type !== 0 ? UnknownAuth : noop )(x, type) }
/* c8 ignore next 5 */ async function AuthenticationCleartextPassword() { write( b().p().str(await Pass()).z(1).end() ) }
async function AuthenticationMD5Password(x) { write( b().p().str('md5' + md5(Buffer.concat([Buffer.from(md5((await Pass()) + user)), x.slice(9)]))).z(1).end() ) }
function SASL() { b().p().str('SCRAM-SHA-256' + b.N) const i = b.i nonce = crypto.randomBytes(18).toString('base64') write(b.inc(4).str('n,,n=*,r=' + nonce).i32(b.i - i - 4, i).end()) }
async function SASLContinue(x) { const res = x.toString('utf8', 9).split(',').reduce((acc, x) => (acc[x[0]] = x.slice(2), acc), {})
const saltedPassword = crypto.pbkdf2Sync( await Pass(), Buffer.from(res.s, 'base64'), parseInt(res.i), 32, 'sha256' )
const clientKey = hmac(saltedPassword, 'Client Key')
const auth = 'n=*,r=' + nonce + ',' + 'r=' + res.r + ',s=' + res.s + ',i=' + res.i + ',c=biws,r=' + res.r
serverSignature = hmac(hmac(saltedPassword, 'Server Key'), auth).toString('base64')
write( b().p().str('c=biws,r=' + res.r + ',p=' + xor(clientKey, hmac(sha256(clientKey), auth)).toString('base64')).end() ) }
function SASLFinal(x) { if (x.toString('utf8', 9).split(b.N, 1)[0].slice(2) === serverSignature) return /* c8 ignore next 5 */ errored(Errors.generic('SASL_SIGNATURE_MISMATCH', 'The server did not return the correct signature')) socket.destroy() }
function Pass() { return Promise.resolve(typeof options.pass === 'function' ? options.pass() : options.pass ) }
function NoData() { result.statement = query.statement result.statement.columns = [] if (query.onlyDescribe) return (query.resolve(query.statement), write(Sync)) }
function BackendKeyData(x) { backend.pid = x.readUInt32BE(5) backend.secret = x.readUInt32BE(9) }
async function fetchArrayTypes() { needsTypes = false const types = await new Query([` select b.oid, b.typarray from pg_catalog.pg_type a left join pg_catalog.pg_type b on b.oid = a.typelem where a.typcategory = 'A' group by b.oid, b.typarray order by b.oid `], [], execute) types.forEach(({ oid, typarray }) => addArrayType(oid, typarray)) }
function addArrayType(oid, typarray) { const parser = options.parsers[oid] options.shared.typeArrayMap[oid] = typarray options.parsers[typarray] = (xs) => arrayParser(xs, parser) options.parsers[typarray].array = true options.serializers[typarray] = (xs) => arraySerializer(xs, options.serializers[oid], options) }
function tryNext(x, xs) { return ( (x === 'read-write' && xs.default_transaction_read_only === 'on') || (x === 'read-only' && xs.default_transaction_read_only === 'off') || (x === 'primary' && xs.in_hot_standby === 'off') || (x === 'standby' && xs.in_hot_standby === 'on') || (x === 'prefer-standby' && xs.in_hot_standby === 'off' && options.host[retries]) ) }
function fetchState() { const query = new Query([` show transaction_read_only; select pg_catalog.pg_is_in_recovery() `], [], execute, null, { simple: true }) query.resolve = ([[a], [b]]) => { backendParameters.default_transaction_read_only = a.transaction_read_only backendParameters.in_hot_standby = b.pg_is_in_recovery ? 'on' : 'off' } query.execute() }
function ErrorResponse(x) { query && (query.cursorFn || query.describeFirst) && write(Sync) const error = Errors.postgres(parseError(x)) query && query.retried ? errored(query.retried) : query && retryRoutines.has(error.routine) ? retry(query, error) : errored(error) }
function retry(q, error) { delete statements[q.signature] q.retried = error execute(q) }
function NotificationResponse(x) { if (!onnotify) return
let index = 9 while (x[index++] !== 0); onnotify( x.toString('utf8', 9, index - 1), x.toString('utf8', index, x.length - 1) ) }
async function PortalSuspended() { try { const x = await Promise.resolve(query.cursorFn(result)) rows = 0 x === CLOSE ? write(Close(query.portal)) : (result = new Result(), write(Execute('', query.cursorRows))) } catch (err) { write(Sync) query.reject(err) } }
function CloseComplete() { result.count && query.cursorFn(result) query.resolve(result) }
function CopyInResponse() { stream = new Stream.Writable({ autoDestroy: true, write(chunk, encoding, callback) { socket.write(b().d().raw(chunk).end(), callback) }, destroy(error, callback) { callback(error) socket.write(b().f().str(error + b.N).end()) stream = null }, final(callback) { socket.write(b().c().end()) final = callback } }) query.resolve(stream) }
function CopyOutResponse() { stream = new Stream.Readable({ read() { socket.resume() } }) query.resolve(stream) }
/* c8 ignore next 3 */ function CopyBothResponse() { stream = new Stream.Duplex({ autoDestroy: true, read() { socket.resume() }, /* c8 ignore next 11 */ write(chunk, encoding, callback) { socket.write(b().d().raw(chunk).end(), callback) }, destroy(error, callback) { callback(error) socket.write(b().f().str(error + b.N).end()) stream = null }, final(callback) { socket.write(b().c().end()) final = callback } }) query.resolve(stream) }
function CopyData(x) { stream.push(x.slice(5)) || socket.pause() }
function CopyDone() { stream.push(null) stream = null }
function NoticeResponse(x) { onnotice ? onnotice(parseError(x)) : console.log(parseError(x)) // eslint-disable-line
}
/* c8 ignore next 3 */ function EmptyQueryResponse() { /* noop */ }
/* c8 ignore next 3 */ function FunctionCallResponse() { errored(Errors.notSupported('FunctionCallResponse')) }
/* c8 ignore next 3 */ function NegotiateProtocolVersion() { errored(Errors.notSupported('NegotiateProtocolVersion')) }
/* c8 ignore next 3 */ function UnknownMessage(x) { console.error('Postgres.js : Unknown Message:', x[0]) // eslint-disable-line }
/* c8 ignore next 3 */ function UnknownAuth(x, type) { console.error('Postgres.js : Unknown Auth:', type) // eslint-disable-line }
/* Messages */ function Bind(parameters, types, statement = '', portal = '') { let prev , type
b().B().str(portal + b.N).str(statement + b.N).i16(0).i16(parameters.length)
parameters.forEach((x, i) => { if (x === null) return b.i32(0xFFFFFFFF)
type = types[i] parameters[i] = x = type in options.serializers ? options.serializers[type](x) : '' + x
prev = b.i b.inc(4).str(x).i32(b.i - prev - 4, prev) })
b.i16(0)
return b.end() }
function Parse(str, parameters, types, name = '') { b().P().str(name + b.N).str(str + b.N).i16(parameters.length) parameters.forEach((x, i) => b.i32(types[i] || 0)) return b.end() }
function Describe(x, name = '') { return b().D().str(x).str(name + b.N).end() }
function Execute(portal = '', rows = 0) { return Buffer.concat([ b().E().str(portal + b.N).i32(rows).end(), Flush ]) }
function Close(portal = '') { return Buffer.concat([ b().C().str('P').str(portal + b.N).end(), b().S().end() ]) }
function StartupMessage() { return cancelMessage || b().inc(4).i16(3).z(2).str( Object.entries(Object.assign({ user, database, client_encoding: 'UTF8' }, options.connection )).filter(([, v]) => v).map(([k, v]) => k + b.N + v).join(b.N) ).z(2).end(0) }
}
function parseError(x) { const error = {} let start = 5 for (let i = 5; i < x.length - 1; i++) { if (x[i] === 0) { error[errorFields[x[start]]] = x.toString('utf8', start + 1, i) start = i + 1 } } return error}
function md5(x) { return crypto.createHash('md5').update(x).digest('hex')}
function hmac(key, x) { return Buffer.from(new HmacSha256(key).update(x).digest())}
function sha256(x) { return crypto.createHash('sha256').update(x).digest()}
function xor(a, b) { const length = Math.max(a.length, b.length) const buffer = Buffer.allocUnsafe(length) for (let i = 0; i < length; i++) buffer[i] = a[i] ^ b[i] return buffer}
function timer(fn, seconds) { seconds = typeof seconds === 'function' ? seconds() : seconds if (!seconds) return { cancel: noop, start: noop }
let timer return { cancel() { timer && (clearTimeout(timer), timer = null) }, start() { timer && clearTimeout(timer) timer = setTimeout(done, seconds * 1000, arguments) } }
function done(args) { fn.apply(null, args) timer = null }}