mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 16:53:19 +08:00
feat: add heartbeat to websockets
* feat: add heartbeat to websockets * chore: cancel interval on ws server close * fix: also stop heartbeat on client disconnect
This commit is contained in:
@@ -44,20 +44,32 @@ type Subscription =
|
||||
class SubscriptionManager {
|
||||
/**
|
||||
* Key = subscription topic.
|
||||
* Value = clients interested in the subscription top.
|
||||
* Value = clients interested in the subscription topic.
|
||||
*/
|
||||
subscriptions: Map<string, Set<WebSocket>> = new Map();
|
||||
|
||||
// Sockets that are responding to ping.
|
||||
liveSockets: Set<WebSocket> = new Set();
|
||||
heartbeatInterval?: NodeJS.Timeout;
|
||||
readonly heartbeatIntervalMs = 5_000;
|
||||
|
||||
addSubscription(client: WebSocket, topicId: string) {
|
||||
if (this.subscriptions.size === 0) {
|
||||
this.startHeartbeat();
|
||||
}
|
||||
let clients = this.subscriptions.get(topicId);
|
||||
if (!clients) {
|
||||
clients = new Set();
|
||||
this.subscriptions.set(topicId, clients);
|
||||
}
|
||||
clients.add(client);
|
||||
this.liveSockets.add(client);
|
||||
client.on('close', () => {
|
||||
this.removeSubscription(client, topicId);
|
||||
});
|
||||
client.on('pong', () => {
|
||||
this.liveSockets.add(client);
|
||||
});
|
||||
}
|
||||
|
||||
removeSubscription(client: WebSocket, topicId: string) {
|
||||
@@ -66,8 +78,45 @@ class SubscriptionManager {
|
||||
clients.delete(client);
|
||||
if (clients.size === 0) {
|
||||
this.subscriptions.delete(topicId);
|
||||
if (this.subscriptions.size === 0) {
|
||||
this.stopHeartbeat();
|
||||
}
|
||||
}
|
||||
}
|
||||
this.liveSockets.delete(client);
|
||||
}
|
||||
|
||||
startHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
return;
|
||||
}
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
this.subscriptions.forEach((clients, topic) => {
|
||||
clients.forEach(ws => {
|
||||
// Client did not respond to a previous ping, it's dead.
|
||||
if (!this.liveSockets.has(ws)) {
|
||||
this.removeSubscription(ws, topic);
|
||||
return;
|
||||
}
|
||||
// Assume client is dead until it responds to our ping.
|
||||
this.liveSockets.delete(ws);
|
||||
ws.ping();
|
||||
});
|
||||
});
|
||||
}, this.heartbeatIntervalMs);
|
||||
}
|
||||
|
||||
stopHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
this.subscriptions.clear();
|
||||
this.liveSockets.clear();
|
||||
this.stopHeartbeat();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,5 +512,14 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
});
|
||||
});
|
||||
|
||||
wsServer.on('close', (_: WebSocket.Server) => {
|
||||
txUpdateSubscriptions.close();
|
||||
addressTxUpdateSubscriptions.close();
|
||||
addressBalanceUpdateSubscriptions.close();
|
||||
blockSubscriptions.close();
|
||||
microblockSubscriptions.close();
|
||||
mempoolSubscriptions.close();
|
||||
});
|
||||
|
||||
return wsServer;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user