Module

std/ws/mod.ts

Deno standard library
Go to Latest
File
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.import { hasOwnProperty } from "../_util/has_own_property.ts";import { BufReader, BufWriter } from "../io/bufio.ts";import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";import { Sha1 } from "../hash/sha1.ts";import { writeResponse } from "../http/_io.ts";import { TextProtoReader } from "../textproto/mod.ts";import { Deferred, deferred } from "../async/deferred.ts";import { assert } from "../_util/assert.ts";import { concat } from "../bytes/mod.ts";
/** Frame opcodes, carried in the low four bits of a frame's first byte. */
export enum OpCode {
  // Data frames.
  Continue = 0x0,
  TextFrame = 0x1,
  BinaryFrame = 0x2,
  // Control frames.
  Close = 0x8,
  Ping = 0x9,
  Pong = 0xa,
}
/**
 * Every kind of value a `WebSocket` can yield while being iterated:
 * a text message, a binary message, or one of the control-frame events.
 */
export type WebSocketEvent =
  | string
  | Uint8Array
  | WebSocketCloseEvent // Received after closing connection finished.
  | WebSocketPingEvent // Received after pong frame responded.
  | WebSocketPongEvent;
/** Event describing a close frame: its status code and optional reason text. */
export interface WebSocketCloseEvent {
  code: number;
  reason?: string;
}
/**
 * Type guard for close events.
 * An event counts as a WebSocketCloseEvent when it owns a `code` property.
 */
export function isWebSocketCloseEvent(
  a: WebSocketEvent,
): a is WebSocketCloseEvent {
  const ownsCode = hasOwnProperty(a, "code");
  return ownsCode;
}
/** Ping event: the literal tag "ping" followed by the ping frame's payload. */
export type WebSocketPingEvent = ["ping", Uint8Array];
/**
 * Type guard for ping events.
 * Matches only a `["ping", Uint8Array]` array; anything else yields false.
 */
export function isWebSocketPingEvent(
  a: WebSocketEvent,
): a is WebSocketPingEvent {
  if (!Array.isArray(a)) {
    return false;
  }
  if (a[0] !== "ping") {
    return false;
  }
  return a[1] instanceof Uint8Array;
}
/** Pong event: the literal tag "pong" followed by the pong frame's payload. */
export type WebSocketPongEvent = ["pong", Uint8Array];
/**
 * Type guard for pong events.
 * Matches only a `["pong", Uint8Array]` array; anything else yields false.
 */
export function isWebSocketPongEvent(
  a: WebSocketEvent,
): a is WebSocketPongEvent {
  if (!Array.isArray(a)) {
    return false;
  }
  if (a[0] !== "pong") {
    return false;
  }
  return a[1] instanceof Uint8Array;
}
/** Payload accepted by `send` and `ping`: a string (text) or raw bytes (binary). */
export type WebSocketMessage = string | Uint8Array;
/** A single WebSocket frame as read from or written to the wire. */
export interface WebSocketFrame {
  // True when the frame's FIN bit is set, i.e. it is the final fragment.
  isLastFrame: boolean;
  opcode: OpCode;
  // 4-byte masking key; absent when the frame is not masked.
  mask?: Uint8Array;
  payload: Uint8Array;
}
/**
 * A WebSocket connection. Iterate it asynchronously to receive events and
 * use its methods to send messages, pings and close frames.
 */
export interface WebSocket extends AsyncIterable<WebSocketEvent> {
  /** The underlying connection. */
  readonly conn: Deno.Conn;
  /** Whether the socket has been closed. */
  readonly isClosed: boolean;

  [Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent>;

  /**
   * Send a text (string) or binary (Uint8Array) message.
   * @throws `Deno.errors.ConnectionReset`
   */
  send(data: WebSocketMessage): Promise<void>;

  /**
   * Send a ping frame.
   * @param data Optional ping payload.
   * @throws `Deno.errors.ConnectionReset`
   */
  ping(data?: WebSocketMessage): Promise<void>;

  /**
   * Close the connection after sending a close frame to the peer.
   * This is the canonical way to disconnect, but it may hang if the peer is
   * slow to respond.
   * The default close code is 1000 (Normal Closure).
   * @throws `Deno.errors.ConnectionReset`
   */
  close(): Promise<void>;
  close(code: number): Promise<void>;
  close(code: number, reason: string): Promise<void>;

  /**
   * Forcibly close the connection without sending a close frame to the peer.
   * This is generally an undesirable way to disconnect; use it with care.
   */
  closeForce(): void;
}
/**
 * XOR `payload` in place with the repeating 4-byte `mask`.
 * XOR is its own inverse, so the same call both masks and unmasks.
 * Does nothing when no mask is given.
 */
export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
  if (!mask) {
    return;
  }
  const total = payload.length;
  let idx = 0;
  while (idx < total) {
    // The mask key repeats every four bytes.
    payload[idx] = payload[idx] ^ mask[idx % 4];
    idx += 1;
  }
}
/**
 * Write a WebSocket frame to the given writer.
 *
 * The header is built from the opcode, the mask flag and the payload length,
 * using the shortest length encoding that fits: 7-bit, 16-bit or 64-bit.
 * When `frame.mask` is set, a masked copy of the payload is sent and
 * `frame.payload` itself is left untouched.
 *
 * @param frame Frame to serialize.
 * @param writer Destination; it is wrapped with `BufWriter.create` and flushed.
 * @throws `Error` When `frame.mask` is present but is not exactly 4 bytes.
 */
