mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
fix: handle websocket messages with a priority queue (#1427)
* fix: handle websocket messages with a priority queue * fix: set ping timeout to 5s on socket io * fix: timeout socket.io connections on emit * fix: add send timeouts to ws-rpc * test: socket-io ping and emit timeout disconnects * docs: new ws timeout env vars * docs: update queue timeout env var * style: remove unused export * fix: socket message promises
This commit is contained in:
16
.env
16
.env
@@ -138,6 +138,22 @@ MAINNET_SEND_MANY_CONTRACT_ID=SP3FBR2AGK5H9QBDH3EEN6DF8EK8JY7RX8QJ5SVTE.send-man
|
||||
# IMGIX_DOMAIN=https://<your domain>.imgix.net
|
||||
# IMGIX_TOKEN=<your token>
|
||||
|
||||
# Web Socket ping interval to determine client availability, in seconds.
|
||||
# STACKS_API_WS_PING_INTERVAL=5
|
||||
|
||||
# Web Socket ping timeout, in seconds. Clients will be dropped if they do not respond with a pong
|
||||
# after this time has elapsed.
|
||||
# STACKS_API_WS_PING_TIMEOUT=5
|
||||
|
||||
# Web Socket message timeout, in seconds. Clients will be dropped if they do not acknowledge a
|
||||
# message after this time has elapsed.
|
||||
# STACKS_API_WS_MESSAGE_TIMEOUT=5
|
||||
|
||||
# Web Socket update queue timeout, in seconds. When an update is scheduled (new block, tx update,
|
||||
# etc.), we will allow this number of seconds to elapse to allow all subscribed clients to receive
|
||||
# new data.
|
||||
# STACKS_API_WS_UPDATE_QUEUE_TIMEOUT=5
|
||||
|
||||
# Specify max number of STX address to store in an in-memory LRU cache (CPU optimization).
|
||||
# Defaults to 50,000, which should result in around 25 megabytes of additional memory usage.
|
||||
# STACKS_ADDRESS_CACHE_SIZE=10000
|
||||
|
||||
34
docs/socket-io/index.d.ts
vendored
34
docs/socket-io/index.d.ts
vendored
@@ -24,34 +24,38 @@ export type Topic =
|
||||
| NftAssetEventTopic
|
||||
| NftCollectionEventTopic;
|
||||
|
||||
// Allows timeout callbacks for messages. See
|
||||
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
|
||||
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;
|
||||
|
||||
export interface ClientToServerMessages {
|
||||
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
|
||||
unsubscribe: (...topic: Topic[]) => void;
|
||||
}
|
||||
|
||||
export interface ServerToClientMessages {
|
||||
block: (block: Block) => void;
|
||||
microblock: (microblock: Microblock) => void;
|
||||
mempool: (transaction: MempoolTransaction) => void;
|
||||
transaction: (transaction: Transaction | MempoolTransaction) => void;
|
||||
export interface ServerToClientMessages<isSender extends boolean = false> {
|
||||
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
transaction: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
|
||||
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
|
||||
[key: 'nft-event']: (event: NftEvent) => void;
|
||||
'nft-event': (event: NftEvent) => void;
|
||||
[key: 'nft-event']: (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
|
||||
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
|
||||
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
|
||||
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent) => void;
|
||||
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
|
||||
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
|
||||
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
|
||||
'nft-collection-event': (assetIdentifier: string, event: NftEvent) => void;
|
||||
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
'nft-collection-event': (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
|
||||
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
|
||||
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
|
||||
'address-transaction': (address: string, tx: AddressTransactionWithTransfers) => void;
|
||||
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
'address-transaction': (address: string, tx: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
|
||||
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
|
||||
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
|
||||
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse) => void;
|
||||
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
}
|
||||
|
||||
@@ -17,12 +17,17 @@ import {
|
||||
WebSocketPayload,
|
||||
WebSocketTopics,
|
||||
} from '../web-socket-channel';
|
||||
import {
|
||||
getWsMessageTimeoutMs,
|
||||
getWsPingIntervalMs,
|
||||
getWsPingTimeoutMs,
|
||||
} from '../web-socket-transmitter';
|
||||
|
||||
/**
|
||||
* SocketIO channel for sending real time API updates.
|
||||
*/
|
||||
export class SocketIOChannel extends WebSocketChannel {
|
||||
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
|
||||
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
|
||||
private adapter?: Adapter;
|
||||
|
||||
constructor(server: http.Server) {
|
||||
@@ -33,9 +38,14 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
}
|
||||
|
||||
connect(): void {
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
|
||||
cors: { origin: '*' },
|
||||
});
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
|
||||
this.server,
|
||||
{
|
||||
cors: { origin: '*' },
|
||||
pingInterval: getWsPingIntervalMs(),
|
||||
pingTimeout: getWsPingTimeoutMs(),
|
||||
}
|
||||
);
|
||||
this.io = io;
|
||||
|
||||
io.on('connection', async socket => {
|
||||
@@ -153,6 +163,21 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
return false;
|
||||
}
|
||||
|
||||
private async getTopicSockets(room: Topic) {
|
||||
if (!this.io) {
|
||||
return;
|
||||
}
|
||||
const sockets = [];
|
||||
const socketIds = await this.io.to(room).allSockets();
|
||||
for (const id of socketIds) {
|
||||
const socket = this.io.sockets.sockets.get(id);
|
||||
if (socket) {
|
||||
sockets.push(socket);
|
||||
}
|
||||
}
|
||||
return sockets;
|
||||
}
|
||||
|
||||
send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
@@ -160,35 +185,58 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
if (!this.io) {
|
||||
return;
|
||||
}
|
||||
// If a client takes more than this number of ms to respond to an event `emit`, it will be
|
||||
// disconnected.
|
||||
const timeout = getWsMessageTimeoutMs();
|
||||
switch (payload) {
|
||||
case 'block': {
|
||||
const [block] = args as ListenerType<WebSocketPayload['block']>;
|
||||
this.prometheus?.sendEvent('block');
|
||||
this.io.to('block').emit('block', block);
|
||||
void this.getTopicSockets('block').then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'microblock': {
|
||||
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
|
||||
this.prometheus?.sendEvent('microblock');
|
||||
this.io.to('microblock').emit('microblock', microblock);
|
||||
void this.getTopicSockets('microblock').then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'mempoolTransaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
|
||||
this.prometheus?.sendEvent('mempool');
|
||||
this.io.to('mempool').emit('mempool', tx);
|
||||
void this.getTopicSockets('mempool').then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'transaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
|
||||
this.prometheus?.sendEvent('transaction');
|
||||
this.io.to(`transaction:${tx.tx_id}`).emit('transaction', tx);
|
||||
void this.getTopicSockets(`transaction:${tx.tx_id}`).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('transaction', tx, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'nftEvent': {
|
||||
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
|
||||
this.prometheus?.sendEvent('nft-event');
|
||||
this.io.to('nft-event').emit('nft-event', event);
|
||||
void this.getTopicSockets(`nft-event`).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'nftAssetEvent': {
|
||||
@@ -196,7 +244,13 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
WebSocketPayload['nftAssetEvent']
|
||||
>;
|
||||
this.prometheus?.sendEvent('nft-asset-event');
|
||||
this.io.to('nft-event').emit('nft-asset-event', assetIdentifier, value, event);
|
||||
void this.getTopicSockets(`nft-event`).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket
|
||||
.timeout(timeout)
|
||||
.emit('nft-asset-event', assetIdentifier, value, event, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'nftCollectionEvent': {
|
||||
@@ -204,23 +258,41 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
WebSocketPayload['nftCollectionEvent']
|
||||
>;
|
||||
this.prometheus?.sendEvent('nft-collection-event');
|
||||
this.io.to('nft-event').emit('nft-collection-event', assetIdentifier, event);
|
||||
void this.getTopicSockets(`nft-event`).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket
|
||||
.timeout(timeout)
|
||||
.emit('nft-collection-event', assetIdentifier, event, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'principalTransaction': {
|
||||
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
|
||||
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
|
||||
this.prometheus?.sendEvent('address-transaction');
|
||||
this.io.to(topic).emit('address-transaction', principal, tx);
|
||||
this.io.to(topic).emit(topic, principal, tx);
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket => {
|
||||
socket
|
||||
.timeout(timeout)
|
||||
.emit('address-transaction', principal, tx, _ => socket.disconnect(true));
|
||||
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
|
||||
})
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'principalStxBalance': {
|
||||
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
|
||||
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
|
||||
this.prometheus?.sendEvent('address-stx-balance');
|
||||
this.io.to(topic).emit('address-stx-balance', principal, balance);
|
||||
this.io.to(topic).emit(topic, principal, balance);
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket => {
|
||||
socket
|
||||
.timeout(timeout)
|
||||
.emit('address-stx-balance', principal, balance, _ => socket.disconnect(true));
|
||||
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
|
||||
})
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import * as http from 'http';
|
||||
import * as WebSocket from 'ws';
|
||||
import * as net from 'net';
|
||||
import { isProdEnv, isValidPrincipal, logError, normalizeHashString } from '../../../../helpers';
|
||||
import {
|
||||
isProdEnv,
|
||||
isValidPrincipal,
|
||||
logError,
|
||||
normalizeHashString,
|
||||
resolveOrTimeout,
|
||||
} from '../../../../helpers';
|
||||
import { WebSocketPrometheus } from '../web-socket-prometheus';
|
||||
import {
|
||||
ListenerType,
|
||||
@@ -36,6 +42,7 @@ import {
|
||||
RpcNftCollectionEventSubscriptionParams,
|
||||
NftEvent,
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
import { getWsMessageTimeoutMs, getWsPingIntervalMs } from '../web-socket-transmitter';
|
||||
|
||||
type Subscription =
|
||||
| RpcTxUpdateSubscriptionParams
|
||||
@@ -58,7 +65,7 @@ class SubscriptionManager {
|
||||
// Sockets that are responding to ping.
|
||||
liveSockets: Set<WebSocket> = new Set();
|
||||
heartbeatInterval?: NodeJS.Timeout;
|
||||
readonly heartbeatIntervalMs = 5_000;
|
||||
readonly heartbeatIntervalMs = getWsPingIntervalMs();
|
||||
|
||||
addSubscription(client: WebSocket, topicId: string) {
|
||||
if (this.subscriptions.size === 0) {
|
||||
@@ -567,10 +574,16 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processTxUpdate(tx: Transaction | MempoolTransaction) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('transaction')?.subscriptions.get(tx.tx_id);
|
||||
const manager = this.subscriptions.get('transaction');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get(tx.tx_id);
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('tx_update', tx).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, tx.tx_id, client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('transaction');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -580,9 +593,11 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processAddressUpdate(principal: string, tx: AddressTransactionWithTransfers) {
|
||||
try {
|
||||
const subscribers = this.subscriptions
|
||||
.get('principalTransactions')
|
||||
?.subscriptions.get(principal);
|
||||
const manager = this.subscriptions.get('principalTransactions');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get(principal);
|
||||
if (subscribers) {
|
||||
const updateNotification = {
|
||||
address: principal,
|
||||
@@ -595,7 +610,9 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
'address_tx_update',
|
||||
updateNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, principal, client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('address-transaction');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -604,7 +621,11 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
}
|
||||
|
||||
private processAddressBalanceUpdate(principal: string, balance: AddressStxBalanceResponse) {
|
||||
const subscribers = this.subscriptions.get('principalStxBalance')?.subscriptions.get(principal);
|
||||
const manager = this.subscriptions.get('principalStxBalance');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get(principal);
|
||||
if (subscribers) {
|
||||
try {
|
||||
const balanceNotification = {
|
||||
@@ -615,7 +636,9 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
'address_balance_update',
|
||||
balanceNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, principal, client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('address-stx-balance');
|
||||
} catch (error) {
|
||||
logError(`error sending websocket stx balance update to ${principal}`, error);
|
||||
@@ -625,10 +648,16 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processBlockUpdate(block: Block) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('block')?.subscriptions.get('block');
|
||||
const manager = this.subscriptions.get('block');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get('block');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, 'block', client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('block');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -638,10 +667,16 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processMicroblockUpdate(microblock: Microblock) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('microblock')?.subscriptions.get('microblock');
|
||||
const manager = this.subscriptions.get('microblock');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get('microblock');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, 'microblock', client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('microblock');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -651,10 +686,16 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processMempoolUpdate(transaction: MempoolTransaction) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('mempool')?.subscriptions.get('mempool');
|
||||
const manager = this.subscriptions.get('mempool');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get('mempool');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('mempool', transaction).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, 'mempool', client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('mempool');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -664,10 +705,16 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processNftEventUpdate(event: NftEvent) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('nftEvent')?.subscriptions.get('nft_event');
|
||||
const manager = this.subscriptions.get('nftEvent');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get('nft_event');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('nft_event', event).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, 'nft_event', client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('nft-event');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -677,12 +724,17 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processNftAssetEventUpdate(assetIdentifier: string, value: string, event: NftEvent) {
|
||||
try {
|
||||
const subscribers = this.subscriptions
|
||||
.get('nftAssetEvent')
|
||||
?.subscriptions.get(`${assetIdentifier}+${value}`);
|
||||
const manager = this.subscriptions.get('nftAssetEvent');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const topicId = `${assetIdentifier}+${value}`;
|
||||
const subscribers = manager.subscriptions.get(topicId);
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('nft_asset_event', event).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, topicId, client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('nft-event');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -695,15 +747,19 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
|
||||
private processNftCollectionEventUpdate(assetIdentifier: string, event: NftEvent) {
|
||||
try {
|
||||
const subscribers = this.subscriptions
|
||||
.get('nftCollectionEvent')
|
||||
?.subscriptions.get(assetIdentifier);
|
||||
const manager = this.subscriptions.get('nftCollectionEvent');
|
||||
if (!manager) {
|
||||
return;
|
||||
}
|
||||
const subscribers = manager.subscriptions.get(assetIdentifier);
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification(
|
||||
'nft_collection_event',
|
||||
event
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
subscribers.forEach(client =>
|
||||
this.sendWithTimeout(manager, assetIdentifier, client, rpcNotificationPayload)
|
||||
);
|
||||
this.prometheus?.sendEvent('nft-event');
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -713,4 +769,30 @@ export class WsRpcChannel extends WebSocketChannel {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private sendWithTimeout(
|
||||
manager: SubscriptionManager,
|
||||
topicId: string,
|
||||
client: WebSocket,
|
||||
payload: string
|
||||
) {
|
||||
const sendPromise = new Promise<void>((resolve, reject) =>
|
||||
client.send(payload, err => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
})
|
||||
);
|
||||
// If the payload takes more than a set number of seconds to be processed by the client,
|
||||
// it will be disconnected.
|
||||
resolveOrTimeout(sendPromise, getWsMessageTimeoutMs())
|
||||
.then(successful => {
|
||||
if (!successful) {
|
||||
manager.removeSubscription(client, topicId);
|
||||
}
|
||||
})
|
||||
.catch(_ => manager.removeSubscription(client, topicId));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import * as http from 'http';
|
||||
import PQueue from 'p-queue';
|
||||
import { AddressStxBalanceResponse, AddressTransactionWithTransfers } from 'docs/generated';
|
||||
import {
|
||||
getBlockFromDataStore,
|
||||
@@ -8,12 +9,28 @@ import {
|
||||
parseDbTx,
|
||||
} from '../../controllers/db-controller';
|
||||
import { PgStore } from '../../../datastore/pg-store';
|
||||
import { WebSocketChannel } from './web-socket-channel';
|
||||
import { ListenerType, WebSocketChannel, WebSocketPayload } from './web-socket-channel';
|
||||
import { SocketIOChannel } from './channels/socket-io-channel';
|
||||
import { WsRpcChannel } from './channels/ws-rpc-channel';
|
||||
import { parseNftEvent } from '../../../datastore/helpers';
|
||||
import { logger } from '../../../helpers';
|
||||
|
||||
export function getWsPingIntervalMs(): number {
|
||||
return parseInt(process.env['STACKS_API_WS_PING_INTERVAL'] ?? '5') * 1000;
|
||||
}
|
||||
|
||||
export function getWsPingTimeoutMs(): number {
|
||||
return parseInt(process.env['STACKS_API_WS_PING_TIMEOUT'] ?? '5') * 1000;
|
||||
}
|
||||
|
||||
export function getWsMessageTimeoutMs(): number {
|
||||
return parseInt(process.env['STACKS_API_WS_MESSAGE_TIMEOUT'] ?? '5') * 1000;
|
||||
}
|
||||
|
||||
function getWsUpdateQueueTimeoutMs(): number {
|
||||
return parseInt(process.env['STACKS_API_WS_UPDATE_QUEUE_TIMEOUT'] ?? '5') * 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
* This object matches real time update `WebSocketTopics` subscriptions with internal
|
||||
* `PgStoreEventEmitter` notifications. If a match is found, the relevant data is queried from the
|
||||
@@ -23,23 +40,45 @@ export class WebSocketTransmitter {
|
||||
readonly db: PgStore;
|
||||
readonly server: http.Server;
|
||||
private channels: WebSocketChannel[] = [];
|
||||
private queue: PQueue;
|
||||
|
||||
constructor(db: PgStore, server: http.Server) {
|
||||
this.db = db;
|
||||
this.server = server;
|
||||
// This queue will send all messages through web socket channels, one at a time.
|
||||
this.queue = new PQueue({
|
||||
autoStart: true,
|
||||
concurrency: 1,
|
||||
timeout: getWsUpdateQueueTimeoutMs(),
|
||||
throwOnTimeout: true,
|
||||
});
|
||||
}
|
||||
|
||||
connect() {
|
||||
this.db.eventEmitter.addListener('blockUpdate', blockHash => this.blockUpdate(blockHash));
|
||||
this.db.eventEmitter.addListener('blockUpdate', blockHash =>
|
||||
this.queue
|
||||
.add(() => this.blockUpdate(blockHash))
|
||||
.catch(error => logger.error(`WebSocketTransmitter blockUpdate error: ${error}`))
|
||||
);
|
||||
this.db.eventEmitter.addListener('microblockUpdate', microblockHash =>
|
||||
this.microblockUpdate(microblockHash)
|
||||
this.queue
|
||||
.add(() => this.microblockUpdate(microblockHash))
|
||||
.catch(error => logger.error(`WebSocketTransmitter microblockUpdate error: ${error}`))
|
||||
);
|
||||
this.db.eventEmitter.addListener('nftEventUpdate', (txId, eventIndex) =>
|
||||
this.nftEventUpdate(txId, eventIndex)
|
||||
this.queue
|
||||
.add(() => this.nftEventUpdate(txId, eventIndex))
|
||||
.catch(error => logger.error(`WebSocketTransmitter nftEventUpdate error: ${error}`))
|
||||
);
|
||||
this.db.eventEmitter.addListener('txUpdate', txId =>
|
||||
this.queue
|
||||
.add(() => this.txUpdate(txId))
|
||||
.catch(error => logger.error(`WebSocketTransmitter txUpdate error: ${error}`))
|
||||
);
|
||||
this.db.eventEmitter.addListener('txUpdate', txId => this.txUpdate(txId));
|
||||
this.db.eventEmitter.addListener('addressUpdate', (address, blockHeight) =>
|
||||
this.addressUpdate(address, blockHeight)
|
||||
this.queue
|
||||
.add(() => this.addressUpdate(address, blockHeight))
|
||||
.catch(error => logger.error(`WebSocketTransmitter addressUpdate error: ${error}`))
|
||||
);
|
||||
|
||||
this.channels.push(new SocketIOChannel(this.server));
|
||||
@@ -48,6 +87,8 @@ export class WebSocketTransmitter {
|
||||
}
|
||||
|
||||
close(callback: (err?: Error | undefined) => void) {
|
||||
this.queue.pause();
|
||||
this.queue.clear();
|
||||
Promise.all(
|
||||
this.channels.map(
|
||||
c =>
|
||||
@@ -66,6 +107,13 @@ export class WebSocketTransmitter {
|
||||
.catch(error => callback(error));
|
||||
}
|
||||
|
||||
private send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
): Promise<void[]> {
|
||||
return Promise.all(this.channels.map(c => c.send(payload, ...args)));
|
||||
}
|
||||
|
||||
private async blockUpdate(blockHash: string) {
|
||||
if (this.channels.find(c => c.hasListeners('block'))) {
|
||||
try {
|
||||
@@ -74,7 +122,7 @@ export class WebSocketTransmitter {
|
||||
db: this.db,
|
||||
});
|
||||
if (blockQuery.found) {
|
||||
this.channels.forEach(c => c.send('block', blockQuery.result));
|
||||
await this.send('block', blockQuery.result);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@@ -90,7 +138,7 @@ export class WebSocketTransmitter {
|
||||
microblockHash: microblockHash,
|
||||
});
|
||||
if (microblockQuery.found) {
|
||||
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
|
||||
await this.send('microblock', microblockQuery.result);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@@ -106,7 +154,7 @@ export class WebSocketTransmitter {
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
|
||||
await this.send('mempoolTransaction', mempoolTxs[0]);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@@ -135,7 +183,7 @@ export class WebSocketTransmitter {
|
||||
}
|
||||
});
|
||||
if (result) {
|
||||
this.channels.forEach(c => c.send('transaction', result));
|
||||
await this.send('transaction', result);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@@ -154,13 +202,13 @@ export class WebSocketTransmitter {
|
||||
const event = parseNftEvent(nftEvent.result);
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
|
||||
this.channels.forEach(c => c.send('nftEvent', event));
|
||||
await this.send('nftEvent', event);
|
||||
}
|
||||
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
|
||||
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
|
||||
await this.send('nftAssetEvent', assetIdentifier, value, event);
|
||||
}
|
||||
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
|
||||
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
|
||||
await this.send('nftCollectionEvent', assetIdentifier, event);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@@ -193,7 +241,7 @@ export class WebSocketTransmitter {
|
||||
};
|
||||
}),
|
||||
};
|
||||
this.channels.forEach(c => c.send('principalTransaction', address, result));
|
||||
await this.send('principalTransaction', address, result);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@@ -222,7 +270,7 @@ export class WebSocketTransmitter {
|
||||
}
|
||||
return balance;
|
||||
});
|
||||
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
|
||||
await this.send('principalStxBalance', address, balance);
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
|
||||
@@ -493,6 +493,58 @@ describe('socket-io', () => {
|
||||
}
|
||||
});
|
||||
|
||||
test('message timeout disconnects client', async () => {
|
||||
const address = apiServer.address;
|
||||
const socket = io(`http://${address}`, {
|
||||
reconnection: false,
|
||||
// Block message will go unanswered, triggering a disconnect.
|
||||
query: { subscriptions: `block` },
|
||||
});
|
||||
|
||||
process.env['STACKS_API_WS_MESSAGE_TIMEOUT'] = '0';
|
||||
const disconnectWaiter = waiter();
|
||||
let disconnectReason = '';
|
||||
|
||||
socket.on('disconnect', reason => {
|
||||
disconnectReason = reason;
|
||||
socket.close();
|
||||
disconnectWaiter.finish();
|
||||
});
|
||||
|
||||
socket.on('connect', async () => {
|
||||
const block = new TestBlockBuilder().addTx().build();
|
||||
await db.update(block);
|
||||
});
|
||||
|
||||
await disconnectWaiter;
|
||||
expect(disconnectReason).toBe('io server disconnect');
|
||||
});
|
||||
|
||||
test('ping timeout disconnects client', async () => {
|
||||
const address = apiServer.address;
|
||||
const socket = io(`http://${address}`, {
|
||||
reconnection: false,
|
||||
});
|
||||
|
||||
process.env['STACKS_API_WS_PING_TIMEOUT'] = '0';
|
||||
const disconnectWaiter = waiter();
|
||||
let disconnectReason = '';
|
||||
|
||||
socket.on('disconnect', reason => {
|
||||
disconnectReason = reason;
|
||||
socket.close();
|
||||
disconnectWaiter.finish();
|
||||
});
|
||||
|
||||
socket.on('connect', () => {
|
||||
// Make all pings go unanswered.
|
||||
socket.io.engine['onPacket'] = () => {};
|
||||
});
|
||||
|
||||
await disconnectWaiter;
|
||||
expect(disconnectReason).toBe('ping timeout');
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await apiServer.terminate();
|
||||
await db?.close();
|
||||
|
||||
Reference in New Issue
Block a user