import { BufReader, BufWriter } from "../io/bufio.ts";
import { assert } from "../_util/assert.ts";
import { Deferred, deferred, MuxAsyncIterator } from "../async/mod.ts";
import {
  bodyReader,
  chunkedBodyReader,
  emptyReader,
  readRequest,
  writeResponse,
} from "./_io.ts";

/**
 * A single HTTP request received on a connection, together with the means
 * to answer it.
 *
 * The public fields are not initialised by this class; they are filled in by
 * whoever constructs the request (`w` is assigned by
 * `Server.iterateHttpRequests` in this file).
 */
export class ServerRequest {
  url!: string;
  method!: string;
  proto!: string;
  protoMinor!: number;
  protoMajor!: number;
  headers!: Headers;
  conn!: Deno.Conn;
  r!: BufReader;
  w!: BufWriter;

  /** Settled by `respond()`: `undefined` on success, the write error otherwise. */
  #done: Deferred<Error | undefined> = deferred();
  /** `undefined` = not computed yet, `null` = header missing or invalid. */
  #contentLength?: number | null = undefined;
  /** Lazily created body reader, see the `body` getter. */
  #body?: Deno.Reader = undefined;
  /** Set once `finalize()` has drained the body. */
  #finalized = false;

  /**
   * Resolves once `respond()` has finished, with the error that occurred
   * while writing the response, or `undefined` if the write succeeded.
   *
   * A derived promise is returned so that callers do not receive the
   * internal deferred (and its resolve/reject handles).
   */
  get done(): Promise<Error | undefined> {
    return this.#done.then((e) => e);
  }

  /**
   * Value of the `content-length` header as a non-negative integer, or
   * `null` when the header is absent or is not a plain decimal number.
   * The result is computed once and cached.
   */
  get contentLength(): number | null {
    if (this.#contentLength === undefined) {
      const cl = this.headers.get("content-length")?.trim();
      // HTTP defines Content-Length as 1*DIGIT. A bare parseInt() would
      // accept "0x10" (as 16), "12abc" (as 12) and "-5", so validate the
      // whole string first and always parse in base 10.
      this.#contentLength = cl !== undefined && /^\d+$/.test(cl)
        ? Number.parseInt(cl, 10)
        : null;
    }
    return this.#contentLength;
  }

  /**
   * Reader over the request body, created on first access and then reused.
   *
   * - With a valid content length: a reader built by `bodyReader`.
   * - Otherwise, with a `transfer-encoding` header: the header is asserted to
   *   list `chunked`, and a reader built by `chunkedBodyReader` is used.
   * - Otherwise: the reader returned by `emptyReader`.
   */
  get body(): Deno.Reader {
    if (!this.#body) {
      if (this.contentLength != null) {
        this.#body = bodyReader(this.contentLength, this.r);
      } else {
        const transferEncoding = this.headers.get("transfer-encoding");
        if (transferEncoding != null) {
          const parts = transferEncoding
            .split(",")
            .map((e): string => e.trim().toLowerCase());
          assert(
            parts.includes("chunked"),
            'transfer-encoding must include "chunked" if content-length is not set',
          );
          this.#body = chunkedBodyReader(this.headers, this.r);
        } else {
          this.#body = emptyReader();
        }
      }
    }
    return this.#body;
  }

  /**
   * Write the response `r` to the connection.
   *
   * If writing fails, the connection is closed (errors from closing are
   * ignored), `done` is resolved with the failure, and the failure is
   * rethrown. On success `done` is resolved with `undefined`.
   *
   * @param r The response to send.
   * @throws The error raised while writing the response.
   */
  async respond(r: Response): Promise<void> {
    let err: Error | undefined;
    try {
      await writeResponse(this.w, r);
    } catch (e) {
      try {
        // The write failed, so the connection is unusable: close it eagerly.
        this.conn.close();
      } catch {
        // Already closed; nothing more to do.
      }
      // Normalise to an Error so a non-Error (or falsy) throwable is still
      // reported as a failure instead of being mistaken for success.
      err = e instanceof Error ? e : new Error(String(e));
    }
    // Signal completion to anyone awaiting `done` before rethrowing.
    this.#done.resolve(err);
    if (err) {
      throw err;
    }
  }