export async function writeFrame(
  frame: WebSocketFrame,
  writer: Deno.Writer,
): Promise<void> {
  const payloadLength = frame.payload.byteLength;
  let header: Uint8Array;
  const hasMask = frame.mask ? 0x80 : 0;
  if (frame.mask && frame.mask.byteLength !== 4) {
    throw new Error(
      "invalid mask. mask must be 4 bytes: length=" + frame.mask.byteLength,
    );
  }
  // NOTE(review): the FIN bit (0x80) is always set here; `frame.isLastFrame`
  // is not consulted. Confirm whether fragmented writes should be supported.
  if (payloadLength < 126) {
    header = new Uint8Array([0x80 | frame.opcode, hasMask | payloadLength]);
  } else if (payloadLength <= 0xffff) {
    // 0xffff still fits in the 16-bit extended length, so it belongs here
    // rather than in the 64-bit form (the minimal encoding must be used).
    header = new Uint8Array([
      0x80 | frame.opcode,
      hasMask | 0b01111110,
      payloadLength >>> 8,
      payloadLength & 0x00ff,
    ]);
  } else {
    header = new Uint8Array([
      0x80 | frame.opcode,
      hasMask | 0b01111111,
      ...sliceLongToBytes(payloadLength),
    ]);
  }
  let payload = frame.payload;
  if (frame.mask) {
    header = concat(header, frame.mask);
    // Mask a private copy so the caller's buffer is not modified.
    payload = payload.slice();
    unmask(payload, frame.mask);
  }
  header = concat(header, payload);
  const w = BufWriter.create(writer);
  await w.write(header);
  await w.flush();
}
/**
 * Read one WebSocket frame from the given BufReader.
 *
 * The payload is returned exactly as read; if the frame carried a mask it is
 * returned alongside and the caller is responsible for unmasking.
 *
 * @throws `Deno.errors.UnexpectedEof` When peer closed connection without close frame
 * @throws `Error` Frame is invalid
 */
export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
  // First byte: upper nibble holds FIN + reserved bits, lower nibble the opcode.
  const firstByte = await buf.readByte();
  assert(firstByte !== null);
  const upperNibble = firstByte >>> 4;
  if (upperNibble !== 0b1000 && upperNibble !== 0b0000) {
    throw new Error("invalid signature");
  }
  const isLastFrame = upperNibble === 0b1000;
  const opcode = firstByte & 0x0f;

  // Second byte: mask flag in the top bit, 7-bit length in the rest.
  const secondByte = await buf.readByte();
  assert(secondByte !== null);
  const hasMask = secondByte >>> 7;
  let payloadLength = secondByte & 0b01111111;
  if (payloadLength === 126) {
    // 126 means the real length follows as a 16-bit value.
    const len16 = await readShort(buf);
    assert(len16 !== null);
    payloadLength = len16;
  } else if (payloadLength === 127) {
    // 127 means the real length follows as a 64-bit value.
    const len64 = await readLong(buf);
    assert(len64 !== null);
    payloadLength = Number(len64);
  }

  // Optional 4-byte masking key.
  let mask: Uint8Array | undefined;
  if (hasMask) {
    mask = new Uint8Array(4);
    assert((await buf.readFull(mask)) !== null);
  }

  // Payload bytes.
  const payload = new Uint8Array(payloadLength);
  assert((await buf.readFull(payload)) !== null);

  return { isLastFrame, opcode, mask, payload };
}
/**
 * `WebSocket` implementation on top of a `Deno.Conn`.
 *
 * Incoming frames are read through the async iterator. Outgoing frames go
 * through a FIFO queue so that only one frame is being written at a time.
 * When a `mask` is supplied, every outgoing frame is sent with it.
 */
