Module
x/messagepack_rpc/session_test.ts
🦕 Deno module that allows for the implementation of MessagePack-RPC using MessagePack as the message schema.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477import { assert, assertEquals, AssertionError, assertRejects, assertThrows,} from "@std/assert";import { deadline, DeadlineError } from "@std/async";import { decode, encode } from "@lambdalisue/messagepack";import { type Channel, channel, collect, pop, push,} from "@lambdalisue/streamtools";import { buildNotificationMessage, buildRequestMessage, buildResponseMessage,} from "./message.ts";import { Session } from "./session.ts";
function createDummySession(): { input: Channel<Uint8Array>; output: Channel<Uint8Array>; session: Session;} { const input = channel<Uint8Array>(); const output = channel<Uint8Array>(); const session = new Session(input.reader, output.writer); return { input, output, session };}
function ensureNotNull<T>(value: T | null): T { if (value === null) { throw new AssertionError("value must not be null"); } return value;}
Deno.test("Session.send", async (t) => { await t.step( "throws an error if the session is not started", () => { const { session } = createDummySession();
const message = buildRequestMessage(1, "sum", [1, 2]); assertThrows( () => session.send(message), Error, "Session is not running", ); }, );
await t.step( "sends a message to the specified writer", async () => { const { session, output } = createDummySession();
session.start();
const message = buildRequestMessage(1, "sum", [1, 2]); session.send(message); assertEquals( decode(ensureNotNull(await pop(output.reader))), message, ); }, );});
Deno.test("Session.recv", async (t) => { await t.step( "throws an error if the session is not started", () => { const { session } = createDummySession();
assertThrows(() => session.recv(1), Error, "Session is not running"); }, );
await t.step( "waits a corresponding response message and resolves with it", async () => { const { session, input } = createDummySession();
session.start();
const message = buildResponseMessage(1, null, 3); push(input.writer, encode(message)); assertEquals( await session.recv(1), message, ); }, );});
Deno.test("Session.start", async (t) => { await t.step( "throws an error if the session is already started", () => { const { session } = createDummySession();
session.start(); assertThrows(() => session.start(), Error, "Session is already running"); }, );
await t.step( "locks specified reader and writer", () => { const { session, input, output } = createDummySession();
session.start(); assert(input.reader.locked, "reader is not locked"); assert(output.writer.locked, "writer is not locked"); }, );
await t.step( "invokes a method defined in `dispatcher` and send-back a response message when a request message is received", async () => { let called = false; const { session, input, output } = createDummySession(); session.dispatcher = { sum: (a, b) => { called = true; assertEquals(a, 1); assertEquals(b, 2); return 3; }, }; session.start();
await push(input.writer, encode(buildRequestMessage(1, "sum", [1, 2]))); await session.shutdown(); assert(called, "handler is not called"); assertEquals(await collect(output.reader), [ encode(buildResponseMessage(1, null, 3)), ]); }, );
await t.step( "invokes a method defined in `dispatcher` and send-back a response message when a request message is received (error)", async () => { let called = false; const { session, input, output } = createDummySession(); session.dispatcher = { sum: () => { called = true; throw "sum error"; }, }; session.start();
await push(input.writer, encode(buildRequestMessage(1, "sum", [1, 2]))); await session.shutdown(); assert(called, "handler is not called"); assertEquals(await collect(output.reader), [ encode( buildResponseMessage(1, "sum error", null), ), ]); }, );
await t.step( "invokes a method defined in `dispatcher` when a notification message is received", async () => { let called = false; const { session, input, output } = createDummySession(); session.dispatcher = { sum: (a, b) => { called = true; assertEquals(a, 1); assertEquals(b, 2); return 3; }, }; session.start();
await push(input.writer, encode(buildNotificationMessage("sum", [1, 2]))); await session.shutdown(); assert(called, "handler is not called"); assertEquals(await collect(output.reader), []); }, );
await t.step( "invokes a method defined in `dispatcher` when a notification message is received (error)", async () => { let called = false; const { session, input, output } = createDummySession(); session.dispatcher = { sum: () => { called = true; throw "sum error"; }, }; session.start();
await push(input.writer, encode(buildNotificationMessage("sum", [1, 2]))); await session.shutdown(); assert(called, "handler is not called"); assertEquals(await collect(output.reader), []); }, );});
Deno.test("Session.wait", async (t) => { await t.step( "throws an error if the session is not started", () => { const { session } = createDummySession();
assertThrows(() => session.wait(), Error, "Session is not running"); }, );
await t.step( "returns a promise that is resolved when the session is closed (reader is closed)", async () => { const output = channel<Uint8Array>(); const { promise, resolve } = Promise.withResolvers<void>(); const session = new Session( // Reader that is not closed until the guard is resolved new ReadableStream({ async start(controller) { await promise; controller.close(); }, }), output.writer, );
session.start();
const waiter = session.wait(); await assertRejects( () => deadline(waiter, 100), DeadlineError, ); resolve(); await deadline(waiter, 100); }, );});
Deno.test("Session.shutdown", async (t) => { await t.step( "throws an error if the session is not started", () => { const { session } = createDummySession();
assertThrows(() => session.shutdown(), Error, "Session is not running"); }, );
await t.step( "unlocks specified reader and writer", async () => { const { session, input, output } = createDummySession();
session.start(); await session.shutdown(); assert(!input.reader.locked, "reader is locked"); assert(!output.writer.locked, "writer is locked"); }, );
await t.step( "waits until all messages are processed to the writer", async () => { const input = channel<Uint8Array>(); const { promise, resolve } = Promise.withResolvers<void>(); const session = new Session( input.reader, // Writer that is not processed until the guard is resolved new WritableStream({ async write() { await promise; }, }), );
session.start(); session.send(buildRequestMessage(1, "sum", [1, 2])); const shutdown = session.shutdown(); await assertRejects( () => deadline(shutdown, 100), DeadlineError, ); // Process all messages resolve(); await deadline(shutdown, 100); }, );});
Deno.test("Session.forceShutdown", async (t) => { await t.step( "throws an error if the session is not started", () => { const { session } = createDummySession();
assertThrows( () => session.forceShutdown(), Error, "Session is not running", ); }, );
await t.step( "unlocks specified reader and writer", async () => { const { session, input, output } = createDummySession();
session.start(); await session.forceShutdown(); assert(!input.reader.locked, "reader is locked"); assert(!output.writer.locked, "writer is locked"); }, );
await t.step( "does not wait until all messages are processed to the writer", async () => { const input = channel<Uint8Array>(); const { promise, resolve } = Promise.withResolvers<void>(); const session = new Session( input.reader, // Writer that is not processed until the guard is resolved new WritableStream({ async write() { await promise; }, }), );
session.start(); session.send(buildRequestMessage(1, "sum", [1, 2])); const shutdown = session.forceShutdown(); await deadline(shutdown, 100); resolve(); }, );});
Deno.test("Session.onInvalidMessage", async (t) => { await t.step( "is called when an invalid message is received", async () => { let called = false; const { session, input } = createDummySession(); session.onInvalidMessage = (message) => { called = true; assertEquals(message, "invalid"); }; session.start();
await push(input.writer, encode("invalid")); await session.shutdown(); assert(called, "onInvalidMessage is not called"); }, );
await t.step( "is called when an invalid message is received (array)", async () => { let called = false; const { session, input } = createDummySession(); session.onInvalidMessage = (message) => { called = true; assertEquals(message, [3, "invalid"]); }; session.start();
await push(input.writer, encode([3, "invalid"])); await session.shutdown(); assert(called, "onInvalidMessage is not called"); }, );});
Deno.test("Session.onMessageError", async (t) => { await t.step( "is called when handling a request message fails (sending a response fails)", async () => { const { promise, resolve } = Promise.withResolvers<void>(); let called: unknown; const { session, input } = createDummySession(); session.dispatcher = { sum() { return 3; }, }; session.onMessageError = (error, message) => { called = [error.message, message]; resolve(); }; session.start();
// deno-lint-ignore no-explicit-any (session as any).send = () => { throw new Error("send error"); };
await push(input.writer, encode(buildRequestMessage(1, "sum", [1, 2]))); await session.shutdown(); await promise; assertEquals(called, [ "send error", buildRequestMessage(1, "sum", [1, 2]), ]); }, );
await t.step( "is called when handling a response message fails (unexpected response message is received)", async () => { const { promise, resolve } = Promise.withResolvers<void>(); let called: unknown; const { session, input } = createDummySession(); session.dispatcher = { sum() { return 3; }, }; session.onMessageError = (error, message) => { called = [error.message, message]; resolve(); }; session.start();
await push(input.writer, encode(buildResponseMessage(1, null, 3))); await session.shutdown(); await promise; assertEquals(called, [ "Reservation with key 1 does not exist", buildResponseMessage(1, null, 3), ]); }, );
await t.step( "is called when handling a notification message fails (dispatch fails)", async () => { const { promise, resolve } = Promise.withResolvers<void>(); let called: unknown; const { session, input } = createDummySession(); session.dispatcher = { sum() { throw new Error("sum error"); }, }; session.onMessageError = (error, message) => { called = [error.message, message]; resolve(); }; session.start();
await push(input.writer, encode(buildNotificationMessage("sum", [1, 2]))); await session.shutdown(); await promise; assertEquals(called, [ "sum error", buildNotificationMessage("sum", [1, 2]), ]); }, );});