Skip to main content
Module

x/polkadot/rpc-provider/ws/index.ts

Package publishing for deno.land/x/polkadot
Go to Latest
File
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626

import type { Constructor } from 'https://deno.land/x/polkadot@0.2.40/util/types.ts';import type { EndpointStats, JsonRpcResponse, ProviderInterface, ProviderInterfaceCallback, ProviderInterfaceEmitCb, ProviderInterfaceEmitted, ProviderStats } from '../types.ts';
import { EventEmitter } from 'https://esm.sh/eventemitter3@5.0.1';
import { isChildClass, isNull, isUndefined, logger, objectSpread } from 'https://deno.land/x/polkadot@0.2.40/util/mod.ts';import { xglobal } from 'https://deno.land/x/polkadot@0.2.40/x-global/mod.ts';import { WebSocket } from 'https://deno.land/x/polkadot@0.2.40/x-ws/mod.ts';
import { RpcCoder } from '../coder/index.ts';import defaults from '../defaults.ts';import { LRUCache } from '../lru.ts';import { getWSErrorString } from './errors.ts';
interface SubscriptionHandler { callback: ProviderInterfaceCallback; type: string;}
interface WsStateAwaiting { callback: ProviderInterfaceCallback; method: string; params: unknown[]; start: number; subscription?: SubscriptionHandler | undefined;}
interface WsStateSubscription extends SubscriptionHandler { method: string; params: unknown[];}
const ALIASES: { [index: string]: string } = { chain_finalisedHead: 'chain_finalizedHead', chain_subscribeFinalisedHeads: 'chain_subscribeFinalizedHeads', chain_unsubscribeFinalisedHeads: 'chain_unsubscribeFinalizedHeads'};
const RETRY_DELAY = 2_500;
const DEFAULT_TIMEOUT_MS = 60 * 1000;const TIMEOUT_INTERVAL = 5_000;
const l = logger('api-ws');
/** @internal Clears a Record<*> of all keys, optionally with all callback on clear */function eraseRecord<T> (record: Record<string, T>, cb?: (item: T) => void): void { Object.keys(record).forEach((key): void => { if (cb) { cb(record[key]); }
delete record[key]; });}
/** @internal Creates a default/empty stats object */function defaultEndpointStats (): EndpointStats { return { bytesRecv: 0, bytesSent: 0, cached: 0, errors: 0, requests: 0, subscriptions: 0, timeout: 0 };}
/** * # @polkadot/rpc-provider/ws * * @name WsProvider * * @description The WebSocket Provider allows sending requests using WebSocket to a WebSocket RPC server TCP port. Unlike the [[HttpProvider]], it does support subscriptions and allows listening to events such as new blocks or balance changes. * * @example * <BR> * * ```javascript * import Api from 'https://deno.land/x/polkadot@0.2.40/api/promise/index.ts'; * import { WsProvider } from 'https://deno.land/x/polkadot@0.2.40/rpc-provider/ws/index.ts'; * * const provider = new WsProvider('ws://127.0.0.1:9944'); * const api = new Api(provider); * ``` * * @see [[HttpProvider]] */export class WsProvider implements ProviderInterface { readonly #callCache = new LRUCache(); readonly #coder: RpcCoder; readonly #endpoints: string[]; readonly #headers: Record<string, string>; readonly #eventemitter: EventEmitter; readonly #handlers: Record<string, WsStateAwaiting> = {}; readonly #isReadyPromise: Promise<WsProvider>; readonly #stats: ProviderStats; readonly #waitingForId: Record<string, JsonRpcResponse<unknown>> = {};
#autoConnectMs: number; #endpointIndex: number; #endpointStats: EndpointStats; #isConnected = false; #subscriptions: Record<string, WsStateSubscription> = {}; #timeoutId?: ReturnType<typeof setInterval> | null = null; #websocket: WebSocket | null; #timeout: number;
/** * @param {string | string[]} endpoint The endpoint url. Usually `ws://ip:9944` or `wss://ip:9944`, may provide an array of endpoint strings. * @param {number | false} autoConnectMs Whether to connect automatically or not (default). Provided value is used as a delay between retries. * @param {Record<string, string>} headers The headers provided to the underlying WebSocket * @param {number} [timeout] Custom timeout value used per request . Defaults to `DEFAULT_TIMEOUT_MS` */ constructor (endpoint: string | string[] = defaults.WS_URL, autoConnectMs: number | false = RETRY_DELAY, headers: Record<string, string> = {}, timeout?: number) { const endpoints = Array.isArray(endpoint) ? endpoint : [endpoint];
if (endpoints.length === 0) { throw new Error('WsProvider requires at least one Endpoint'); }
endpoints.forEach((endpoint) => { if (!/^(wss|ws):\/\//.test(endpoint)) { throw new Error(`Endpoint should start with 'ws://', received '${endpoint}'`); } });
this.#eventemitter = new EventEmitter(); this.#autoConnectMs = autoConnectMs || 0; this.#coder = new RpcCoder(); this.#endpointIndex = -1; this.#endpoints = endpoints; this.#headers = headers; this.#websocket = null; this.#stats = { active: { requests: 0, subscriptions: 0 }, total: defaultEndpointStats() }; this.#endpointStats = defaultEndpointStats(); this.#timeout = timeout || DEFAULT_TIMEOUT_MS;
if (autoConnectMs && autoConnectMs > 0) { this.connectWithRetry().catch((): void => { // does not throw }); }
this.#isReadyPromise = new Promise((resolve): void => { this.#eventemitter.once('connected', (): void => { resolve(this); }); }); }
/** * @summary `true` when this provider supports subscriptions */ public get hasSubscriptions (): boolean { return true; }
/** * @summary `true` when this provider supports clone() */ public get isClonable (): boolean { return true; }
/** * @summary Whether the node is connected or not. * @return {boolean} true if connected */ public get isConnected (): boolean { return this.#isConnected; }
/** * @description Promise that resolves the first time we are connected and loaded */ public get isReady (): Promise<WsProvider> { return this.#isReadyPromise; }
public get endpoint (): string { return this.#endpoints[this.#endpointIndex]; }
/** * @description Returns a clone of the object */ public clone (): WsProvider { return new WsProvider(this.#endpoints); }
protected selectEndpointIndex (endpoints: string[]): number { return (this.#endpointIndex + 1) % endpoints.length; }
/** * @summary Manually connect * @description The [[WsProvider]] connects automatically by default, however if you decided otherwise, you may * connect manually using this method. */ // eslint-disable-next-line @typescript-eslint/require-await public async connect (): Promise<void> { if (this.#websocket) { throw new Error('WebSocket is already connected'); }
try { this.#endpointIndex = this.selectEndpointIndex(this.#endpoints);
// the as here is Deno-specific - not available on the globalThis this.#websocket = typeof xglobal.WebSocket !== 'undefined' && isChildClass(xglobal.WebSocket as unknown as Constructor<WebSocket>, WebSocket) ? new WebSocket(this.endpoint) // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore - WS may be an instance of ws, which supports options : new WebSocket(this.endpoint, undefined, { headers: this.#headers });
if (this.#websocket) { this.#websocket.onclose = this.#onSocketClose; this.#websocket.onerror = this.#onSocketError; this.#websocket.onmessage = this.#onSocketMessage; this.#websocket.onopen = this.#onSocketOpen; }
// timeout any handlers that have not had a response this.#timeoutId = setInterval(() => this.#timeoutHandlers(), TIMEOUT_INTERVAL); } catch (error) { l.error(error);
this.#emit('error', error);
throw error; } }
/** * @description Connect, never throwing an error, but rather forcing a retry */ public async connectWithRetry (): Promise<void> { if (this.#autoConnectMs > 0) { try { await this.connect(); } catch { setTimeout((): void => { this.connectWithRetry().catch((): void => { // does not throw }); }, this.#autoConnectMs); } } }
/** * @description Manually disconnect from the connection, clearing auto-connect logic */ // eslint-disable-next-line @typescript-eslint/require-await public async disconnect (): Promise<void> { // switch off autoConnect, we are in manual mode now this.#autoConnectMs = 0;
try { if (this.#websocket) { // 1000 - Normal closure; the connection successfully completed this.#websocket.close(1000); } } catch (error) { l.error(error);
this.#emit('error', error);
throw error; } }
/** * @description Returns the connection stats */ public get stats (): ProviderStats { return { active: { requests: Object.keys(this.#handlers).length, subscriptions: Object.keys(this.#subscriptions).length }, total: this.#stats.total }; }
public get endpointStats (): EndpointStats { return this.#endpointStats; }
/** * @summary Listens on events after having subscribed using the [[subscribe]] function. * @param {ProviderInterfaceEmitted} type Event * @param {ProviderInterfaceEmitCb} sub Callback * @return unsubscribe function */ public on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void { this.#eventemitter.on(type, sub);
return (): void => { this.#eventemitter.removeListener(type, sub); }; }
/** * @summary Send JSON data using WebSockets to configured HTTP Endpoint or queue. * @param method The RPC methods to execute * @param params Encoded parameters as applicable for the method * @param subscription Subscription details (internally used) */ public send <T = any> (method: string, params: unknown[], isCacheable?: boolean, subscription?: SubscriptionHandler): Promise<T> { this.#endpointStats.requests++; this.#stats.total.requests++;
const [id, body] = this.#coder.encodeJson(method, params); let resultPromise: Promise<T> | null = isCacheable ? this.#callCache.get(body) : null;
if (!resultPromise) { resultPromise = this.#send(id, body, method, params, subscription);
if (isCacheable) { this.#callCache.set(body, resultPromise); } } else { this.#endpointStats.cached++; this.#stats.total.cached++; }
return resultPromise; }
async #send <T> (id: number, body: string, method: string, params: unknown[], subscription?: SubscriptionHandler): Promise<T> { return new Promise<T>((resolve, reject): void => { try { if (!this.isConnected || this.#websocket === null) { throw new Error('WebSocket is not connected'); }
const callback = (error?: Error | null, result?: T): void => { error ? reject(error) : resolve(result as T); };
l.debug(() => ['calling', method, body]);
this.#handlers[id] = { callback, method, params, start: Date.now(), subscription };
const bytesSent = body.length;
this.#endpointStats.bytesSent += bytesSent; this.#stats.total.bytesSent += bytesSent;
this.#websocket.send(body); } catch (error) { this.#endpointStats.errors++; this.#stats.total.errors++;
reject(error); } }); }
/** * @name subscribe * @summary Allows subscribing to a specific event. * * @example * <BR> * * ```javascript * const provider = new WsProvider('ws://127.0.0.1:9944'); * const rpc = new Rpc(provider); * * rpc.state.subscribeStorage([[storage.system.account, <Address>]], (_, values) => { * console.log(values) * }).then((subscriptionId) => { * console.log('balance changes subscription id: ', subscriptionId) * }) * ``` */ public subscribe (type: string, method: string, params: unknown[], callback: ProviderInterfaceCallback): Promise<number | string> { this.#endpointStats.subscriptions++; this.#stats.total.subscriptions++;
// subscriptions are not cached, LRU applies to .at(<blockHash>) only return this.send<number | string>(method, params, false, { callback, type }); }
/** * @summary Allows unsubscribing to subscriptions made with [[subscribe]]. */ public async unsubscribe (type: string, method: string, id: number | string): Promise<boolean> { const subscription = `${type}::${id}`;
// FIXME This now could happen with re-subscriptions. The issue is that with a re-sub // the assigned id now does not match what the API user originally received. It has // a slight complication in solving - since we cannot rely on the send id, but rather // need to find the actual subscription id to map it if (isUndefined(this.#subscriptions[subscription])) { l.debug(() => `Unable to find active subscription=${subscription}`);
return false; }
delete this.#subscriptions[subscription];
try { return this.isConnected && !isNull(this.#websocket) ? this.send<boolean>(method, [id]) : true; } catch { return false; } }
#emit = (type: ProviderInterfaceEmitted, ...args: unknown[]): void => { this.#eventemitter.emit(type, ...args); };
#onSocketClose = (event: CloseEvent): void => { const error = new Error(`disconnected from ${this.endpoint}: ${event.code}:: ${event.reason || getWSErrorString(event.code)}`);
if (this.#autoConnectMs > 0) { l.error(error.message); }
this.#isConnected = false;
if (this.#websocket) { this.#websocket.onclose = null; this.#websocket.onerror = null; this.#websocket.onmessage = null; this.#websocket.onopen = null; this.#websocket = null; }
if (this.#timeoutId) { clearInterval(this.#timeoutId); this.#timeoutId = null; }
// reject all hanging requests eraseRecord(this.#handlers, (h) => { try { h.callback(error, undefined); } catch (err) { // does not throw l.error(err); } }); eraseRecord(this.#waitingForId);
// Reset stats for active endpoint this.#endpointStats = defaultEndpointStats();
this.#emit('disconnected');
if (this.#autoConnectMs > 0) { setTimeout((): void => { this.connectWithRetry().catch(() => { // does not throw }); }, this.#autoConnectMs); } };
#onSocketError = (error: Event): void => { l.debug(() => ['socket error', error]); this.#emit('error', error); };
#onSocketMessage = (message: MessageEvent<string>): void => { l.debug(() => ['received', message.data]);
const bytesRecv = message.data.length;
this.#endpointStats.bytesRecv += bytesRecv; this.#stats.total.bytesRecv += bytesRecv;
const response = JSON.parse(message.data) as JsonRpcResponse<string>;
return isUndefined(response.method) ? this.#onSocketMessageResult(response) : this.#onSocketMessageSubscribe(response); };
#onSocketMessageResult = (response: JsonRpcResponse<string>): void => { const handler = this.#handlers[response.id];
if (!handler) { l.debug(() => `Unable to find handler for id=${response.id}`);
return; }
try { const { method, params, subscription } = handler; const result = this.#coder.decodeResponse<string>(response);
// first send the result - in case of subs, we may have an update // immediately if we have some queued results already handler.callback(null, result);
if (subscription) { const subId = `${subscription.type}::${result}`;
this.#subscriptions[subId] = objectSpread({}, subscription, { method, params });
// if we have a result waiting for this subscription already if (this.#waitingForId[subId]) { this.#onSocketMessageSubscribe(this.#waitingForId[subId]); } } } catch (error) { this.#endpointStats.errors++; this.#stats.total.errors++;
handler.callback(error as Error, undefined); }
delete this.#handlers[response.id]; };
#onSocketMessageSubscribe = (response: JsonRpcResponse<unknown>): void => { const method = ALIASES[response.method as string] || response.method || 'invalid'; const subId = `${method}::${response.params.subscription}`; const handler = this.#subscriptions[subId];
if (!handler) { // store the JSON, we could have out-of-order subid coming in this.#waitingForId[subId] = response;
l.debug(() => `Unable to find handler for subscription=${subId}`);
return; }
// housekeeping delete this.#waitingForId[subId];
try { const result = this.#coder.decodeResponse(response);
handler.callback(null, result); } catch (error) { this.#endpointStats.errors++; this.#stats.total.errors++;
handler.callback(error as Error, undefined); } };
#onSocketOpen = (): boolean => { if (this.#websocket === null) { throw new Error('WebSocket cannot be null in onOpen'); }
l.debug(() => ['connected to', this.endpoint]);
this.#isConnected = true;
this.#resubscribe();
this.#emit('connected');
return true; };
#resubscribe = (): void => { const subscriptions = this.#subscriptions;
this.#subscriptions = {};
Promise.all(Object.keys(subscriptions).map(async (id): Promise<void> => { const { callback, method, params, type } = subscriptions[id];
// only re-create subscriptions which are not in author (only area where // transactions are created, i.e. submissions such as 'author_submitAndWatchExtrinsic' // are not included (and will not be re-broadcast) if (type.startsWith('author_')) { return; }
try { await this.subscribe(type, method, params, callback); } catch (error) { l.error(error); } })).catch(l.error); };
#timeoutHandlers = (): void => { const now = Date.now(); const ids = Object.keys(this.#handlers);
for (let i = 0, count = ids.length; i < count; i++) { const handler = this.#handlers[ids[i]];
if ((now - handler.start) > this.#timeout) { try { handler.callback(new Error(`No response received from RPC endpoint in ${this.#timeout / 1000}s`), undefined); } catch { // ignore }
this.#endpointStats.timeout++; this.#stats.total.timeout++; delete this.#handlers[ids[i]]; } } };}