class WebSocketImpl implements WebSocket {
  readonly conn: Deno.Conn;
  private readonly mask?: Uint8Array;
  private readonly bufReader: BufReader;
  private readonly bufWriter: BufWriter;
  // Pending outgoing frames; the head entry is the one currently being written.
  private sendQueue: Array<{
    frame: WebSocketFrame;
    d: Deferred<void>;
  }> = [];

  /** Buffered reader/writer default to fresh wrappers around `conn`. */
  constructor({
    conn,
    bufReader,
    bufWriter,
    mask,
  }: {
    conn: Deno.Conn;
    bufReader?: BufReader;
    bufWriter?: BufWriter;
    mask?: Uint8Array;
  }) {
    this.conn = conn;
    this.mask = mask;
    this.bufReader = bufReader || new BufReader(conn);
    this.bufWriter = bufWriter || new BufWriter(conn);
  }

  /**
   * Read frames until the socket closes, yielding one event per complete
   * message or control frame. Fragmented data frames are reassembled first.
   * A read failure closes the socket and ends the iteration.
   */
  async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent> {
    const decoder = new TextDecoder();
    let frames: WebSocketFrame[] = [];
    let payloadsLength = 0;
    while (!this._isClosed) {
      let frame: WebSocketFrame;
      try {
        frame = await readFrame(this.bufReader);
      } catch {
        this.ensureSocketClosed();
        break;
      }
      unmask(frame.payload, frame.mask);
      switch (frame.opcode) {
        case OpCode.TextFrame:
        case OpCode.BinaryFrame:
        case OpCode.Continue:
          frames.push(frame);
          payloadsLength += frame.payload.length;
          if (frame.isLastFrame) {
            // Join all collected fragments into one contiguous message.
            const message = new Uint8Array(payloadsLength);
            let offs = 0;
            for (const part of frames) {
              message.set(part.payload, offs);
              offs += part.payload.length;
            }
            if (frames[0].opcode === OpCode.TextFrame) {
              // text
              yield decoder.decode(message);
            } else {
              // binary
              yield message;
            }
            frames = [];
            payloadsLength = 0;
          }
          break;
        case OpCode.Close: {
          // [0x12, 0x34] -> 0x1234
          const code = (frame.payload[0] << 8) | frame.payload[1];
          const reason = decoder.decode(
            frame.payload.subarray(2, frame.payload.length),
          );
          // Echo the close frame back, then report the event and stop.
          await this.close(code, reason);
          yield { code, reason };
          return;
        }
        case OpCode.Ping:
          await this.enqueue({
            opcode: OpCode.Pong,
            // The reply carries this socket's mask like every other outgoing
            // frame. Masking happens in place, so send a copy to keep the
            // payload yielded below intact.
            payload: this.mask ? frame.payload.slice() : frame.payload,
            isLastFrame: true,
            mask: this.mask,
          });
          yield ["ping", frame.payload] as WebSocketPingEvent;
          break;
        case OpCode.Pong:
          yield ["pong", frame.payload] as WebSocketPongEvent;
          break;
        default:
          // Frames with any other opcode are ignored.
          break;
      }
    }
  }

  /**
   * Write the frame at the head of the queue, settle its deferred, then move
   * on to the next entry. Stops when the queue is empty or the socket closed.
   */
  private dequeue(): void {
    const [entry] = this.sendQueue;
    if (!entry) return;
    if (this._isClosed) return;
    const { d, frame } = entry;
    writeFrame(frame, this.bufWriter)
      .then(() => d.resolve())
      .catch((e) => d.reject(e))
      .finally(() => {
        this.sendQueue.shift();
        this.dequeue();
      });
  }

  /**
   * Queue a frame for writing and return a promise that settles once it has
   * been written (or the socket has been closed).
   * @throws `Deno.errors.ConnectionReset` Synchronously, if already closed.
   */
  private enqueue(frame: WebSocketFrame): Promise<void> {
    if (this._isClosed) {
      throw new Deno.errors.ConnectionReset("Socket has already been closed");
    }
    const d = deferred<void>();
    this.sendQueue.push({ d, frame });
    // Start the writer only if it is idle; otherwise the running dequeue
    // chain picks this entry up.
    if (this.sendQueue.length === 1) {
      this.dequeue();
    }
    return d;
  }

