feat: add heartbeat to websockets

* feat: add heartbeat to websockets

* chore: cancel interval on ws server close

* fix: also stop heartbeat on client disconnect
This commit is contained in:
Rafael Cárdenas
2021-11-23 08:41:26 -06:00
committed by GitHub
parent 15600b0304
commit e7d8efa9f3

View File

@@ -44,20 +44,32 @@ type Subscription =
class SubscriptionManager {
/**
* Key = subscription topic.
* Value = clients interested in the subscription top.
* Value = clients interested in the subscription topic.
*/
subscriptions: Map<string, Set<WebSocket>> = new Map();
// Sockets that are responding to ping.
liveSockets: Set<WebSocket> = new Set();
heartbeatInterval?: NodeJS.Timeout;
readonly heartbeatIntervalMs = 5_000;
addSubscription(client: WebSocket, topicId: string) {
if (this.subscriptions.size === 0) {
this.startHeartbeat();
}
let clients = this.subscriptions.get(topicId);
if (!clients) {
clients = new Set();
this.subscriptions.set(topicId, clients);
}
clients.add(client);
this.liveSockets.add(client);
client.on('close', () => {
this.removeSubscription(client, topicId);
});
client.on('pong', () => {
this.liveSockets.add(client);
});
}
removeSubscription(client: WebSocket, topicId: string) {
@@ -66,8 +78,45 @@ class SubscriptionManager {
clients.delete(client);
if (clients.size === 0) {
this.subscriptions.delete(topicId);
if (this.subscriptions.size === 0) {
this.stopHeartbeat();
}
}
}
this.liveSockets.delete(client);
}
startHeartbeat() {
if (this.heartbeatInterval) {
return;
}
this.heartbeatInterval = setInterval(() => {
this.subscriptions.forEach((clients, topic) => {
clients.forEach(ws => {
// Client did not respond to a previous ping, it's dead.
if (!this.liveSockets.has(ws)) {
this.removeSubscription(ws, topic);
return;
}
// Assume client is dead until it responds to our ping.
this.liveSockets.delete(ws);
ws.ping();
});
});
}, this.heartbeatIntervalMs);
}
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = undefined;
}
}
close() {
this.subscriptions.clear();
this.liveSockets.clear();
this.stopHeartbeat();
}
}
@@ -463,5 +512,14 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
});
});
wsServer.on('close', (_: WebSocket.Server) => {
txUpdateSubscriptions.close();
addressTxUpdateSubscriptions.close();
addressBalanceUpdateSubscriptions.close();
blockSubscriptions.close();
microblockSubscriptions.close();
mempoolSubscriptions.close();
});
return wsServer;
}