import Conn = Deno.Conn;import Reader = Deno.Reader;import { BufReader, BufWriter,} from "./vendor/https/deno.land/std/io/bufio.ts";import { promiseInterrupter } from "./promises.ts";import { deferred } from "./vendor/https/deno.land/std/util/async.ts";import { initServeOptions, readRequest, writeResponse } from "./serveio.ts";import { createResponder, ServerResponder } from "./responder.ts";import ListenOptions = Deno.ListenOptions;import Listener = Deno.Listener;import { BodyReader } from "./readers.ts";import ListenTLSOptions = Deno.ListenTLSOptions;import { promiseWaitQueue } from "./util.ts";
/**
 * Anything that may be supplied as an HTTP message body: a string, raw
 * bytes, a Deno `Reader`, or a `ReadableStream` of bytes.
 */
export type HttpBody =
  | string
  | Uint8Array
  | Reader
  | ReadableStream<Uint8Array>;

/**
 * Shape of an outgoing HTTP request.
 *
 * Only `url` and `method` are required. `trailers`, when given, is a
 * callback that produces the trailer headers either synchronously or via a
 * promise.
 */
export type ClientRequest = {
  url: string;
  method: string;
  headers?: Headers;
  body?: HttpBody;
  trailers?: () => Promise<Headers> | Headers;
};
/**
 * Shape of a response produced by a server handler. In this file, values of
 * this type are queued and handed to `writeResponse` by
 * `handleKeepAliveConn`.
 *
 * Only `status` is required. `body` may be omitted or explicitly `null`.
 * `trailers`, when given, is a callback that produces the trailer headers
 * either synchronously or via a promise.
 */
export type ServerResponse = {
  status: number;
  headers?: Headers;
  body?: HttpBody | null;
  trailers?: () => Promise<Headers> | Headers;
};
/**
 * A request as read from the wire, before the connection handles and the
 * responder methods are attached (see `ServerRequest`).
 */
export type IncomingHttpRequest = {
  url: string;
  path: string;
  query: URLSearchParams;
  method: string;
  proto: string;
  /** Request headers; `handleKeepAliveConn` reads `connection` from here. */
  headers: Headers;
  body?: BodyReader;
  cookies: Map<string, string>;
  trailers?: Headers;
  /**
   * Keep-alive parameters for this request, if any. When present,
   * `handleKeepAliveConn` uses them to decide whether and for how long to
   * keep the connection open.
   */
  keepAlive?: KeepAlive;
  /** Awaited by `handleKeepAliveConn` after the handler has responded. */
  finalize: () => Promise<void>;
};
/**
 * Keep-alive parameters attached to an incoming request.
 */
export type KeepAlive = {
  /**
   * Idle timeout. `handleKeepAliveConn` multiplies this by 1000 before
   * comparing it with `ServeOptions.keepAliveTimeout`, so it is presumably
   * expressed in seconds — TODO confirm against the parser in serveio.ts.
   */
  timeout: number;
  /**
   * Request budget; `handleKeepAliveConn` stops reusing the connection once
   * this is `<= 0`.
   */
  max: number;
};
/**
 * The request object passed to a `ServeHandler`: the parsed incoming
 * request, the underlying connection with its buffered reader and writer,
 * and the responder methods from `ServerResponder`.
 */
export type ServerRequest =
  & IncomingHttpRequest
  & {
    conn: Conn;
    bufWriter: BufWriter;
    bufReader: BufReader;
  }
  & ServerResponder;
/**
 * A response as read from the wire, before the connection handles are
 * attached (see `ClientResponse`). Unlike an incoming request, `body` is
 * always present.
 */
export type IncomingHttpResponse = {
  proto: string;
  status: number;
  statusText: string;
  headers: Headers;
  body: BodyReader;
  trailers?: Headers;
  finalize: () => Promise<void>;
};
/**
 * An incoming response together with the connection it arrived on and that
 * connection's buffered reader and writer.
 */
export type ClientResponse =
  & IncomingHttpResponse
  & {
    conn: Conn;
    bufWriter: BufWriter;
    bufReader: BufReader;
  };
/**
 * Tuning knobs for serving connections. Every field is optional.
 */
