mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-06-17 10:51:19 +08:00
fix: socket-io client should not disconnect with no event reply (#1800)
* fix: socket-io client should not disconnect with no event reply * ci: disable socket-io per-message timeout test
This commit is contained in:
24
docs/socket-io/index.d.ts
vendored
24
docs/socket-io/index.d.ts
vendored
@@ -24,23 +24,19 @@ export type Topic =
|
||||
| NftAssetEventTopic
|
||||
| NftCollectionEventTopic;
|
||||
|
||||
// Allows timeout callbacks for messages. See
|
||||
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
|
||||
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;
|
||||
|
||||
export interface ClientToServerMessages {
|
||||
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
|
||||
unsubscribe: (...topic: Topic[]) => void;
|
||||
}
|
||||
|
||||
export interface ServerToClientMessages<isSender extends boolean = false> {
|
||||
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
|
||||
export interface ServerToClientMessages {
|
||||
block: (block: Block) => void;
|
||||
microblock: (microblock: Microblock) => void;
|
||||
mempool: (transaction: MempoolTransaction) => void;
|
||||
'nft-event': (event: NftEvent) => void;
|
||||
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction) => void;
|
||||
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
|
||||
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
|
||||
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
|
||||
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ const component = { component: 'socket-io' };
|
||||
* SocketIO channel for sending real time API updates.
|
||||
*/
|
||||
export class SocketIOChannel extends WebSocketChannel {
|
||||
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
|
||||
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
|
||||
private adapter?: Adapter;
|
||||
|
||||
constructor(server: http.Server) {
|
||||
@@ -44,14 +44,11 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
}
|
||||
|
||||
connect(): void {
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
|
||||
this.server,
|
||||
{
|
||||
cors: { origin: '*' },
|
||||
pingInterval: getWsPingIntervalMs(),
|
||||
pingTimeout: getWsPingTimeoutMs(),
|
||||
}
|
||||
);
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
|
||||
cors: { origin: '*' },
|
||||
pingInterval: getWsPingIntervalMs(),
|
||||
pingTimeout: getWsPingTimeoutMs(),
|
||||
});
|
||||
this.io = io;
|
||||
|
||||
io.on('connection', async socket => {
|
||||
@@ -169,13 +166,6 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
return false;
|
||||
}
|
||||
|
||||
private async getTopicSockets(room: Topic) {
|
||||
if (!this.io) {
|
||||
return;
|
||||
}
|
||||
return await this.io.to(room).fetchSockets();
|
||||
}
|
||||
|
||||
send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
@@ -190,52 +180,32 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
case 'block': {
|
||||
const [block] = args as ListenerType<WebSocketPayload['block']>;
|
||||
this.prometheus?.sendEvent('block');
|
||||
void this.getTopicSockets('block').then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to('block').emit('block', block);
|
||||
break;
|
||||
}
|
||||
case 'microblock': {
|
||||
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
|
||||
this.prometheus?.sendEvent('microblock');
|
||||
void this.getTopicSockets('microblock').then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to('microblock').emit('microblock', microblock);
|
||||
break;
|
||||
}
|
||||
case 'mempoolTransaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
|
||||
this.prometheus?.sendEvent('mempool');
|
||||
void this.getTopicSockets('mempool').then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to('mempool').emit('mempool', tx);
|
||||
break;
|
||||
}
|
||||
case 'transaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
|
||||
this.prometheus?.sendEvent('transaction');
|
||||
const topic: TransactionTopic = `transaction:${tx.tx_id}`;
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to(topic).emit(topic, tx);
|
||||
break;
|
||||
}
|
||||
case 'nftEvent': {
|
||||
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
|
||||
this.prometheus?.sendEvent('nft-event');
|
||||
void this.getTopicSockets(`nft-event`).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to('nft-event').emit('nft-event', event);
|
||||
break;
|
||||
}
|
||||
case 'nftAssetEvent': {
|
||||
@@ -244,13 +214,7 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
>;
|
||||
this.prometheus?.sendEvent('nft-asset-event');
|
||||
const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`;
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket
|
||||
.timeout(timeout)
|
||||
.emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to(topic).emit(topic, assetIdentifier, value, event);
|
||||
break;
|
||||
}
|
||||
case 'nftCollectionEvent': {
|
||||
@@ -259,35 +223,21 @@ export class SocketIOChannel extends WebSocketChannel {
|
||||
>;
|
||||
this.prometheus?.sendEvent('nft-collection-event');
|
||||
const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`;
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket =>
|
||||
socket
|
||||
.timeout(timeout)
|
||||
.emit(topic, assetIdentifier, event, _ => socket.disconnect(true))
|
||||
)
|
||||
);
|
||||
this.io?.to(topic).emit(topic, assetIdentifier, event);
|
||||
break;
|
||||
}
|
||||
case 'principalTransaction': {
|
||||
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
|
||||
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
|
||||
this.prometheus?.sendEvent('address-transaction');
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket => {
|
||||
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
|
||||
})
|
||||
);
|
||||
this.io?.to(topic).emit(topic, principal, tx);
|
||||
break;
|
||||
}
|
||||
case 'principalStxBalance': {
|
||||
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
|
||||
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
|
||||
this.prometheus?.sendEvent('address-stx-balance');
|
||||
void this.getTopicSockets(topic).then(sockets =>
|
||||
sockets?.forEach(socket => {
|
||||
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
|
||||
})
|
||||
);
|
||||
this.io?.to(topic).emit(topic, principal, balance);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -565,7 +565,8 @@ describe('socket-io', () => {
|
||||
}
|
||||
});
|
||||
|
||||
test('message timeout disconnects client', async () => {
|
||||
// Per message timeout is not enabled (we don't want to require clients to explicitly reply to events)
|
||||
test.skip('message timeout disconnects client', async () => {
|
||||
const address = apiServer.address;
|
||||
const socket = io(`http://${address}`, {
|
||||
reconnection: false,
|
||||
|
||||
Reference in New Issue
Block a user