  /**
   * Read and discard whatever remains of the request body.
   * Subsequent calls after a successful drain return immediately.
   */
  async finalize(): Promise<void> {
    if (this.#finalized) return;
    const body = this.body;
    // Scratch buffer; its contents are never inspected.
    const buf = new Uint8Array(1024);
    while ((await body.read(buf)) !== null) {
      // Keep reading until the reader reports end of body.
    }
    this.#finalized = true;
  }
}
/**
 * HTTP server bound to a listener. Iterating the server asynchronously
 * yields each `ServerRequest` read from any accepted connection.
 */
export class Server implements AsyncIterable<ServerRequest> {
  /** Becomes true once `close()` has been called. */
  #closing = false;
  /** Connections that were accepted and not yet untracked. */
  #connections: Deno.Conn[] = [];

  constructor(public listener: Deno.Listener) {}

  /**
   * Stop the server: flag it as closing, close the listener, then close
   * every tracked connection. `BadResource` errors raised while closing a
   * connection are ignored; any other error is rethrown.
   */
  close(): void {
    this.#closing = true;
    this.listener.close();
    for (const open of this.#connections) {
      try {
        open.close();
      } catch (closeError) {
        if (closeError instanceof Deno.errors.BadResource) {
          continue;
        }
        throw closeError;
      }
    }
  }

