feat: add prometheus metrics for websockets

* feat: add prometheus metrics for websockets

* chore: add comments

* feat: add histogram for subscription duration

* chore: install histogram to socket.io

* chore: install histogram to ws

* feat: add counter for connections by remote address

* chore: consider x-forwarded-for for ip addresses

* chore: comment style updates

* feat: add custom bucket sizes to histogram
This commit is contained in:
Rafael Cárdenas
2021-11-24 20:37:28 -06:00
committed by GitHub
parent 172e6a2123
commit ab9b3de70a
3 changed files with 189 additions and 69 deletions

View File

@@ -0,0 +1,142 @@
import * as prom from 'prom-client';
import * as WebSocket from 'ws';
import { Topic } from '@stacks/stacks-blockchain-api-types';
import { Socket } from 'socket.io';
export type WebSocketMetricsPrefix = 'socket_io' | 'websocket';
export type WebSocketSubscriber = Socket | WebSocket;
interface WebSocketMetrics {
// Number of active subscriptions by topic.
subscriptions: prom.Gauge<string>;
// Time spent subscribed to a particular topic.
subscriptionTimers: prom.Histogram<string>;
// Total connections.
connectTotal: prom.Counter<string>;
// Total connections by remote address.
connectRemoteAddressTotal: prom.Counter<string>;
// Total disconnections.
disconnectTotal: prom.Counter<string>;
// Total events sent by event type.
eventsSent: prom.Counter<string>;
}
/**
* Wrapper for `prom-client` that allows us to gather metrics for Socket.io and WebSocket usage.
*/
export class WebSocketPrometheus {
private metrics: WebSocketMetrics;
// Record of all dates when a particular socket started observing an event. Useful for measuring
// total subscription time.
private subscriptions = new Map<WebSocketSubscriber, Map<string, Date>>();
constructor(metricsNamePrefix: WebSocketMetricsPrefix) {
this.metrics = {
subscriptions: new prom.Gauge({
name: `${metricsNamePrefix}_subscriptions`,
help: 'Current subscriptions',
labelNames: ['topic'],
}),
subscriptionTimers: new prom.Histogram({
name: `${metricsNamePrefix}_subscription_timers`,
help: 'Subscription timers',
labelNames: ['topic'],
buckets: [1, 5, 10, 30, 60, 180, 300, 600, 1200, 1800, 3600],
}),
connectTotal: new prom.Counter({
name: `${metricsNamePrefix}_connect_total`,
help: 'Total count of connection requests',
}),
connectRemoteAddressTotal: new prom.Counter({
name: `${metricsNamePrefix}_connect_remote_address_total`,
help: 'Total count of connection requests by remote address',
labelNames: ['remoteAddress'],
}),
disconnectTotal: new prom.Counter({
name: `${metricsNamePrefix}_disconnect_total`,
help: 'Total count of disconnections',
}),
eventsSent: new prom.Counter({
name: `${metricsNamePrefix}_events_sent`,
help: 'Total count of sent events',
labelNames: ['event'],
}),
};
}
public connect(remoteAddress: string) {
this.doConnect(remoteAddress);
}
public disconnect(subscriber: WebSocketSubscriber) {
this.doDisconnect(subscriber);
}
public subscribe(subscriber: WebSocketSubscriber, topic: Topic | Topic[] | string) {
if (Array.isArray(topic)) {
topic.forEach(t => this.doSubscribe(subscriber, t));
} else {
this.doSubscribe(subscriber, topic);
}
}
public unsubscribe(subscriber: WebSocketSubscriber, topic: Topic | string) {
this.doUnsubscribe(subscriber, topic);
}
public sendEvent(event: string) {
this.metrics.eventsSent.inc({ event: event });
}
private doConnect(remoteAddress: string) {
this.metrics.connectTotal.inc();
this.metrics.connectRemoteAddressTotal.inc({
remoteAddress: remoteAddress.split(',')[0].trim(),
});
}
private doSubscribe(subscriber: WebSocketSubscriber, topic: Topic | Topic[] | string) {
const topicStr = topic.toString();
// Increase subscription count for this topic.
this.metrics.subscriptions.inc({ topic: topicStr });
// Record the subscription date.
let map = this.subscriptions.get(subscriber);
if (!map) {
map = new Map();
this.subscriptions.set(subscriber, map);
}
map.set(topicStr, new Date());
}
private doUnsubscribe(subscriber: WebSocketSubscriber, topic: Topic | string) {
const topicStr = topic.toString();
// Decrease subscription count for this topic.
this.metrics.subscriptions.dec({ topic: topicStr });
// Measure topic subscription duration.
const map = this.subscriptions.get(subscriber);
if (map) {
const startDate = map.get(topicStr);
if (startDate) {
const elapsedSeconds = (new Date().getTime() - startDate.getTime()) / 1000;
this.metrics.subscriptionTimers.observe({ topic: topicStr }, elapsedSeconds);
map.delete(topicStr);
if (map.size === 0) {
this.subscriptions.delete(subscriber);
}
}
}
}
private doDisconnect(subscriber: WebSocketSubscriber) {
this.metrics.disconnectTotal.inc();
// Unsubscribe this socket from every topic. This will also delete its subscriptions map.
const map = this.subscriptions.get(subscriber);
if (map) {
const topics = Array.from(map.keys());
topics.forEach(topic => {
this.doUnsubscribe(subscriber, topic);
});
}
}
}