  /** Send a string as a text frame or a Uint8Array as a binary frame. */
  send(data: WebSocketMessage): Promise<void> {
    const opcode = typeof data === "string"
      ? OpCode.TextFrame
      : OpCode.BinaryFrame;
    const payload = typeof data === "string"
      ? new TextEncoder().encode(data)
      : data;
    const isLastFrame = true;
    const frame = {
      isLastFrame,
      opcode,
      payload,
      mask: this.mask,
    };
    return this.enqueue(frame);
  }

  /** Send a ping frame; the payload defaults to empty. */
  ping(data: WebSocketMessage = ""): Promise<void> {
    const payload = typeof data === "string"
      ? new TextEncoder().encode(data)
      : data;
    const frame = {
      isLastFrame: true,
      opcode: OpCode.Ping,
      mask: this.mask,
      payload,
    };
    return this.enqueue(frame);
  }

  private _isClosed = false;
  get isClosed(): boolean {
    return this._isClosed;
  }

  /**
   * Send a close frame carrying `code` (big-endian, 2 bytes) and the optional
   * UTF-8 `reason`, then close the connection whether or not the write
   * succeeded. Any write error propagates to the caller.
   */
  async close(code = 1000, reason?: string) {
    try {
      const header = [code >>> 8, code & 0x00ff];
      let payload: Uint8Array;
      if (reason) {
        const reasonBytes = new TextEncoder().encode(reason);
        payload = new Uint8Array(2 + reasonBytes.byteLength);
        payload.set(header);
        payload.set(reasonBytes, 2);
      } else {
        payload = new Uint8Array(header);
      }
      await this.enqueue({
        isLastFrame: true,
        opcode: OpCode.Close,
        mask: this.mask,
        payload,
      });
    } finally {
      this.ensureSocketClosed();
    }
  }

  /** Close the connection immediately without sending a close frame. */
  closeForce(): void {
    this.ensureSocketClosed();
  }

  /**
   * Close the connection once, mark the socket closed, and reject every
   * frame still waiting in the send queue. Errors from `conn.close()` are
   * logged rather than thrown.
   */
  private ensureSocketClosed(): void {
    if (this.isClosed) return;
    try {
      this.conn.close();
    } catch (e) {
      console.error(e);
    } finally {
      this._isClosed = true;
      const rest = this.sendQueue;
      this.sendQueue = [];
      rest.forEach((e) =>
        e.d.reject(
          new Deno.errors.ConnectionReset("Socket has already been closed"),
        )
      );
    }
  }
}
/**
 * Decide whether a request can be upgraded to a WebSocket.
 * Requires an `upgrade` header equal to "websocket" (case-insensitive) and a
 * non-empty `sec-websocket-key` header.
 */
export function acceptable(req: { headers: Headers }): boolean {
  const { headers } = req;
  const upgrade = headers.get("upgrade");
  const wantsWebSocket = !!upgrade && upgrade.toLowerCase() === "websocket";
  if (!wantsWebSocket) {
    return false;
  }
  const secKey = headers.get("sec-websocket-key");
  if (!headers.has("sec-websocket-key")) {
    return false;
  }
  return typeof secKey === "string" && secKey.length > 0;
}
// Fixed GUID that createSecAccept appends to the nonce before hashing.
const kGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
/**
 * Compute the Sec-WebSocket-Accept header value for a given nonce:
 * the base64 encoding of SHA-1(nonce + kGUID).
 */
export function createSecAccept(nonce: string): string {
  const hasher = new Sha1();
  hasher.update(nonce + kGUID);
  const digest = hasher.digest();
  // btoa expects a binary string, so map each digest byte to a char code.
  const binary = String.fromCharCode(...digest);
  return btoa(binary);
}
/**
 * Upgrade an incoming connection to a WebSocket by writing the
 * "101 Switching Protocols" response.
 *
 * The client's `sec-websocket-protocol` and `sec-websocket-version` headers,
 * when present, are copied into the response unchanged.
 *
 * @throws `Error` When the request is not acceptable (see `acceptable`) or
 *   carries no `sec-websocket-key`.
 */
