mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
feat: add prometheus metrics for websockets
* feat: add prometheus metrics for websockets * chore: add comments * feat: add histogram for subscription duration * chore: install histogram to socket.io * chore: install histogram to ws * feat: add counter for connections by remote address * chore: consider x-forwarded-for for ip addresses * chore: comment style updates * feat: add custom bucket sizes to histogram
This commit is contained in:
142
src/api/routes/ws/metrics.ts
Normal file
142
src/api/routes/ws/metrics.ts
Normal file
@@ -0,0 +1,142 @@
|
||||
import * as prom from 'prom-client';
|
||||
import * as WebSocket from 'ws';
|
||||
import { Topic } from '@stacks/stacks-blockchain-api-types';
|
||||
import { Socket } from 'socket.io';
|
||||
|
||||
export type WebSocketMetricsPrefix = 'socket_io' | 'websocket';
|
||||
|
||||
export type WebSocketSubscriber = Socket | WebSocket;
|
||||
|
||||
interface WebSocketMetrics {
|
||||
// Number of active subscriptions by topic.
|
||||
subscriptions: prom.Gauge<string>;
|
||||
// Time spent subscribed to a particular topic.
|
||||
subscriptionTimers: prom.Histogram<string>;
|
||||
// Total connections.
|
||||
connectTotal: prom.Counter<string>;
|
||||
// Total connections by remote address.
|
||||
connectRemoteAddressTotal: prom.Counter<string>;
|
||||
// Total disconnections.
|
||||
disconnectTotal: prom.Counter<string>;
|
||||
// Total events sent by event type.
|
||||
eventsSent: prom.Counter<string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for `prom-client` that allows us to gather metrics for Socket.io and WebSocket usage.
|
||||
*/
|
||||
export class WebSocketPrometheus {
|
||||
private metrics: WebSocketMetrics;
|
||||
// Record of all dates when a particular socket started observing an event. Useful for measuring
|
||||
// total subscription time.
|
||||
private subscriptions = new Map<WebSocketSubscriber, Map<string, Date>>();
|
||||
|
||||
constructor(metricsNamePrefix: WebSocketMetricsPrefix) {
|
||||
this.metrics = {
|
||||
subscriptions: new prom.Gauge({
|
||||
name: `${metricsNamePrefix}_subscriptions`,
|
||||
help: 'Current subscriptions',
|
||||
labelNames: ['topic'],
|
||||
}),
|
||||
subscriptionTimers: new prom.Histogram({
|
||||
name: `${metricsNamePrefix}_subscription_timers`,
|
||||
help: 'Subscription timers',
|
||||
labelNames: ['topic'],
|
||||
buckets: [1, 5, 10, 30, 60, 180, 300, 600, 1200, 1800, 3600],
|
||||
}),
|
||||
connectTotal: new prom.Counter({
|
||||
name: `${metricsNamePrefix}_connect_total`,
|
||||
help: 'Total count of connection requests',
|
||||
}),
|
||||
connectRemoteAddressTotal: new prom.Counter({
|
||||
name: `${metricsNamePrefix}_connect_remote_address_total`,
|
||||
help: 'Total count of connection requests by remote address',
|
||||
labelNames: ['remoteAddress'],
|
||||
}),
|
||||
disconnectTotal: new prom.Counter({
|
||||
name: `${metricsNamePrefix}_disconnect_total`,
|
||||
help: 'Total count of disconnections',
|
||||
}),
|
||||
eventsSent: new prom.Counter({
|
||||
name: `${metricsNamePrefix}_events_sent`,
|
||||
help: 'Total count of sent events',
|
||||
labelNames: ['event'],
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
public connect(remoteAddress: string) {
|
||||
this.doConnect(remoteAddress);
|
||||
}
|
||||
|
||||
public disconnect(subscriber: WebSocketSubscriber) {
|
||||
this.doDisconnect(subscriber);
|
||||
}
|
||||
|
||||
public subscribe(subscriber: WebSocketSubscriber, topic: Topic | Topic[] | string) {
|
||||
if (Array.isArray(topic)) {
|
||||
topic.forEach(t => this.doSubscribe(subscriber, t));
|
||||
} else {
|
||||
this.doSubscribe(subscriber, topic);
|
||||
}
|
||||
}
|
||||
|
||||
public unsubscribe(subscriber: WebSocketSubscriber, topic: Topic | string) {
|
||||
this.doUnsubscribe(subscriber, topic);
|
||||
}
|
||||
|
||||
public sendEvent(event: string) {
|
||||
this.metrics.eventsSent.inc({ event: event });
|
||||
}
|
||||
|
||||
private doConnect(remoteAddress: string) {
|
||||
this.metrics.connectTotal.inc();
|
||||
this.metrics.connectRemoteAddressTotal.inc({
|
||||
remoteAddress: remoteAddress.split(',')[0].trim(),
|
||||
});
|
||||
}
|
||||
|
||||
private doSubscribe(subscriber: WebSocketSubscriber, topic: Topic | Topic[] | string) {
|
||||
const topicStr = topic.toString();
|
||||
// Increase subscription count for this topic.
|
||||
this.metrics.subscriptions.inc({ topic: topicStr });
|
||||
// Record the subscription date.
|
||||
let map = this.subscriptions.get(subscriber);
|
||||
if (!map) {
|
||||
map = new Map();
|
||||
this.subscriptions.set(subscriber, map);
|
||||
}
|
||||
map.set(topicStr, new Date());
|
||||
}
|
||||
|
||||
private doUnsubscribe(subscriber: WebSocketSubscriber, topic: Topic | string) {
|
||||
const topicStr = topic.toString();
|
||||
// Decrease subscription count for this topic.
|
||||
this.metrics.subscriptions.dec({ topic: topicStr });
|
||||
// Measure topic subscription duration.
|
||||
const map = this.subscriptions.get(subscriber);
|
||||
if (map) {
|
||||
const startDate = map.get(topicStr);
|
||||
if (startDate) {
|
||||
const elapsedSeconds = (new Date().getTime() - startDate.getTime()) / 1000;
|
||||
this.metrics.subscriptionTimers.observe({ topic: topicStr }, elapsedSeconds);
|
||||
map.delete(topicStr);
|
||||
if (map.size === 0) {
|
||||
this.subscriptions.delete(subscriber);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private doDisconnect(subscriber: WebSocketSubscriber) {
|
||||
this.metrics.disconnectTotal.inc();
|
||||
// Unsubscribe this socket from every topic. This will also delete its subscriptions map.
|
||||
const map = this.subscriptions.get(subscriber);
|
||||
if (map) {
|
||||
const topics = Array.from(map.keys());
|
||||
topics.forEach(topic => {
|
||||
this.doUnsubscribe(subscriber, topic);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
import { Server as SocketIOServer } from 'socket.io';
|
||||
import * as http from 'http';
|
||||
import * as prom from 'prom-client';
|
||||
import { DataStore } from '../../../datastore/common';
|
||||
import {
|
||||
AddressStxBalanceResponse,
|
||||
@@ -19,99 +18,46 @@ import {
|
||||
parseDbTx,
|
||||
} from '../../controllers/db-controller';
|
||||
import { isProdEnv, logError, logger } from '../../../helpers';
|
||||
|
||||
interface SocketIOMetrics {
|
||||
subscriptions: prom.Gauge<string>;
|
||||
connectTotal: prom.Counter<string>;
|
||||
disconnectTotal: prom.Counter<string>;
|
||||
eventsSent: prom.Counter<string>;
|
||||
}
|
||||
|
||||
class SocketIOPrometheus {
|
||||
private metrics: SocketIOMetrics;
|
||||
|
||||
constructor() {
|
||||
this.metrics = {
|
||||
subscriptions: new prom.Gauge({
|
||||
name: 'socket_io_subscriptions',
|
||||
help: 'Current subscriptions',
|
||||
labelNames: ['topic'],
|
||||
}),
|
||||
connectTotal: new prom.Counter({
|
||||
name: 'socket_io_connect_total',
|
||||
help: 'Total count of socket.io connection requests',
|
||||
}),
|
||||
disconnectTotal: new prom.Counter({
|
||||
name: 'socket_io_disconnect_total',
|
||||
help: 'Total count of socket.io disconnections',
|
||||
}),
|
||||
eventsSent: new prom.Counter({
|
||||
name: 'socket_io_events_sent',
|
||||
help: 'Socket.io sent events',
|
||||
labelNames: ['event'],
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
public connect() {
|
||||
this.metrics.connectTotal.inc();
|
||||
}
|
||||
|
||||
public disconnect() {
|
||||
this.metrics.disconnectTotal.inc();
|
||||
}
|
||||
|
||||
public subscribe(topic: Topic | Topic[] | string) {
|
||||
if (Array.isArray(topic)) {
|
||||
topic.forEach(t => this.metrics.subscriptions.inc({ topic: t.toString() }));
|
||||
} else {
|
||||
this.metrics.subscriptions.inc({ topic: topic.toString() });
|
||||
}
|
||||
}
|
||||
|
||||
public unsubscribe(topic: Topic | string) {
|
||||
this.metrics.subscriptions.dec({ topic: topic.toString() });
|
||||
}
|
||||
|
||||
public sendEvent(event: string) {
|
||||
this.metrics.eventsSent.inc({ event: event });
|
||||
}
|
||||
}
|
||||
import { WebSocketPrometheus } from './metrics';
|
||||
|
||||
export function createSocketIORouter(db: DataStore, server: http.Server) {
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(server, {
|
||||
cors: { origin: '*' },
|
||||
});
|
||||
let prometheus: SocketIOPrometheus | null;
|
||||
let prometheus: WebSocketPrometheus | null;
|
||||
if (isProdEnv) {
|
||||
prometheus = new SocketIOPrometheus();
|
||||
prometheus = new WebSocketPrometheus('socket_io');
|
||||
}
|
||||
|
||||
io.on('connection', socket => {
|
||||
logger.info('[socket.io] new connection');
|
||||
prometheus?.connect();
|
||||
if (socket.handshake.headers['x-forwarded-for']) {
|
||||
prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
|
||||
} else {
|
||||
prometheus?.connect(socket.handshake.address);
|
||||
}
|
||||
socket.on('disconnect', reason => {
|
||||
logger.info(`[socket.io] disconnected: ${reason}`);
|
||||
prometheus?.disconnect();
|
||||
prometheus?.disconnect(socket);
|
||||
});
|
||||
const subscriptions = socket.handshake.query['subscriptions'];
|
||||
if (subscriptions) {
|
||||
// TODO: check if init topics are valid, reject connection with error if not
|
||||
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
|
||||
topics.forEach(topic => {
|
||||
prometheus?.subscribe(topic);
|
||||
prometheus?.subscribe(socket, topic);
|
||||
void socket.join(topic);
|
||||
});
|
||||
}
|
||||
socket.on('subscribe', (topic, callback) => {
|
||||
prometheus?.subscribe(topic);
|
||||
prometheus?.subscribe(socket, topic);
|
||||
void socket.join(topic);
|
||||
// TODO: check if topic is valid, and return error message if not
|
||||
callback?.(null);
|
||||
});
|
||||
socket.on('unsubscribe', (...topics) => {
|
||||
topics.forEach(topic => {
|
||||
prometheus?.unsubscribe(topic);
|
||||
prometheus?.unsubscribe(socket, topic);
|
||||
void socket.leave(topic);
|
||||
});
|
||||
});
|
||||
@@ -188,7 +134,7 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
prometheus?.sendEvent('tx');
|
||||
prometheus?.sendEvent('transaction');
|
||||
io.to(mempoolTopic).emit('transaction', mempoolTxs[0]);
|
||||
} else {
|
||||
const txQuery = await getTxFromDataStore(db, {
|
||||
@@ -196,7 +142,7 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (txQuery.found) {
|
||||
prometheus?.sendEvent('tx');
|
||||
prometheus?.sendEvent('transaction');
|
||||
io.to(mempoolTopic).emit('transaction', txQuery.result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ import {
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
|
||||
import { DataStore, DbTx, DbMempoolTx } from '../../../datastore/common';
|
||||
import { normalizeHashString, logError, isValidPrincipal, logger } from '../../../helpers';
|
||||
import { normalizeHashString, logError, isValidPrincipal, isProdEnv } from '../../../helpers';
|
||||
import {
|
||||
getBlockFromDataStore,
|
||||
getMempoolTxsFromDataStore,
|
||||
@@ -32,6 +32,7 @@ import {
|
||||
getTxStatusString,
|
||||
getTxTypeString,
|
||||
} from '../../controllers/db-controller';
|
||||
import { WebSocketPrometheus } from './metrics';
|
||||
|
||||
type Subscription =
|
||||
| RpcTxUpdateSubscriptionParams
|
||||
@@ -121,6 +122,11 @@ class SubscriptionManager {
|
||||
}
|
||||
|
||||
export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket.Server {
|
||||
let prometheus: WebSocketPrometheus | null;
|
||||
if (isProdEnv) {
|
||||
prometheus = new WebSocketPrometheus('websocket');
|
||||
}
|
||||
|
||||
// Use `noServer` and the `upgrade` event to prevent the ws lib from hijacking the http.Server error event
|
||||
const wsPath = '/extended/v1/ws';
|
||||
const wsServer = new WebSocket.Server({ noServer: true, path: wsPath });
|
||||
@@ -254,8 +260,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
}
|
||||
if (subscribe) {
|
||||
txUpdateSubscriptions.addSubscription(client, txId);
|
||||
prometheus?.subscribe(client, `transaction:${txId}`);
|
||||
} else {
|
||||
txUpdateSubscriptions.removeSubscription(client, txId);
|
||||
prometheus?.unsubscribe(client, `transaction:${txId}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { tx_id: txId });
|
||||
}
|
||||
@@ -273,8 +281,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
}
|
||||
if (subscribe) {
|
||||
addressTxUpdateSubscriptions.addSubscription(client, address);
|
||||
prometheus?.subscribe(client, `address-transaction:${address}`);
|
||||
} else {
|
||||
addressTxUpdateSubscriptions.removeSubscription(client, address);
|
||||
prometheus?.unsubscribe(client, `address-transaction:${address}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { address: address });
|
||||
}
|
||||
@@ -291,8 +301,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
}
|
||||
if (subscribe) {
|
||||
addressBalanceUpdateSubscriptions.addSubscription(client, address);
|
||||
prometheus?.subscribe(client, `address-stx-balance:${address}`);
|
||||
} else {
|
||||
addressBalanceUpdateSubscriptions.removeSubscription(client, address);
|
||||
prometheus?.unsubscribe(client, `address-stx-balance:${address}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { address: address });
|
||||
}
|
||||
@@ -305,8 +317,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
) {
|
||||
if (subscribe) {
|
||||
blockSubscriptions.addSubscription(client, params.event);
|
||||
prometheus?.subscribe(client, 'block');
|
||||
} else {
|
||||
blockSubscriptions.removeSubscription(client, params.event);
|
||||
prometheus?.unsubscribe(client, 'block');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
@@ -319,8 +333,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
) {
|
||||
if (subscribe) {
|
||||
microblockSubscriptions.addSubscription(client, params.event);
|
||||
prometheus?.subscribe(client, 'microblock');
|
||||
} else {
|
||||
microblockSubscriptions.removeSubscription(client, params.event);
|
||||
prometheus?.unsubscribe(client, 'microblock');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
@@ -333,8 +349,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
) {
|
||||
if (subscribe) {
|
||||
mempoolSubscriptions.addSubscription(client, params.event);
|
||||
prometheus?.subscribe(client, 'mempool');
|
||||
} else {
|
||||
mempoolSubscriptions.removeSubscription(client, params.event);
|
||||
prometheus?.unsubscribe(client, 'mempool');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
@@ -369,6 +387,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
updateNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('transaction');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket tx update for ${txId}`, error);
|
||||
@@ -399,6 +418,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
updateNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('address-transaction');
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -427,6 +447,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
balanceNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('address-stx-balance');
|
||||
} catch (error) {
|
||||
logError(`error sending websocket stx balance update to ${address}`, error);
|
||||
}
|
||||
@@ -443,6 +464,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
const block = blockQuery.result;
|
||||
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('block');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -462,6 +484,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
const microblock = microblockQuery.result;
|
||||
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('microblock');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -481,6 +504,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
const mempoolTx = mempoolTxs[0];
|
||||
const rpcNotificationPayload = jsonRpcNotification('mempool', mempoolTx).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('mempool');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -507,9 +531,17 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
|
||||
});
|
||||
|
||||
wsServer.on('connection', (clientSocket, req) => {
|
||||
if (req.headers['x-forwarded-for']) {
|
||||
prometheus?.connect(req.headers['x-forwarded-for'] as string);
|
||||
} else if (req.socket.remoteAddress) {
|
||||
prometheus?.connect(req.socket.remoteAddress);
|
||||
}
|
||||
clientSocket.on('message', data => {
|
||||
void handleClientMessage(clientSocket, data);
|
||||
});
|
||||
clientSocket.on('close', (_: WebSocket) => {
|
||||
prometheus?.disconnect(clientSocket);
|
||||
});
|
||||
});
|
||||
|
||||
wsServer.on('close', (_: WebSocket.Server) => {
|
||||
|
||||
Reference in New Issue
Block a user