fix: socket.io incorrect microblock and mempool updates

* fix: missing break in microblock updates

* fix: also search pruned mempool txs for socket updates

* fix: allow empty subscription array on connect
This commit is contained in:
Rafael Cárdenas
2021-10-12 10:48:13 -05:00
committed by GitHub
parent 9f721c1b18
commit 95d4108d0b
3 changed files with 11 additions and 3 deletions

View File

@@ -62,7 +62,10 @@ export class StacksApiSocketClient {
subscriptions.delete(topic);
}
// Update the subscriptions in the socket handshake so topics are persisted on re-connect.
this.socket.io.opts.query!.subscriptions = Array.from(subscriptions).join(',');
if (this.socket.io.opts.query === undefined) {
this.socket.io.opts.query = {};
}
this.socket.io.opts.query.subscriptions = Array.from(subscriptions).join(',');
return {
unsubscribe: () => {
this.handleSubscription(topic, false);

View File

@@ -87,8 +87,12 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
}
io.on('connection', socket => {
logger.info('[socket.io] new connection');
prometheus?.connect();
socket.on('disconnect', _ => prometheus?.disconnect());
socket.on('disconnect', reason => {
logger.info(`[socket.io] disconnected: ${reason}`);
prometheus?.disconnect();
});
const subscriptions = socket.handshake.query['subscriptions'];
if (subscriptions) {
// TODO: check if init topics are valid, reject connection with error if not
@@ -173,7 +177,7 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
const dbTxQuery = await db.getMempoolTx({
txId: txId,
includeUnanchored: true,
includePruned: false,
includePruned: true,
});
if (!dbTxQuery.found) {
return;

View File

@@ -608,6 +608,7 @@ export class PgDataStore
case 'microblockUpdate':
const microblock = notification.payload as PgMicroblockNotificationPayload;
this.emit('microblockUpdate', microblock.microblockHash);
break;
case 'txUpdate':
const tx = notification.payload as PgTxNotificationPayload;
this.emit('txUpdate', tx.txId);