  /**
   * Yield the requests arriving on one connection, one at a time. The next
   * request is only read after the previous one has been responded to and
   * its body drained.
   */
  private async *iterateHttpRequests(
    conn: Deno.Conn,
  ): AsyncIterableIterator<ServerRequest> {
    const bufReader = new BufReader(conn);
    const bufWriter = new BufWriter(conn);

    for (;;) {
      if (this.#closing) break;

      let req: ServerRequest | null;
      try {
        req = await readRequest(conn, bufReader);
      } catch (readError) {
        // Only InvalidData / UnexpectedEof get a 400 reply; every read
        // failure ends the loop.
        if (
          !(
            readError instanceof Deno.errors.InvalidData ||
            readError instanceof Deno.errors.UnexpectedEof
          )
        ) {
          break;
        }
        try {
          await writeResponse(bufWriter, {
            status: 400,
            body: new TextEncoder().encode(`${readError.message}\r\n\r\n`),
          });
        } catch {
          // Best effort: failing to send the 400 is ignored.
        }
        break;
      }

      if (req === null) break;

      req.w = bufWriter;
      yield req;

      // Wait for the consumer to respond before touching the connection again.
      const failure = await req.done;
      if (failure) {
        // respond() closes the connection itself when writing fails, so
        // only the bookkeeping is left to do here.
        this.untrackConnection(req.conn);
        return;
      }

      try {
        // Drain any unread body before reading the next request.
        await req.finalize();
      } catch {
        break;
      }
    }

    this.untrackConnection(conn);
    try {
      conn.close();
    } catch {
      // Errors while closing are ignored.
    }
  }

  /** Remember `conn` so that `close()` can close it later. */
  private trackConnection(conn: Deno.Conn): void {
    this.#connections.push(conn);
  }

  /** Forget `conn`; does nothing if it is not currently tracked. */
  private untrackConnection(conn: Deno.Conn): void {
    const at = this.#connections.indexOf(conn);
    if (at === -1) return;
    this.#connections.splice(at, 1);
  }

  /**
   * Accept one connection, queue another accept on `mux`, then yield the
   * requests of the accepted connection. Accept failures of the listed
   * error kinds just queue another accept; anything else is rethrown.
   */
  private async *acceptConnAndIterateHttpRequests(
    mux: MuxAsyncIterator<ServerRequest>,
  ): AsyncIterableIterator<ServerRequest> {
    if (this.#closing) return;

    let conn: Deno.Conn;
    try {
      conn = await this.listener.accept();
    } catch (acceptError) {
      const recoverable = [
        Deno.errors.BadResource,
        Deno.errors.InvalidData,
        Deno.errors.UnexpectedEof,
        Deno.errors.ConnectionReset,
        Deno.errors.NotConnected,
      ];
      if (recoverable.some((kind) => acceptError instanceof kind)) {
        return mux.add(this.acceptConnAndIterateHttpRequests(mux));
      }
      throw acceptError;
    }

    this.trackConnection(conn);
    // Start waiting for the next connection before serving this one.
    mux.add(this.acceptConnAndIterateHttpRequests(mux));
    yield* this.iterateHttpRequests(conn);
  }

  /** Begin accepting connections and return the merged stream of requests. */
  [Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
    const mux = new MuxAsyncIterator<ServerRequest>();
    mux.add(this.acceptConnAndIterateHttpRequests(mux));
    return mux.iterate();
  }
}
/** Options accepted by `serve` and `listenAndServe`: `Deno.ListenOptions` without its `transport` field. */
export type HTTPOptions = Omit<Deno.ListenOptions, "transport">;
/**
 * Parse an address string of the form `host[:port]` or `:port`.
 *
 * A leading `:` means "no host given" and is expanded to `0.0.0.0`. The
 * address is validated by parsing it as the authority of an `http://` URL;
 * credentials, a path other than `/`, a query or a fragment are rejected.
 *
 * @param addr Address to parse, e.g. `"localhost:8000"` or `":8000"`.
 * @returns The hostname and port; the port is 80 when the parsed URL has none.
 * @throws {TypeError} "Invalid address." when `addr` cannot be parsed or
 *   carries anything beyond host and port.
 */
export function _parseAddrFromStr(addr: string): HTTPOptions {
  let parsed: URL;
  try {
    const authority = addr.startsWith(":") ? "0.0.0.0" + addr : addr;
    parsed = new URL("http://" + authority);
  } catch {
    throw new TypeError("Invalid address.");
  }

  const { username, password, pathname, search, hash } = parsed;
  const extras = [username, password, search, hash];
  // Anything other than host and port means this was not a bare address.
  if (pathname !== "/" || extras.some((part) => part !== "")) {
    throw new TypeError("Invalid address.");
  }

  const port = parsed.port === "" ? 80 : Number(parsed.port);
  return { hostname: parsed.hostname, port };
}
/**
 * Create an HTTP server listening on `addr`.
 *
 * @param addr Either listen options, or an address string that is first
 *   converted with `_parseAddrFromStr`.
 * @returns A `Server` wrapping the listener opened by `Deno.listen`.
 */
export function serve(addr: string | HTTPOptions): Server {
  const options = typeof addr === "string" ? _parseAddrFromStr(addr) : addr;
  return new Server(Deno.listen(options));
}
/**
 * Start an HTTP server on `addr` and call `handler` for every request.
 *
 * The handler's return value is not awaited, so the loop moves on to the
 * next request as soon as `handler` returns. An exception thrown
 * synchronously by `handler` ends the loop and rejects the returned promise.
 *
 * @param addr Listen options or an address string, passed to `serve`.
 * @param handler Callback invoked with each incoming request.
 * @returns A promise that settles when iteration over the server ends.
 */
export async function listenAndServe(
  addr: string | HTTPOptions,
  handler: (req: ServerRequest) => void,
): Promise<void> {
  const server = serve(addr);

  for await (const request of server) {
    handler(request);
  }
}
/** Options accepted by `serveTLS` and `listenAndServeTLS`: `Deno.ListenTlsOptions` without its `transport` field. */
export type HTTPSOptions = Omit<Deno.ListenTlsOptions, "transport">;
/**
 * Create an HTTPS server from the given TLS listen options.
 *
 * @param options TLS listen options; `transport` is always set to `"tcp"`.
 * @returns A `Server` wrapping the listener opened by `Deno.listenTls`.
 */
export function serveTLS(options: HTTPSOptions): Server {
  return new Server(Deno.listenTls({ ...options, transport: "tcp" }));
}
/**
 * Start an HTTPS server with `options` and call `handler` for every request.
 *
 * The handler's return value is not awaited, so the loop moves on to the
 * next request as soon as `handler` returns. An exception thrown
 * synchronously by `handler` ends the loop and rejects the returned promise.
 *
 * @param options TLS listen options, passed to `serveTLS`.
 * @param handler Callback invoked with each incoming request.
 * @returns A promise that settles when iteration over the server ends.
 */
export async function listenAndServeTLS(
  options: HTTPSOptions,
  handler: (req: ServerRequest) => void,
): Promise<void> {
  const server = serveTLS(options);

  for await (const request of server) {
    handler(request);
  }
}
/**
 * Shape of a response handed to `ServerRequest.respond` (and, in this file,
 * to `writeResponse`). Every field is optional.
 *
 * - `status`: numeric HTTP status code (this file sends 400 for requests it
 *   cannot parse).
 * - `statusText`: presumably the reason phrase sent with the status; how it
 *   is used is decided by `writeResponse`, so confirm there.
 * - `headers`: response headers.
 * - `body`: response payload as bytes, a reader, or a string.
 * - `trailers`: callback producing trailer headers, synchronously or via a
 *   promise; when it is invoked is not visible in this file.
 */
export interface Response { status?: number; statusText?: string; headers?: Headers; body?: Uint8Array | Deno.Reader | string; trailers?: () => Promise<Headers> | Headers;}