export async function acceptWebSocket(req: {
  conn: Deno.Conn;
  bufWriter: BufWriter;
  bufReader: BufReader;
  headers: Headers;
}): Promise<WebSocket> {
  if (!acceptable(req)) {
    throw new Error("request is not acceptable");
  }
  const { conn, headers, bufReader, bufWriter } = req;
  const sock = new WebSocketImpl({ conn, bufReader, bufWriter });

  const secKey = headers.get("sec-websocket-key");
  if (typeof secKey !== "string") {
    throw new Error("sec-websocket-key is not provided");
  }

  const newHeaders = new Headers({
    Upgrade: "websocket",
    Connection: "Upgrade",
    "Sec-WebSocket-Accept": createSecAccept(secKey),
  });

  // Request header -> response header, forwarded only when the client sent it.
  const forwarded: Array<[string, string]> = [
    ["sec-websocket-protocol", "Sec-WebSocket-Protocol"],
    ["sec-websocket-version", "Sec-WebSocket-Version"],
  ];
  for (const [requestName, responseName] of forwarded) {
    const value = headers.get(requestName);
    if (typeof value === "string") {
      newHeaders.set(responseName, value);
    }
  }

  await writeResponse(bufWriter, {
    status: 101,
    headers: newHeaders,
  });
  return sock;
}
// Alphabet from which the characters of a Sec-WebSocket-Key are drawn.
const kSecChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-.~_";

/**
 * Build a Sec-WebSocket-Key header value: 16 characters picked at random
 * from kSecChars, base64 encoded.
 */
export function createSecKey(): string {
  const picked = Array.from({ length: 16 }, () => {
    const pos = Math.floor(Math.random() * kSecChars.length);
    return kSecChars[pos];
  });
  return btoa(picked.join(""));
}
/**
 * Perform the client side of the WebSocket opening handshake.
 *
 * Sends the upgrade request, then checks the response status line and the
 * `sec-websocket-accept` header.
 *
 * @param url Target URL; its host, path and query are used for the request.
 * @param headers Request headers. This object is modified: `upgrade`,
 *   `connection`, `sec-websocket-key` and `sec-websocket-version` are
 *   overwritten, and `host` is filled in when absent.
 * @param bufReader Reader for the server's response.
 * @param bufWriter Writer for the request; flushed after writing.
 * @throws `Deno.errors.UnexpectedEof` When the response ends before the
 *   status line or headers were read.
 * @throws `Error` When the status line is malformed, the server did not
 *   answer `HTTP/1.1 101`, or `sec-websocket-accept` does not match.
 */
export async function handshake(
  url: URL,
  headers: Headers,
  bufReader: BufReader,
  bufWriter: BufWriter,
): Promise<void> {
  // `host` (unlike `hostname`) keeps a non-default port, which the Host
  // header must carry.
  const { host, pathname, search } = url;
  const key = createSecKey();

  if (!headers.has("host")) {
    headers.set("host", host);
  }
  headers.set("upgrade", "websocket");
  headers.set("connection", "upgrade");
  headers.set("sec-websocket-key", key);
  headers.set("sec-websocket-version", "13");

  let headerStr = `GET ${pathname}${search} HTTP/1.1\r\n`;
  for (const [name, value] of headers) {
    headerStr += `${name}: ${value}\r\n`;
  }
  headerStr += "\r\n";

  await bufWriter.write(new TextEncoder().encode(headerStr));
  await bufWriter.flush();

  const tpReader = new TextProtoReader(bufReader);
  const statusLine = await tpReader.readLine();
  if (statusLine === null) {
    throw new Deno.errors.UnexpectedEof();
  }
  const m = statusLine.match(/^(?<version>\S+) (?<statusCode>\S+) /);
  if (!m) {
    throw new Error("ws: invalid status line: " + statusLine);
  }

  assert(m.groups);
  const { version, statusCode } = m.groups;
  if (version !== "HTTP/1.1" || statusCode !== "101") {
    throw new Error(
      `ws: server didn't accept handshake: ` +
        `version=${version}, statusCode=${statusCode}`,
    );
  }

  const responseHeaders = await tpReader.readMIMEHeader();
  if (responseHeaders === null) {
    throw new Deno.errors.UnexpectedEof();
  }

  // The server must answer with the accept value derived from our key.
  const expectedSecAccept = createSecAccept(key);
  const secAccept = responseHeaders.get("sec-websocket-accept");
  if (secAccept !== expectedSecAccept) {
    throw new Error(
      `ws: unexpected sec-websocket-accept header: ` +
        `expected=${expectedSecAccept}, actual=${secAccept}`,
    );
  }
}
/**
 * Wrap a connection in a `WebSocket` without performing any handshake.
 * `bufReader` and `bufWriter` default to new buffers around `conn`; pass
 * `mask` to have outgoing frames masked with it.
 */
export function createWebSocket(params: {
  conn: Deno.Conn;
  bufWriter?: BufWriter;
  bufReader?: BufReader;
  mask?: Uint8Array;
}): WebSocket {
  const socket: WebSocket = new WebSocketImpl(params);
  return socket;
}