import type { Constructor } from 'https://deno.land/x/polkadot@0.2.40/util/types.ts';import type { EndpointStats, JsonRpcResponse, ProviderInterface, ProviderInterfaceCallback, ProviderInterfaceEmitCb, ProviderInterfaceEmitted, ProviderStats } from '../types.ts';
import { EventEmitter } from 'https://esm.sh/eventemitter3@5.0.1';
import { isChildClass, isNull, isUndefined, logger, objectSpread } from 'https://deno.land/x/polkadot@0.2.40/util/mod.ts';import { xglobal } from 'https://deno.land/x/polkadot@0.2.40/x-global/mod.ts';import { WebSocket } from 'https://deno.land/x/polkadot@0.2.40/x-ws/mod.ts';
import { RpcCoder } from '../coder/index.ts';import defaults from '../defaults.ts';import { LRUCache } from '../lru.ts';import { getWSErrorString } from './errors.ts';
interface SubscriptionHandler { callback: ProviderInterfaceCallback; type: string;}
interface WsStateAwaiting { callback: ProviderInterfaceCallback; method: string; params: unknown[]; start: number; subscription?: SubscriptionHandler | undefined;}
interface WsStateSubscription extends SubscriptionHandler { method: string; params: unknown[];}
const ALIASES: { [index: string]: string } = { chain_finalisedHead: 'chain_finalizedHead', chain_subscribeFinalisedHeads: 'chain_subscribeFinalizedHeads', chain_unsubscribeFinalisedHeads: 'chain_unsubscribeFinalizedHeads'};
const RETRY_DELAY = 2_500;
const DEFAULT_TIMEOUT_MS = 60 * 1000;const TIMEOUT_INTERVAL = 5_000;
const l = logger('api-ws');
function eraseRecord<T> (record: Record<string, T>, cb?: (item: T) => void): void { Object.keys(record).forEach((key): void => { if (cb) { cb(record[key]); }
delete record[key]; });}
function defaultEndpointStats (): EndpointStats { return { bytesRecv: 0, bytesSent: 0, cached: 0, errors: 0, requests: 0, subscriptions: 0, timeout: 0 };}
export class WsProvider implements ProviderInterface { readonly #callCache = new LRUCache(); readonly #coder: RpcCoder; readonly #endpoints: string[]; readonly #headers: Record<string, string>; readonly #eventemitter: EventEmitter; readonly #handlers: Record<string, WsStateAwaiting> = {}; readonly #isReadyPromise: Promise<WsProvider>; readonly #stats: ProviderStats; readonly #waitingForId: Record<string, JsonRpcResponse<unknown>> = {};
#autoConnectMs: number; #endpointIndex: number; #endpointStats: EndpointStats; #isConnected = false; #subscriptions: Record<string, WsStateSubscription> = {}; #timeoutId?: ReturnType<typeof setInterval> | null = null; #websocket: WebSocket | null; #timeout: number;
constructor (endpoint: string | string[] = defaults.WS_URL, autoConnectMs: number | false = RETRY_DELAY, headers: Record<string, string> = {}, timeout?: number) { const endpoints = Array.isArray(endpoint) ? endpoint : [endpoint];
if (endpoints.length === 0) { throw new Error('WsProvider requires at least one Endpoint'); }
endpoints.forEach((endpoint) => { if (!/^(wss|ws):\/\//.test(endpoint)) { throw new Error(`Endpoint should start with 'ws://', received '${endpoint}'`); } });
this.#eventemitter = new EventEmitter(); this.#autoConnectMs = autoConnectMs || 0; this.#coder = new RpcCoder(); this.#endpointIndex = -1; this.#endpoints = endpoints; this.#headers = headers; this.#websocket = null; this.#stats = { active: { requests: 0, subscriptions: 0 }, total: defaultEndpointStats() }; this.#endpointStats = defaultEndpointStats(); this.#timeout = timeout || DEFAULT_TIMEOUT_MS;
if (autoConnectMs && autoConnectMs > 0) { this.connectWithRetry().catch((): void => { }); }
this.#isReadyPromise = new Promise((resolve): void => { this.#eventemitter.once('connected', (): void => { resolve(this); }); }); }
public get hasSubscriptions (): boolean { return true; }
public get isClonable (): boolean { return true; }
public get isConnected (): boolean { return this.#isConnected; }
public get isReady (): Promise<WsProvider> { return this.#isReadyPromise; }
public get endpoint (): string { return this.#endpoints[this.#endpointIndex]; }
public clone (): WsProvider { return new WsProvider(this.#endpoints); }
protected selectEndpointIndex (endpoints: string[]): number { return (this.#endpointIndex + 1) % endpoints.length; }
public async connect (): Promise<void> { if (this.#websocket) { throw new Error('WebSocket is already connected'); }
try { this.#endpointIndex = this.selectEndpointIndex(this.#endpoints);
this.#websocket = typeof xglobal.WebSocket !== 'undefined' && isChildClass(xglobal.WebSocket as unknown as Constructor<WebSocket>, WebSocket) ? new WebSocket(this.endpoint) : new WebSocket(this.endpoint, undefined, { headers: this.#headers });
if (this.#websocket) { this.#websocket.onclose = this.#onSocketClose; this.#websocket.onerror = this.#onSocketError; this.#websocket.onmessage = this.#onSocketMessage; this.#websocket.onopen = this.#onSocketOpen; }
this.#timeoutId = setInterval(() => this.#timeoutHandlers(), TIMEOUT_INTERVAL); } catch (error) { l.error(error);
this.#emit('error', error);
throw error; } }
public async connectWithRetry (): Promise<void> { if (this.#autoConnectMs > 0) { try { await this.connect(); } catch { setTimeout((): void => { this.connectWithRetry().catch((): void => { }); }, this.#autoConnectMs); } } }
public async disconnect (): Promise<void> { this.#autoConnectMs = 0;
try { if (this.#websocket) { this.#websocket.close(1000); } } catch (error) { l.error(error);
this.#emit('error', error);
throw error; } }
public get stats (): ProviderStats { return { active: { requests: Object.keys(this.#handlers).length, subscriptions: Object.keys(this.#subscriptions).length }, total: this.#stats.total }; }
public get endpointStats (): EndpointStats { return this.#endpointStats; }
public on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void { this.#eventemitter.on(type, sub);
return (): void => { this.#eventemitter.removeListener(type, sub); }; }
public send <T = any> (method: string, params: unknown[], isCacheable?: boolean, subscription?: SubscriptionHandler): Promise<T> { this.#endpointStats.requests++; this.#stats.total.requests++;
const [id, body] = this.#coder.encodeJson(method, params); let resultPromise: Promise<T> | null = isCacheable ? this.#callCache.get(body) : null;
if (!resultPromise) { resultPromise = this.#send(id, body, method, params, subscription);
if (isCacheable) { this.#callCache.set(body, resultPromise); } } else { this.#endpointStats.cached++; this.#stats.total.cached++; }
return resultPromise; }
async #send <T> (id: number, body: string, method: string, params: unknown[], subscription?: SubscriptionHandler): Promise<T> { return new Promise<T>((resolve, reject): void => { try { if (!this.isConnected || this.#websocket === null) { throw new Error('WebSocket is not connected'); }
const callback = (error?: Error | null, result?: T): void => { error ? reject(error) : resolve(result as T); };
l.debug(() => ['calling', method, body]);
this.#handlers[id] = { callback, method, params, start: Date.now(), subscription };
const bytesSent = body.length;
this.#endpointStats.bytesSent += bytesSent; this.#stats.total.bytesSent += bytesSent;
this.#websocket.send(body); } catch (error) { this.#endpointStats.errors++; this.#stats.total.errors++;
reject(error); } }); }
public subscribe (type: string, method: string, params: unknown[], callback: ProviderInterfaceCallback): Promise<number | string> { this.#endpointStats.subscriptions++; this.#stats.total.subscriptions++;
return this.send<number | string>(method, params, false, { callback, type }); }
public async unsubscribe (type: string, method: string, id: number | string): Promise<boolean> { const subscription = `${type}::${id}`;
if (isUndefined(this.#subscriptions[subscription])) { l.debug(() => `Unable to find active subscription=${subscription}`);
return false; }
delete this.#subscriptions[subscription];
try { return this.isConnected && !isNull(this.#websocket) ? this.send<boolean>(method, [id]) : true; } catch { return false; } }
#emit = (type: ProviderInterfaceEmitted, ...args: unknown[]): void => { this.#eventemitter.emit(type, ...args); };
#onSocketClose = (event: CloseEvent): void => { const error = new Error(`disconnected from ${this.endpoint}: ${event.code}:: ${event.reason || getWSErrorString(event.code)}`);
if (this.#autoConnectMs > 0) { l.error(error.message); }
this.#isConnected = false;
if (this.#websocket) { this.#websocket.onclose = null; this.#websocket.onerror = null; this.#websocket.onmessage = null; this.#websocket.onopen = null; this.#websocket = null; }
if (this.#timeoutId) { clearInterval(this.#timeoutId); this.#timeoutId = null; }
eraseRecord(this.#handlers, (h) => { try { h.callback(error, undefined); } catch (err) { l.error(err); } }); eraseRecord(this.#waitingForId);
this.#endpointStats = defaultEndpointStats();
this.#emit('disconnected');
if (this.#autoConnectMs > 0) { setTimeout((): void => { this.connectWithRetry().catch(() => { }); }, this.#autoConnectMs); } };
#onSocketError = (error: Event): void => { l.debug(() => ['socket error', error]); this.#emit('error', error); };
#onSocketMessage = (message: MessageEvent<string>): void => { l.debug(() => ['received', message.data]);
const bytesRecv = message.data.length;
this.#endpointStats.bytesRecv += bytesRecv; this.#stats.total.bytesRecv += bytesRecv;
const response = JSON.parse(message.data) as JsonRpcResponse<string>;
return isUndefined(response.method) ? this.#onSocketMessageResult(response) : this.#onSocketMessageSubscribe(response); };
#onSocketMessageResult = (response: JsonRpcResponse<string>): void => { const handler = this.#handlers[response.id];
if (!handler) { l.debug(() => `Unable to find handler for id=${response.id}`);
return; }
try { const { method, params, subscription } = handler; const result = this.#coder.decodeResponse<string>(response);
handler.callback(null, result);
if (subscription) { const subId = `${subscription.type}::${result}`;
this.#subscriptions[subId] = objectSpread({}, subscription, { method, params });
if (this.#waitingForId[subId]) { this.#onSocketMessageSubscribe(this.#waitingForId[subId]); } } } catch (error) { this.#endpointStats.errors++; this.#stats.total.errors++;
handler.callback(error as Error, undefined); }
delete this.#handlers[response.id]; };
#onSocketMessageSubscribe = (response: JsonRpcResponse<unknown>): void => { const method = ALIASES[response.method as string] || response.method || 'invalid'; const subId = `${method}::${response.params.subscription}`; const handler = this.#subscriptions[subId];
if (!handler) { this.#waitingForId[subId] = response;
l.debug(() => `Unable to find handler for subscription=${subId}`);
return; }
delete this.#waitingForId[subId];
try { const result = this.#coder.decodeResponse(response);
handler.callback(null, result); } catch (error) { this.#endpointStats.errors++; this.#stats.total.errors++;
handler.callback(error as Error, undefined); } };
#onSocketOpen = (): boolean => { if (this.#websocket === null) { throw new Error('WebSocket cannot be null in onOpen'); }
l.debug(() => ['connected to', this.endpoint]);
this.#isConnected = true;
this.#resubscribe();
this.#emit('connected');
return true; };
#resubscribe = (): void => { const subscriptions = this.#subscriptions;
this.#subscriptions = {};
Promise.all(Object.keys(subscriptions).map(async (id): Promise<void> => { const { callback, method, params, type } = subscriptions[id];
if (type.startsWith('author_')) { return; }
try { await this.subscribe(type, method, params, callback); } catch (error) { l.error(error); } })).catch(l.error); };
#timeoutHandlers = (): void => { const now = Date.now(); const ids = Object.keys(this.#handlers);
for (let i = 0, count = ids.length; i < count; i++) { const handler = this.#handlers[ids[i]];
if ((now - handler.start) > this.#timeout) { try { handler.callback(new Error(`No response received from RPC endpoint in ${this.#timeout / 1000}s`), undefined); } catch { }
this.#endpointStats.timeout++; this.#stats.total.timeout++; delete this.#handlers[ids[i]]; } } };}