View File

@@ -1,6 +1,5 @@
import { Server as SocketIOServer } from 'socket.io';
import * as http from 'http';
import * as prom from 'prom-client';
import { DataStore } from '../../../datastore/common';
import {
AddressStxBalanceResponse,
@@ -19,99 +18,46 @@ import {
parseDbTx,
} from '../../controllers/db-controller';
import { isProdEnv, logError, logger } from '../../../helpers';
interface SocketIOMetrics {
subscriptions: prom.Gauge<string>;
connectTotal: prom.Counter<string>;
disconnectTotal: prom.Counter<string>;
eventsSent: prom.Counter<string>;
}
class SocketIOPrometheus {
private metrics: SocketIOMetrics;
constructor() {
this.metrics = {
subscriptions: new prom.Gauge({
name: 'socket_io_subscriptions',
help: 'Current subscriptions',
labelNames: ['topic'],
}),
connectTotal: new prom.Counter({
name: 'socket_io_connect_total',
help: 'Total count of socket.io connection requests',
}),
disconnectTotal: new prom.Counter({
name: 'socket_io_disconnect_total',
help: 'Total count of socket.io disconnections',
}),
eventsSent: new prom.Counter({
name: 'socket_io_events_sent',
help: 'Socket.io sent events',
labelNames: ['event'],
}),
};
}
public connect() {
this.metrics.connectTotal.inc();
}
public disconnect() {
this.metrics.disconnectTotal.inc();
}
public subscribe(topic: Topic | Topic[] | string) {
if (Array.isArray(topic)) {
topic.forEach(t => this.metrics.subscriptions.inc({ topic: t.toString() }));
} else {
this.metrics.subscriptions.inc({ topic: topic.toString() });
}
}
public unsubscribe(topic: Topic | string) {
this.metrics.subscriptions.dec({ topic: topic.toString() });
}
public sendEvent(event: string) {
this.metrics.eventsSent.inc({ event: event });
}
}
import { WebSocketPrometheus } from './metrics';
export function createSocketIORouter(db: DataStore, server: http.Server) {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(server, {
cors: { origin: '*' },
});
let prometheus: SocketIOPrometheus | null;
let prometheus: WebSocketPrometheus | null;
if (isProdEnv) {
prometheus = new SocketIOPrometheus();
prometheus = new WebSocketPrometheus('socket_io');
}
io.on('connection', socket => {
logger.info('[socket.io] new connection');
prometheus?.connect();
if (socket.handshake.headers['x-forwarded-for']) {
prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
} else {
prometheus?.connect(socket.handshake.address);
}
socket.on('disconnect', reason => {
logger.info(`[socket.io] disconnected: ${reason}`);
prometheus?.disconnect();
prometheus?.disconnect(socket);
});
const subscriptions = socket.handshake.query['subscriptions'];
if (subscriptions) {
// TODO: check if init topics are valid, reject connection with error if not
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
topics.forEach(topic => {
prometheus?.subscribe(topic);
prometheus?.subscribe(socket, topic);
void socket.join(topic);
});
}
socket.on('subscribe', (topic, callback) => {
prometheus?.subscribe(topic);
prometheus?.subscribe(socket, topic);
void socket.join(topic);
// TODO: check if topic is valid, and return error message if not
callback?.(null);
});
socket.on('unsubscribe', (...topics) => {
topics.forEach(topic => {
prometheus?.unsubscribe(topic);
prometheus?.unsubscribe(socket, topic);
void socket.leave(topic);
});
});
@@ -188,7 +134,7 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
prometheus?.sendEvent('tx');
prometheus?.sendEvent('transaction');
io.to(mempoolTopic).emit('transaction', mempoolTxs[0]);
} else {
const txQuery = await getTxFromDataStore(db, {
@@ -196,7 +142,7 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
includeUnanchored: true,
});
if (txQuery.found) {
prometheus?.sendEvent('tx');
prometheus?.sendEvent('transaction');
io.to(mempoolTopic).emit('transaction', txQuery.result);
}
}

View File

@@ -24,7 +24,7 @@ import {
} from '@stacks/stacks-blockchain-api-types';
import { DataStore, DbTx, DbMempoolTx } from '../../../datastore/common';
import { normalizeHashString, logError, isValidPrincipal, logger } from '../../../helpers';
import { normalizeHashString, logError, isValidPrincipal, isProdEnv } from '../../../helpers';
import {
getBlockFromDataStore,
getMempoolTxsFromDataStore,
@@ -32,6 +32,7 @@ import {
getTxStatusString,
getTxTypeString,
} from '../../controllers/db-controller';
import { WebSocketPrometheus } from './metrics';
type Subscription =
| RpcTxUpdateSubscriptionParams
@@ -121,6 +122,11 @@ class SubscriptionManager {
}
export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket.Server {
let prometheus: WebSocketPrometheus | null;
if (isProdEnv) {
prometheus = new WebSocketPrometheus('websocket');
}
// Use `noServer` and the `upgrade` event to prevent the ws lib from hijacking the http.Server error event
const wsPath = '/extended/v1/ws';
const wsServer = new WebSocket.Server({ noServer: true, path: wsPath });
@@ -254,8 +260,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
}
if (subscribe) {
txUpdateSubscriptions.addSubscription(client, txId);
prometheus?.subscribe(client, `transaction:${txId}`);
} else {
txUpdateSubscriptions.removeSubscription(client, txId);
prometheus?.unsubscribe(client, `transaction:${txId}`);
}
return jsonRpcSuccess(req.payload.id, { tx_id: txId });
}
@@ -273,8 +281,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
}
if (subscribe) {
addressTxUpdateSubscriptions.addSubscription(client, address);
prometheus?.subscribe(client, `address-transaction:${address}`);
} else {
addressTxUpdateSubscriptions.removeSubscription(client, address);
prometheus?.unsubscribe(client, `address-transaction:${address}`);
}
return jsonRpcSuccess(req.payload.id, { address: address });
}
@@ -291,8 +301,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
}
if (subscribe) {
addressBalanceUpdateSubscriptions.addSubscription(client, address);
prometheus?.subscribe(client, `address-stx-balance:${address}`);
} else {
addressBalanceUpdateSubscriptions.removeSubscription(client, address);
prometheus?.unsubscribe(client, `address-stx-balance:${address}`);
}
return jsonRpcSuccess(req.payload.id, { address: address });
}
@@ -305,8 +317,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
) {
if (subscribe) {
blockSubscriptions.addSubscription(client, params.event);
prometheus?.subscribe(client, 'block');
} else {
blockSubscriptions.removeSubscription(client, params.event);
prometheus?.unsubscribe(client, 'block');
}
return jsonRpcSuccess(req.payload.id, {});
}
@@ -319,8 +333,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
) {
if (subscribe) {
microblockSubscriptions.addSubscription(client, params.event);
prometheus?.subscribe(client, 'microblock');
} else {
microblockSubscriptions.removeSubscription(client, params.event);
prometheus?.unsubscribe(client, 'microblock');
}
return jsonRpcSuccess(req.payload.id, {});
}
@@ -333,8 +349,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
) {
if (subscribe) {
mempoolSubscriptions.addSubscription(client, params.event);
prometheus?.subscribe(client, 'mempool');
} else {
mempoolSubscriptions.removeSubscription(client, params.event);
prometheus?.unsubscribe(client, 'mempool');
}
return jsonRpcSuccess(req.payload.id, {});
}
@@ -369,6 +387,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
updateNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('transaction');
}
} catch (error) {
logError(`error sending websocket tx update for ${txId}`, error);
@@ -399,6 +418,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
updateNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('address-transaction');
});
}
} catch (error) {
@@ -427,6 +447,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
balanceNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('address-stx-balance');
} catch (error) {
logError(`error sending websocket stx balance update to ${address}`, error);
}
@@ -443,6 +464,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
const block = blockQuery.result;
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('block');
}
}
} catch (error) {
@@ -462,6 +484,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
const microblock = microblockQuery.result;
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('microblock');
}
}
} catch (error) {
@@ -481,6 +504,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
const mempoolTx = mempoolTxs[0];
const rpcNotificationPayload = jsonRpcNotification('mempool', mempoolTx).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('mempool');
}
}
} catch (error) {
@@ -507,9 +531,17 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
});
wsServer.on('connection', (clientSocket, req) => {
if (req.headers['x-forwarded-for']) {
prometheus?.connect(req.headers['x-forwarded-for'] as string);
} else if (req.socket.remoteAddress) {
prometheus?.connect(req.socket.remoteAddress);
}
clientSocket.on('message', data => {
void handleClientMessage(clientSocket, data);
});
clientSocket.on('close', (_: WebSocket) => {
prometheus?.disconnect(clientSocket);
});
});
wsServer.on('close', (_: WebSocket.Server) => {