fix: handle websocket messages with a priority queue (#1427)

* fix: handle websocket messages with a priority queue

* fix: set ping timeout to 5s on socket io

* fix: timeout socket.io connections on emit

* fix: add send timeouts to ws-rpc

* test: socket-io ping and emit timeout disconnects

* docs: new ws timeout env vars

* docs: update queue timeout env var

* style: remove unused export

* fix: socket message promises
This commit is contained in:
Rafael Cárdenas
2022-11-15 12:14:48 -06:00
committed by GitHub
parent cdc039cea8
commit f0cb01c054
6 changed files with 345 additions and 71 deletions

16
.env
View File

@@ -138,6 +138,22 @@ MAINNET_SEND_MANY_CONTRACT_ID=SP3FBR2AGK5H9QBDH3EEN6DF8EK8JY7RX8QJ5SVTE.send-man
# IMGIX_DOMAIN=https://<your domain>.imgix.net
# IMGIX_TOKEN=<your token>
# Web Socket ping interval to determine client availability, in seconds.
# STACKS_API_WS_PING_INTERVAL=5
# Web Socket ping timeout, in seconds. Clients will be dropped if they do not respond with a pong
# after this time has elapsed.
# STACKS_API_WS_PING_TIMEOUT=5
# Web Socket message timeout, in seconds. Clients will be dropped if they do not acknowledge a
# message after this time has elapsed.
# STACKS_API_WS_MESSAGE_TIMEOUT=5
# Web Socket update queue timeout, in seconds. When an update is scheduled (new block, tx update,
# etc.), we will allow this number of seconds to elapse to allow all subscribed clients to receive
# new data.
# STACKS_API_WS_UPDATE_QUEUE_TIMEOUT=5
# Specify max number of STX address to store in an in-memory LRU cache (CPU optimization).
# Defaults to 50,000, which should result in around 25 megabytes of additional memory usage.
# STACKS_ADDRESS_CACHE_SIZE=10000

View File

@@ -24,34 +24,38 @@ export type Topic =
| NftAssetEventTopic
| NftCollectionEventTopic;
// Allows timeout callbacks for messages. See
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;
export interface ClientToServerMessages {
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
unsubscribe: (...topic: Topic[]) => void;
}
export interface ServerToClientMessages {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempool: (transaction: MempoolTransaction) => void;
transaction: (transaction: Transaction | MempoolTransaction) => void;
export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
transaction: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: 'nft-event']: (event: NftEvent) => void;
'nft-event': (event: NftEvent) => void;
[key: 'nft-event']: (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-asset-event': (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
'nft-collection-event': (assetIdentifier: string, event: NftEvent) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-collection-event': (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
'address-transaction': (address: string, tx: AddressTransactionWithTransfers) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-transaction': (address: string, tx: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'address-stx-balance': (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
}

View File

@@ -17,12 +17,17 @@ import {
WebSocketPayload,
WebSocketTopics,
} from '../web-socket-channel';
import {
getWsMessageTimeoutMs,
getWsPingIntervalMs,
getWsPingTimeoutMs,
} from '../web-socket-transmitter';
/**
* SocketIO channel for sending real time API updates.
*/
export class SocketIOChannel extends WebSocketChannel {
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
private adapter?: Adapter;
constructor(server: http.Server) {
@@ -33,9 +38,14 @@ export class SocketIOChannel extends WebSocketChannel {
}
connect(): void {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
cors: { origin: '*' },
});
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
this.server,
{
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
}
);
this.io = io;
io.on('connection', async socket => {
@@ -153,6 +163,21 @@ export class SocketIOChannel extends WebSocketChannel {
return false;
}
private async getTopicSockets(room: Topic) {
if (!this.io) {
return;
}
const sockets = [];
const socketIds = await this.io.to(room).allSockets();
for (const id of socketIds) {
const socket = this.io.sockets.sockets.get(id);
if (socket) {
sockets.push(socket);
}
}
return sockets;
}
send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
@@ -160,35 +185,58 @@ export class SocketIOChannel extends WebSocketChannel {
if (!this.io) {
return;
}
// If a client takes more than this number of ms to respond to an event `emit`, it will be
// disconnected.
const timeout = getWsMessageTimeoutMs();
switch (payload) {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.prometheus?.sendEvent('block');
this.io.to('block').emit('block', block);
void this.getTopicSockets('block').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
)
);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.prometheus?.sendEvent('microblock');
this.io.to('microblock').emit('microblock', microblock);
void this.getTopicSockets('microblock').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
)
);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.prometheus?.sendEvent('mempool');
this.io.to('mempool').emit('mempool', tx);
void this.getTopicSockets('mempool').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
)
);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
this.io.to(`transaction:${tx.tx_id}`).emit('transaction', tx);
void this.getTopicSockets(`transaction:${tx.tx_id}`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('transaction', tx, _ => socket.disconnect(true))
)
);
break;
}
case 'nftEvent': {
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
this.prometheus?.sendEvent('nft-event');
this.io.to('nft-event').emit('nft-event', event);
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
)
);
break;
}
case 'nftAssetEvent': {
@@ -196,7 +244,13 @@ export class SocketIOChannel extends WebSocketChannel {
WebSocketPayload['nftAssetEvent']
>;
this.prometheus?.sendEvent('nft-asset-event');
this.io.to('nft-event').emit('nft-asset-event', assetIdentifier, value, event);
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit('nft-asset-event', assetIdentifier, value, event, _ => socket.disconnect(true))
)
);
break;
}
case 'nftCollectionEvent': {
@@ -204,23 +258,41 @@ export class SocketIOChannel extends WebSocketChannel {
WebSocketPayload['nftCollectionEvent']
>;
this.prometheus?.sendEvent('nft-collection-event');
this.io.to('nft-event').emit('nft-collection-event', assetIdentifier, event);
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit('nft-collection-event', assetIdentifier, event, _ => socket.disconnect(true))
)
);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
this.prometheus?.sendEvent('address-transaction');
this.io.to(topic).emit('address-transaction', principal, tx);
this.io.to(topic).emit(topic, principal, tx);
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-transaction', principal, tx, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
})
);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
this.prometheus?.sendEvent('address-stx-balance');
this.io.to(topic).emit('address-stx-balance', principal, balance);
this.io.to(topic).emit(topic, principal, balance);
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket
.timeout(timeout)
.emit('address-stx-balance', principal, balance, _ => socket.disconnect(true));
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
})
);
break;
}
}