export type ServeOptions = {
  /**
   * Promise that, once settled, interrupts the accept loop in
   * `listenInternal`; it is also forwarded to `readRequest`.
   */
  cancel?: Promise<void>;
  /**
   * Upper bound applied between requests on a kept-alive connection.
   * Compared against `KeepAlive.timeout * 1000`, so presumably in
   * milliseconds — TODO confirm in serveio.ts.
   */
  keepAliveTimeout?: number;
  /**
   * Timeout forwarded to `readRequest` for every read; units presumably
   * match `keepAliveTimeout` — TODO confirm.
   */
  readTimeout?: number;
};
/**
 * Handle returned by `listenAndServe` / `listenAndServeTLS`; calling its
 * `close()` stops the server.
 */
export type ServeListener = Deno.Closer;

/**
 * Callback invoked once per request. It may finish synchronously or return
 * a promise, which the server awaits before continuing with the connection.
 */
export type ServeHandler = (req: ServerRequest) => void | Promise<void>;
/** Address to listen on: an optional `hostname` and a required `port`. */
export type HostPort = {
  hostname?: string;
  port: number;
};

/**
 * Opens a TCP listener for the given address.
 *
 * @param opts Host and port to bind; forwarded to `Deno.listen` with
 *   `transport` forced to `"tcp"`.
 * @returns The listener returned by `Deno.listen`.
 */
function createListener(opts: HostPort): Listener {
  const tcpOptions: HostPort & { transport: "tcp" } = {
    ...opts,
    transport: "tcp",
  };
  return Deno.listen(tcpOptions);
}
/**
 * Starts an HTTPS server: opens a TLS listener and dispatches every request
 * on every accepted connection to `handler`.
 *
 * The options are passed through `initServeOptions` before use, exactly as
 * `listenAndServe` does, so TLS and plain-TCP servers run with the same
 * normalized options. Normalization happens before the listener is opened,
 * so a failure there does not leave a bound socket behind.
 *
 * @param listenOptions Options forwarded to `Deno.listenTLS`.
 * @param handler Callback invoked for each request.
 * @param opts Optional serve options; omitted fields are filled in by
 *   `initServeOptions`.
 * @returns A handle whose `close()` stops the server.
 */
export function listenAndServeTLS(
  listenOptions: ListenTLSOptions,
  handler: ServeHandler,
  opts: ServeOptions = {},
): ServeListener {
  const serveOpts = initServeOptions(opts);
  const listener = Deno.listenTLS(listenOptions);
  return listenInternal(listener, handler, serveOpts);
}
/**
 * Starts a plain-TCP HTTP server: opens a listener and dispatches every
 * request on every accepted connection to `handler`.
 *
 * @param listenOptions Address to bind; forwarded to `createListener`.
 * @param handler Callback invoked for each request.
 * @param opts Serve options; normalized with `initServeOptions` (defined in
 *   serveio.ts) before the listener is opened.
 * @returns A handle whose `close()` stops the server.
 */
export function listenAndServe(
  listenOptions: ListenOptions,
  handler: ServeHandler,
  opts: ServeOptions = {},
): ServeListener {
  // Normalize first, then bind — same order as before.
  const serveOpts = initServeOptions(opts);
  return listenInternal(createListener(listenOptions), handler, serveOpts);
}
/**
 * Runs the accept loop for an already-open listener.
 *
 * Each accepted connection is handed to `handleKeepAliveConn`. The loop ends
 * when `listener.accept()` rejects or when the cancel signal fires; in both
 * cases the listener is closed.
 *
 * @param listener Listener to accept connections from.
 * @param handler Callback invoked for each request.
 * @param opts Serve options; `opts.cancel`, if set, can stop the loop.
 * @returns A handle whose `close()` stops the loop and closes the listener.
 *   Calls after the first successful one are no-ops.
 */
