From d596fd5cc7efe588983d8a902771cc38c21fee82 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Fri, 5 Jan 2024 18:00:40 +0100 Subject: [PATCH] fix: socket-io client should not disconnect with no event reply (#1800) * fix: socket-io client should not disconnect with no event reply * ci: disable socket-io per-message timeout test --- docs/socket-io/index.d.ts | 24 +++--- .../routes/ws/channels/socket-io-channel.ts | 80 ++++--------------- src/tests/socket-io-tests.ts | 3 +- 3 files changed, 27 insertions(+), 80 deletions(-) diff --git a/docs/socket-io/index.d.ts b/docs/socket-io/index.d.ts index 7fe1ea03..136f43d6 100644 --- a/docs/socket-io/index.d.ts +++ b/docs/socket-io/index.d.ts @@ -24,23 +24,19 @@ export type Topic = | NftAssetEventTopic | NftCollectionEventTopic; -// Allows timeout callbacks for messages. See -// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout -type WithTimeoutAck = isSender extends true ? [Error, ...args] : args; - export interface ClientToServerMessages { subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void; unsubscribe: (...topic: Topic[]) => void; } -export interface ServerToClientMessages { - block: (block: Block, callback: (...args: WithTimeoutAck) => void) => void; - microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck) => void) => void; - mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck) => void) => void; - 'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck) => void) => void; - [key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck) => void) => void; - [key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck) => void) => void; - [key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck) => void) => void; - [key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck) => void) => void; - [key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck) => void) => void; +export interface ServerToClientMessages { + block: (block: Block) => void; + microblock: (microblock: Microblock) => void; + mempool: (transaction: MempoolTransaction) => void; + 'nft-event': (event: NftEvent) => void; + [key: TransactionTopic]: (transaction: Transaction | MempoolTransaction) => void; + [key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void; + [key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void; + [key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void; + [key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void; } diff --git a/src/api/routes/ws/channels/socket-io-channel.ts b/src/api/routes/ws/channels/socket-io-channel.ts index c354fca1..844b4230 100644 --- a/src/api/routes/ws/channels/socket-io-channel.ts +++ b/src/api/routes/ws/channels/socket-io-channel.ts @@ -33,7 +33,7 @@ const component = { component: 'socket-io' }; * SocketIO channel for sending real time API updates. */ export class SocketIOChannel extends WebSocketChannel { - private io?: SocketIOServer>; + private io?: SocketIOServer; private adapter?: Adapter; constructor(server: http.Server) { @@ -44,14 +44,11 @@ export class SocketIOChannel extends WebSocketChannel { } connect(): void { - const io = new SocketIOServer>( - this.server, - { - cors: { origin: '*' }, - pingInterval: getWsPingIntervalMs(), - pingTimeout: getWsPingTimeoutMs(), - } - ); + const io = new SocketIOServer(this.server, { + cors: { origin: '*' }, + pingInterval: getWsPingIntervalMs(), + pingTimeout: getWsPingTimeoutMs(), + }); this.io = io; io.on('connection', async socket => { @@ -169,13 +166,6 @@ export class SocketIOChannel extends WebSocketChannel { return false; } - private async getTopicSockets(room: Topic) { - if (!this.io) { - return; - } - return await this.io.to(room).fetchSockets(); - } - send

( payload: P, ...args: ListenerType @@ -190,52 +180,32 @@ export class SocketIOChannel extends WebSocketChannel { case 'block': { const [block] = args as ListenerType; this.prometheus?.sendEvent('block'); - void this.getTopicSockets('block').then(sockets => - sockets?.forEach(socket => - socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true)) - ) - ); + this.io?.to('block').emit('block', block); break; } case 'microblock': { const [microblock] = args as ListenerType; this.prometheus?.sendEvent('microblock'); - void this.getTopicSockets('microblock').then(sockets => - sockets?.forEach(socket => - socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true)) - ) - ); + this.io?.to('microblock').emit('microblock', microblock); break; } case 'mempoolTransaction': { const [tx] = args as ListenerType; this.prometheus?.sendEvent('mempool'); - void this.getTopicSockets('mempool').then(sockets => - sockets?.forEach(socket => - socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true)) - ) - ); + this.io?.to('mempool').emit('mempool', tx); break; } case 'transaction': { const [tx] = args as ListenerType; this.prometheus?.sendEvent('transaction'); const topic: TransactionTopic = `transaction:${tx.tx_id}`; - void this.getTopicSockets(topic).then(sockets => - sockets?.forEach(socket => - socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true)) - ) - ); + this.io?.to(topic).emit(topic, tx); break; } case 'nftEvent': { const [event] = args as ListenerType; this.prometheus?.sendEvent('nft-event'); - void this.getTopicSockets(`nft-event`).then(sockets => - sockets?.forEach(socket => - socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true)) - ) - ); + this.io?.to('nft-event').emit('nft-event', event); break; } case 'nftAssetEvent': { @@ -244,13 +214,7 @@ export class SocketIOChannel extends WebSocketChannel { >; this.prometheus?.sendEvent('nft-asset-event'); const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`; - void this.getTopicSockets(topic).then(sockets => - sockets?.forEach(socket => - socket - .timeout(timeout) - .emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true)) - ) - ); + this.io?.to(topic).emit(topic, assetIdentifier, value, event); break; } case 'nftCollectionEvent': { @@ -259,35 +223,21 @@ export class SocketIOChannel extends WebSocketChannel { >; this.prometheus?.sendEvent('nft-collection-event'); const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`; - void this.getTopicSockets(topic).then(sockets => - sockets?.forEach(socket => - socket - .timeout(timeout) - .emit(topic, assetIdentifier, event, _ => socket.disconnect(true)) - ) - ); + this.io?.to(topic).emit(topic, assetIdentifier, event); break; } case 'principalTransaction': { const [principal, tx] = args as ListenerType; const topic: AddressTransactionTopic = `address-transaction:${principal}`; this.prometheus?.sendEvent('address-transaction'); - void this.getTopicSockets(topic).then(sockets => - sockets?.forEach(socket => { - socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true)); - }) - ); + this.io?.to(topic).emit(topic, principal, tx); break; } case 'principalStxBalance': { const [principal, balance] = args as ListenerType; const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`; this.prometheus?.sendEvent('address-stx-balance'); - void this.getTopicSockets(topic).then(sockets => - sockets?.forEach(socket => { - socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true)); - }) - ); + this.io?.to(topic).emit(topic, principal, balance); break; } } diff --git a/src/tests/socket-io-tests.ts b/src/tests/socket-io-tests.ts index 993c9c57..c019dbe6 100644 --- a/src/tests/socket-io-tests.ts +++ b/src/tests/socket-io-tests.ts @@ -565,7 +565,8 @@ describe('socket-io', () => { } }); - test('message timeout disconnects client', async () => { + // Per message timeout is not enabled (we don't want to require clients to explicitly reply to events) + test.skip('message timeout disconnects client', async () => { const address = apiServer.address; const socket = io(`http://${address}`, { reconnection: false,