View File

@@ -1,7 +1,13 @@
import * as http from 'http';
import * as WebSocket from 'ws';
import * as net from 'net';
import { isProdEnv, isValidPrincipal, logError, normalizeHashString } from '../../../../helpers';
import {
isProdEnv,
isValidPrincipal,
logError,
normalizeHashString,
resolveOrTimeout,
} from '../../../../helpers';
import { WebSocketPrometheus } from '../web-socket-prometheus';
import {
ListenerType,
@@ -36,6 +42,7 @@ import {
RpcNftCollectionEventSubscriptionParams,
NftEvent,
} from '@stacks/stacks-blockchain-api-types';
import { getWsMessageTimeoutMs, getWsPingIntervalMs } from '../web-socket-transmitter';
type Subscription =
| RpcTxUpdateSubscriptionParams
@@ -58,7 +65,7 @@ class SubscriptionManager {
// Sockets that are responding to ping.
liveSockets: Set<WebSocket> = new Set();
heartbeatInterval?: NodeJS.Timeout;
readonly heartbeatIntervalMs = 5_000;
readonly heartbeatIntervalMs = getWsPingIntervalMs();
addSubscription(client: WebSocket, topicId: string) {
if (this.subscriptions.size === 0) {
@@ -567,10 +574,16 @@ export class WsRpcChannel extends WebSocketChannel {
private processTxUpdate(tx: Transaction | MempoolTransaction) {
try {
const subscribers = this.subscriptions.get('transaction')?.subscriptions.get(tx.tx_id);
const manager = this.subscriptions.get('transaction');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get(tx.tx_id);
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('tx_update', tx).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, tx.tx_id, client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('transaction');
}
} catch (error) {
@@ -580,9 +593,11 @@ export class WsRpcChannel extends WebSocketChannel {
private processAddressUpdate(principal: string, tx: AddressTransactionWithTransfers) {
try {
const subscribers = this.subscriptions
.get('principalTransactions')
?.subscriptions.get(principal);
const manager = this.subscriptions.get('principalTransactions');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get(principal);
if (subscribers) {
const updateNotification = {
address: principal,
@@ -595,7 +610,9 @@ export class WsRpcChannel extends WebSocketChannel {
'address_tx_update',
updateNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, principal, client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('address-transaction');
}
} catch (error) {
@@ -604,7 +621,11 @@ export class WsRpcChannel extends WebSocketChannel {
}
private processAddressBalanceUpdate(principal: string, balance: AddressStxBalanceResponse) {
const subscribers = this.subscriptions.get('principalStxBalance')?.subscriptions.get(principal);
const manager = this.subscriptions.get('principalStxBalance');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get(principal);
if (subscribers) {
try {
const balanceNotification = {
@@ -615,7 +636,9 @@ export class WsRpcChannel extends WebSocketChannel {
'address_balance_update',
balanceNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, principal, client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('address-stx-balance');
} catch (error) {
logError(`error sending websocket stx balance update to ${principal}`, error);
@@ -625,10 +648,16 @@ export class WsRpcChannel extends WebSocketChannel {
private processBlockUpdate(block: Block) {
try {
const subscribers = this.subscriptions.get('block')?.subscriptions.get('block');
const manager = this.subscriptions.get('block');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get('block');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, 'block', client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('block');
}
} catch (error) {
@@ -638,10 +667,16 @@ export class WsRpcChannel extends WebSocketChannel {
private processMicroblockUpdate(microblock: Microblock) {
try {
const subscribers = this.subscriptions.get('microblock')?.subscriptions.get('microblock');
const manager = this.subscriptions.get('microblock');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get('microblock');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, 'microblock', client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('microblock');
}
} catch (error) {
@@ -651,10 +686,16 @@ export class WsRpcChannel extends WebSocketChannel {
private processMempoolUpdate(transaction: MempoolTransaction) {
try {
const subscribers = this.subscriptions.get('mempool')?.subscriptions.get('mempool');
const manager = this.subscriptions.get('mempool');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get('mempool');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('mempool', transaction).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, 'mempool', client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('mempool');
}
} catch (error) {
@@ -664,10 +705,16 @@ export class WsRpcChannel extends WebSocketChannel {
private processNftEventUpdate(event: NftEvent) {
try {
const subscribers = this.subscriptions.get('nftEvent')?.subscriptions.get('nft_event');
const manager = this.subscriptions.get('nftEvent');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get('nft_event');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('nft_event', event).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, 'nft_event', client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('nft-event');
}
} catch (error) {
@@ -677,12 +724,17 @@ export class WsRpcChannel extends WebSocketChannel {
private processNftAssetEventUpdate(assetIdentifier: string, value: string, event: NftEvent) {
try {
const subscribers = this.subscriptions
.get('nftAssetEvent')
?.subscriptions.get(`${assetIdentifier}+${value}`);
const manager = this.subscriptions.get('nftAssetEvent');
if (!manager) {
return;
}
const topicId = `${assetIdentifier}+${value}`;
const subscribers = manager.subscriptions.get(topicId);
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('nft_asset_event', event).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, topicId, client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('nft-event');
}
} catch (error) {
@@ -695,15 +747,19 @@ export class WsRpcChannel extends WebSocketChannel {
private processNftCollectionEventUpdate(assetIdentifier: string, event: NftEvent) {
try {
const subscribers = this.subscriptions
.get('nftCollectionEvent')
?.subscriptions.get(assetIdentifier);
const manager = this.subscriptions.get('nftCollectionEvent');
if (!manager) {
return;
}
const subscribers = manager.subscriptions.get(assetIdentifier);
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification(
'nft_collection_event',
event
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
subscribers.forEach(client =>
this.sendWithTimeout(manager, assetIdentifier, client, rpcNotificationPayload)
);
this.prometheus?.sendEvent('nft-event');
}
} catch (error) {
@@ -713,4 +769,30 @@ export class WsRpcChannel extends WebSocketChannel {
);
}
}
private sendWithTimeout(
manager: SubscriptionManager,
topicId: string,
client: WebSocket,
payload: string
) {
const sendPromise = new Promise<void>((resolve, reject) =>
client.send(payload, err => {
if (err) {
reject(err);
} else {
resolve();
}
})
);
// If the payload takes more than a set number of seconds to be processed by the client,
// it will be disconnected.
resolveOrTimeout(sendPromise, getWsMessageTimeoutMs())
.then(successful => {
if (!successful) {
manager.removeSubscription(client, topicId);
}
})
.catch(_ => manager.removeSubscription(client, topicId));
}
}

View File

@@ -1,4 +1,5 @@
import * as http from 'http';
import PQueue from 'p-queue';
import { AddressStxBalanceResponse, AddressTransactionWithTransfers } from 'docs/generated';
import {
getBlockFromDataStore,
@@ -8,12 +9,28 @@ import {
parseDbTx,
} from '../../controllers/db-controller';
import { PgStore } from '../../../datastore/pg-store';
import { WebSocketChannel } from './web-socket-channel';
import { ListenerType, WebSocketChannel, WebSocketPayload } from './web-socket-channel';
import { SocketIOChannel } from './channels/socket-io-channel';
import { WsRpcChannel } from './channels/ws-rpc-channel';
import { parseNftEvent } from '../../../datastore/helpers';
import { logger } from '../../../helpers';
export function getWsPingIntervalMs(): number {
return parseInt(process.env['STACKS_API_WS_PING_INTERVAL'] ?? '5') * 1000;
}
export function getWsPingTimeoutMs(): number {
return parseInt(process.env['STACKS_API_WS_PING_TIMEOUT'] ?? '5') * 1000;
}
export function getWsMessageTimeoutMs(): number {
return parseInt(process.env['STACKS_API_WS_MESSAGE_TIMEOUT'] ?? '5') * 1000;
}
function getWsUpdateQueueTimeoutMs(): number {
return parseInt(process.env['STACKS_API_WS_UPDATE_QUEUE_TIMEOUT'] ?? '5') * 1000;
}
/**
* This object matches real time update `WebSocketTopics` subscriptions with internal
* `PgStoreEventEmitter` notifications. If a match is found, the relevant data is queried from the
@@ -23,23 +40,45 @@ export class WebSocketTransmitter {
readonly db: PgStore;
readonly server: http.Server;
private channels: WebSocketChannel[] = [];
private queue: PQueue;
constructor(db: PgStore, server: http.Server) {
this.db = db;
this.server = server;
// This queue will send all messages through web socket channels, one at a time.
this.queue = new PQueue({
autoStart: true,
concurrency: 1,
timeout: getWsUpdateQueueTimeoutMs(),
throwOnTimeout: true,
});
}
connect() {
this.db.eventEmitter.addListener('blockUpdate', blockHash => this.blockUpdate(blockHash));
this.db.eventEmitter.addListener('blockUpdate', blockHash =>
this.queue
.add(() => this.blockUpdate(blockHash))
.catch(error => logger.error(`WebSocketTransmitter blockUpdate error: ${error}`))
);
this.db.eventEmitter.addListener('microblockUpdate', microblockHash =>
this.microblockUpdate(microblockHash)
this.queue
.add(() => this.microblockUpdate(microblockHash))
.catch(error => logger.error(`WebSocketTransmitter microblockUpdate error: ${error}`))
);
this.db.eventEmitter.addListener('nftEventUpdate', (txId, eventIndex) =>
this.nftEventUpdate(txId, eventIndex)
this.queue
.add(() => this.nftEventUpdate(txId, eventIndex))
.catch(error => logger.error(`WebSocketTransmitter nftEventUpdate error: ${error}`))
);
this.db.eventEmitter.addListener('txUpdate', txId =>
this.queue
.add(() => this.txUpdate(txId))
.catch(error => logger.error(`WebSocketTransmitter txUpdate error: ${error}`))
);
this.db.eventEmitter.addListener('txUpdate', txId => this.txUpdate(txId));
this.db.eventEmitter.addListener('addressUpdate', (address, blockHeight) =>
this.addressUpdate(address, blockHeight)
this.queue
.add(() => this.addressUpdate(address, blockHeight))
.catch(error => logger.error(`WebSocketTransmitter addressUpdate error: ${error}`))
);
this.channels.push(new SocketIOChannel(this.server));
@@ -48,6 +87,8 @@ export class WebSocketTransmitter {
}
close(callback: (err?: Error | undefined) => void) {
this.queue.pause();
this.queue.clear();
Promise.all(
this.channels.map(
c =>
@@ -66,6 +107,13 @@ export class WebSocketTransmitter {
.catch(error => callback(error));
}
private send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): Promise<void[]> {
return Promise.all(this.channels.map(c => c.send(payload, ...args)));
}
private async blockUpdate(blockHash: string) {
if (this.channels.find(c => c.hasListeners('block'))) {
try {
@@ -74,7 +122,7 @@ export class WebSocketTransmitter {
db: this.db,
});
if (blockQuery.found) {
this.channels.forEach(c => c.send('block', blockQuery.result));
await this.send('block', blockQuery.result);
}
} catch (error) {
logger.error(error);
@@ -90,7 +138,7 @@ export class WebSocketTransmitter {
microblockHash: microblockHash,
});
if (microblockQuery.found) {
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
await this.send('microblock', microblockQuery.result);
}
} catch (error) {
logger.error(error);
@@ -106,7 +154,7 @@ export class WebSocketTransmitter {
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
await this.send('mempoolTransaction', mempoolTxs[0]);
}
} catch (error) {
logger.error(error);
@@ -135,7 +183,7 @@ export class WebSocketTransmitter {
}
});
if (result) {
this.channels.forEach(c => c.send('transaction', result));
await this.send('transaction', result);
}
} catch (error) {
logger.error(error);
@@ -154,13 +202,13 @@ export class WebSocketTransmitter {
const event = parseNftEvent(nftEvent.result);
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
this.channels.forEach(c => c.send('nftEvent', event));
await this.send('nftEvent', event);
}
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
await this.send('nftAssetEvent', assetIdentifier, value, event);
}
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
await this.send('nftCollectionEvent', assetIdentifier, event);
}
} catch (error) {
logger.error(error);
@@ -193,7 +241,7 @@ export class WebSocketTransmitter {
};
}),
};
this.channels.forEach(c => c.send('principalTransaction', address, result));
await this.send('principalTransaction', address, result);
}
} catch (error) {
logger.error(error);
@@ -222,7 +270,7 @@ export class WebSocketTransmitter {
}
return balance;
});
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
await this.send('principalStxBalance', address, balance);
} catch (error) {
logger.error(error);
}

View File

@@ -493,6 +493,58 @@ describe('socket-io', () => {
}
});
test('message timeout disconnects client', async () => {
const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,
// Block message will go unanswered, triggering a disconnect.
query: { subscriptions: `block` },
});
process.env['STACKS_API_WS_MESSAGE_TIMEOUT'] = '0';
const disconnectWaiter = waiter();
let disconnectReason = '';
socket.on('disconnect', reason => {
disconnectReason = reason;
socket.close();
disconnectWaiter.finish();
});
socket.on('connect', async () => {
const block = new TestBlockBuilder().addTx().build();
await db.update(block);
});
await disconnectWaiter;
expect(disconnectReason).toBe('io server disconnect');
});
test('ping timeout disconnects client', async () => {
const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,
});
process.env['STACKS_API_WS_PING_TIMEOUT'] = '0';
const disconnectWaiter = waiter();
let disconnectReason = '';
socket.on('disconnect', reason => {
disconnectReason = reason;
socket.close();
disconnectWaiter.finish();
});
socket.on('connect', () => {
// Make all pings go unanswered.
socket.io.engine['onPacket'] = () => {};
});
await disconnectWaiter;
expect(disconnectReason).toBe('ping timeout');
});
afterEach(async () => {
await apiServer.terminate();
await db?.close();