function listenInternal(
  listener: Listener,
  handler: ServeHandler,
  opts: ServeOptions = {},
): ServeListener {
  // Resolved by close() so a pending accept() is interrupted.
  const shutdown = deferred<void>();
  const cancel: Promise<void> = opts.cancel
    ? Promise.race([opts.cancel, shutdown])
    : shutdown;
  const throwIfCancelled = promiseInterrupter({ cancel });
  let closed = false;

  function close(): void {
    if (closed) return;
    shutdown.resolve();
    listener.close();
    closed = true;
  }

  async function acceptLoop(): Promise<void> {
    try {
      while (!closed) {
        const conn = await throwIfCancelled(listener.accept());
        // Not awaited: each connection is served independently of the loop.
        handleKeepAliveConn(conn, handler, opts);
      }
    } catch {
      // Accept failed or the server was cancelled: shut the listener down.
      close();
    }
  }

  void acceptLoop();
  return { close };
}
/**
 * Serves HTTP requests on a single connection, one after another, until the
 * connection should no longer be kept alive.
 *
 * For each request it reads the request, invokes `handler`, waits for the
 * response to be written and the request to be finalized, then decides
 * whether to read another request. The connection is closed when reading or
 * handling fails, when the request's keep-alive budget is exhausted
 * (`max <= 0`), or when the request carries `connection: close`. After a
 * `101` response the loop stops without closing the connection.
 *
 * @param conn Accepted connection to serve.
 * @param handler Callback invoked for each request.
 * @param opts Serve options. `readTimeout` and `cancel` are forwarded to
 *   every read; `keepAliveTimeout` caps the wait for follow-up requests.
 */
export function handleKeepAliveConn(
  conn: Conn,
  handler: ServeHandler,
  opts: ServeOptions = {},
): void {
  const bufReader = new BufReader(conn);
  const bufWriter = new BufWriter(conn);
  // All responses for this connection go through one queue; presumably
  // promiseWaitQueue runs the writes one at a time — confirm in util.ts.
  const q = promiseWaitQueue<ServerResponse, void>((resp) =>
    writeResponse(bufWriter, resp)
  );

  // NOTE(review): the first read uses readTimeout for keepAliveTimeout too.
  // This looks deliberate (a fresh connection is not idling between
  // requests yet) — confirm before changing.
  scheduleReadRequest({
    keepAliveTimeout: opts.readTimeout,
    readTimeout: opts.readTimeout,
    cancel: opts.cancel,
  });

  /** Closes the connection, tolerating one that is already closed. */
  function closeConn(): void {
    try {
      conn.close();
    } catch {
      // Already closed (e.g. by the handler). Throwing here would surface
      // as an unhandled promise rejection, so it is ignored.
    }
  }

  /** Processes one request and, if the connection stays open, the next. */
  function scheduleReadRequest(readOpts: ServeOptions): void {
    processRequest(readOpts)
      .then((next) => {
        if (next) scheduleReadRequest(next);
      })
      .catch(closeConn);
  }

  /**
   * Reads and handles a single request.
   *
   * @returns Options for reading the next request, or `undefined` when the
   *   loop must stop without closing the connection (101 response).
   * @throws `Deno.EOF` when the connection must not be reused; any error
   *   from reading, handling or writing also propagates to the caller.
   */
  async function processRequest(
    readOpts: ServeOptions,
  ): Promise<ServeOptions | undefined> {
    const baseReq = await readRequest(bufReader, readOpts);
    // Tracks the write of the most recent response so we can wait for it.
    let responded: Promise<void> = Promise.resolve();
    const onResponse = (resp: ServerResponse) => {
      responded = q.enqueue(resp);
      return responded;
    };
    const responder = createResponder(bufWriter, onResponse);
    const req: ServerRequest = {
      ...baseReq,
      bufWriter,
      bufReader,
      conn,
      ...responder,
    };
    await handler(req);
    await responded;
    await req.finalize();
    if (req.respondedStatus() === 101) {
      // Stop reading HTTP requests but leave the connection open.
      return;
    }
    if (req.keepAlive && req.keepAlive.max <= 0) {
      throw Deno.EOF;
    }
    if (req.headers.get("connection") === "close") {
      throw Deno.EOF;
    }
    let keepAliveTimeout = opts.keepAliveTimeout;
    if (req.keepAlive) {
      const requested = req.keepAlive.timeout * 1000;
      // Use the smaller of the server limit and the client's request. With
      // no server limit, Math.min(undefined, x) would be NaN, so fall back
      // to the client's value instead.
      keepAliveTimeout = keepAliveTimeout === undefined
        ? requested
        : Math.min(keepAliveTimeout, requested);
    }
    return {
      keepAliveTimeout,
      readTimeout: readOpts.readTimeout,
      cancel: readOpts.cancel,
    };
  }
}