fix: socket-io client should not disconnect with no event reply (#1800)

* fix: socket-io client should not disconnect with no event reply

* ci: disable socket-io per-message timeout test
This commit is contained in:
Matthew Little
2024-01-05 18:00:40 +01:00
committed by GitHub
parent 242cb04da9
commit d596fd5cc7
3 changed files with 27 additions and 80 deletions

View File

@@ -24,23 +24,19 @@ export type Topic =
| NftAssetEventTopic
| NftCollectionEventTopic;
// Allows timeout callbacks for messages. See
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;
export interface ClientToServerMessages {
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
unsubscribe: (...topic: Topic[]) => void;
}
export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
export interface ServerToClientMessages {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempool: (transaction: MempoolTransaction) => void;
'nft-event': (event: NftEvent) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
}

View File

@@ -33,7 +33,7 @@ const component = { component: 'socket-io' };
* SocketIO channel for sending real time API updates.
*/
export class SocketIOChannel extends WebSocketChannel {
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
private adapter?: Adapter;
constructor(server: http.Server) {
@@ -44,14 +44,11 @@ export class SocketIOChannel extends WebSocketChannel {
}
connect(): void {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
this.server,
{
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
}
);
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
});
this.io = io;
io.on('connection', async socket => {
@@ -169,13 +166,6 @@ export class SocketIOChannel extends WebSocketChannel {
return false;
}
private async getTopicSockets(room: Topic) {
if (!this.io) {
return;
}
return await this.io.to(room).fetchSockets();
}
send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
@@ -190,52 +180,32 @@ export class SocketIOChannel extends WebSocketChannel {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.prometheus?.sendEvent('block');
void this.getTopicSockets('block').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
)
);
this.io?.to('block').emit('block', block);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.prometheus?.sendEvent('microblock');
void this.getTopicSockets('microblock').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
)
);
this.io?.to('microblock').emit('microblock', microblock);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.prometheus?.sendEvent('mempool');
void this.getTopicSockets('mempool').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
)
);
this.io?.to('mempool').emit('mempool', tx);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
const topic: TransactionTopic = `transaction:${tx.tx_id}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, tx);
break;
}
case 'nftEvent': {
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
this.prometheus?.sendEvent('nft-event');
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
)
);
this.io?.to('nft-event').emit('nft-event', event);
break;
}
case 'nftAssetEvent': {
@@ -244,13 +214,7 @@ export class SocketIOChannel extends WebSocketChannel {
>;
this.prometheus?.sendEvent('nft-asset-event');
const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, assetIdentifier, value, event);
break;
}
case 'nftCollectionEvent': {
@@ -259,35 +223,21 @@ export class SocketIOChannel extends WebSocketChannel {
>;
this.prometheus?.sendEvent('nft-collection-event');
const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit(topic, assetIdentifier, event, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, assetIdentifier, event);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
this.prometheus?.sendEvent('address-transaction');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
})
);
this.io?.to(topic).emit(topic, principal, tx);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
this.prometheus?.sendEvent('address-stx-balance');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
})
);
this.io?.to(topic).emit(topic, principal, balance);
break;
}
}

View File

@@ -565,7 +565,8 @@ describe('socket-io', () => {
}
});
test('message timeout disconnects client', async () => {
// Per message timeout is not enabled (we don't want to require clients to explicitly reply to events)
test.skip('message timeout disconnects client', async () => {
const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,