import { Reservator } from "https://deno.land/x/reservator@v0.1.0/mod.ts";import { Channel, channel,} from "https://deno.land/x/streamtools@v0.5.0/mod.ts";import { DecodeStream, EncodeStream } from "./json_streams.ts";import { Command } from "./command.ts";import { isMessage, Message } from "./message.ts";
const shutdown = Symbol("shutdown");
export class Session { #outer: Channel<Uint8Array>; #inner: Channel<Command | Message>; #running?: { innerWriter: WritableStreamDefaultWriter<Command | Message>; reservator: Reservator<number, Message>; producerController: AbortController; consumerController: AbortController; waiter: Promise<void>; };
onInvalidMessage?: (message: unknown) => void;
onMessage?: (message: Message) => void;
constructor( reader: ReadableStream<Uint8Array>, writer: WritableStream<Uint8Array>, ) { this.#outer = { reader, writer }; this.#inner = channel(); }
send(data: Command | Message): Promise<void> { if (!this.#running) { throw new Error("Session is not running"); } const { innerWriter } = this.#running; return innerWriter.write(data); }
recv(msgid: number): Promise<Message> { if (!this.#running) { throw new Error("Session is not running"); } const { reservator } = this.#running; return reservator.reserve(msgid); }
start(): void { if (this.#running) { throw new Error("Session is already running"); } const reservator = new Reservator<number, Message>(); const innerWriter = this.#inner.writer.getWriter(); const consumerController = new AbortController(); const producerController = new AbortController();
const ignoreShutdownError = (err: unknown) => { if (err === shutdown) { return; } return Promise.reject(err); };
const consumer = this.#outer.reader .pipeThrough(new DecodeStream()) .pipeTo( new WritableStream({ write: (m) => this.#handleMessage(m) }), { signal: consumerController.signal }, ) .catch(ignoreShutdownError) .finally(async () => { await innerWriter.ready; await innerWriter.close(); });
const producer = this.#inner.reader .pipeThrough(new EncodeStream<Command>()) .pipeTo(this.#outer.writer, { signal: producerController.signal }) .catch(ignoreShutdownError);
const waiter = Promise.all([consumer, producer]) .then(() => {}).finally(() => { innerWriter.releaseLock(); this.#running = undefined; });
this.#running = { reservator, innerWriter, consumerController, producerController, waiter, }; }
wait(): Promise<void> { if (!this.#running) { throw new Error("Session is not running"); } const { waiter } = this.#running; return waiter; }
shutdown(): Promise<void> { if (!this.#running) { throw new Error("Session is not running"); } const { consumerController, waiter } = this.#running; consumerController.abort(shutdown); return waiter; }
forceShutdown(): Promise<void> { if (!this.#running) { throw new Error("Session is not running"); } const { consumerController, producerController, waiter } = this.#running; producerController.abort(shutdown); consumerController.abort(shutdown); return waiter; }
#handleMessage(message: unknown): void { if (!isMessage(message)) { this.onInvalidMessage?.call(this, message); return; } const [msgid, _] = message; if (msgid < 0) { const { reservator } = this.#running!; reservator.resolve(msgid, message); } else { this.onMessage?.call(this, message); } }}