fix: incorrect websocket/socket.io transaction updates (#1197)

* feat: draft listener class

* refactor: interface draft

* feat: socket.io channel

* docs: comments

* feat: ws-rpc channel

* refactor: rename metrics.ts

* fix: socket.io tests

* fix: websocket tests

* fix: unused exports

* fix: update on stale mempool tx drop

* feat: return complete objects in ws-rpc

* test: complete ws-rpc responses

* docs: update address tx example

* fix: also take client and docs pacakge.json

* test: try to fix socket flakyness

* test: expect tx status explicitly

* fix: notify gc mempool txs outside sql tx
This commit is contained in:
Rafael Cárdenas
2022-06-16 09:32:29 -05:00
committed by GitHub
parent 7329413192
commit 8ee1da840b
26 changed files with 8039 additions and 6853 deletions

View File

@@ -287,61 +287,77 @@ Sent every time a transaction is sent or received by a specific Stacks address.
Example message if subscribed to updates for an address `SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q`:
```json
{
"tx_id": "0x1f9e737dfbebcb57f0879a44518c1cc909be0ceb8ab0bc9b38ce63e3b6847917",
"nonce": 6,
"fee_rate": "277600",
"sender_address": "SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q",
"sponsored": false,
"post_condition_mode": "deny",
"post_conditions": [
{
"type": "stx",
"condition_code": "sent_less_than_or_equal_to",
"amount": "25000000",
"principal": {
"type_id": "principal_standard",
"address": "SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q"
"tx": {
"tx_id": "0x0c818b9af6356a2eb4d64ee1b2490193d97a82392c02e7264e006ae5979aa726",
"nonce": 32,
"fee_rate": "3000",
"sender_address": "SP3BK1NNSWN719Z6KDW05RBGVS940YCN6X84STYPR",
"sponsored": false,
"post_condition_mode": "deny",
"post_conditions": [
{
"type": "stx",
"condition_code": "sent_equal_to",
"amount": "4375722",
"principal": {
"type_id": "principal_contract",
"contract_name": "newyorkcitycoin-core-v1",
"address": "SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5"
}
}
],
"anchor_mode": "any",
"is_unanchored": false,
"block_hash": "0xe2a811451fed35331cf462a9107e3453fdebba1682dfad83cbbcdc603f644ed3",
"parent_block_hash": "0x6d8653da23188d4d78ab9b6448229be68abe1bca001f8c574c094289107bce15",
"block_height": 58775,
"burn_block_time": 1651720813,
"burn_block_time_iso": "2022-05-05T03:20:13.000Z",
"parent_burn_block_time": 1651720368,
"parent_burn_block_time_iso": "2022-05-05T03:12:48.000Z",
"canonical": true,
"tx_index": 36,
"tx_status": "success",
"tx_result": {
"hex": "0x0703",
"repr": "(ok true)"
},
"microblock_hash": "",
"microblock_sequence": 2147483647,
"microblock_canonical": true,
"event_count": 1,
"events": [],
"execution_cost_read_count": 15,
"execution_cost_read_length": 31147,
"execution_cost_runtime": 81975,
"execution_cost_write_count": 2,
"execution_cost_write_length": 123,
"tx_type": "contract_call",
"contract_call": {
"contract_id": "SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-core-v1",
"function_name": "claim-stacking-reward",
"function_signature": "(define-public (claim-stacking-reward (targetCycle uint)))",
"function_args": [
{
"hex": "0x0100000000000000000000000000000008",
"repr": "u8",
"name": "targetCycle",
"type": "uint"
}
]
}
},
"stx_sent": "3000",
"stx_received": "4375722",
"stx_transfers": [
{
"type": "non_fungible",
"condition_code": "not_sent",
"principal": {
"type_id": "principal_contract",
"contract_name": "stacks-skaters",
"address": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG"
},
"asset": {
"contract_name": "stacks-skaters",
"asset_name": "stacks-skaters",
"contract_address": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG"
},
"asset_value": {
"hex": "0x0100000000000000000000000000000000",
"repr": "u0"
}
},
{
"type": "stx",
"condition_code": "sent_less_than_or_equal_to",
"amount": "20000000",
"principal": {
"type_id": "principal_contract",
"contract_name": "stacks-skaters",
"address": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG"
}
"amount": "4375722",
"sender": "SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-core-v1",
"recipient": "SP3BK1NNSWN719Z6KDW05RBGVS940YCN6X84STYPR"
}
],
"anchor_mode": "any",
"tx_status": "pending",
"receipt_time": 1637172946,
"receipt_time_iso": "2021-11-17T18:15:46.000Z",
"tx_type": "contract_call",
"contract_call": {
"contract_id": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG.stacks-skaters",
"function_name": "mint",
"function_signature": ""
}
"ft_transfers": [],
"nft_transfers": []
}
```
Subscribe via WebSockets:

View File

@@ -2,11 +2,8 @@ import * as JsonRpcLite from 'jsonrpc-lite';
import { EventEmitter } from 'eventemitter3';
import {
RpcTxUpdateSubscriptionParams,
RpcTxUpdateNotificationParams,
RpcAddressTxSubscriptionParams,
RpcAddressTxNotificationParams,
RpcAddressBalanceSubscriptionParams,
RpcAddressBalanceNotificationParams,
RpcSubscriptionType,
Block,
RpcBlockSubscriptionParams,
@@ -14,6 +11,9 @@ import {
Transaction,
RpcMicroblockSubscriptionParams,
RpcMempoolSubscriptionParams,
MempoolTransaction,
RpcAddressBalanceNotificationParams,
RpcAddressTxNotificationParams,
} from '@stacks/stacks-blockchain-api-types';
import { BASE_PATH } from '../generated/runtime';
@@ -35,7 +35,7 @@ export class StacksApiWebSocketClient {
block: (event: Block) => void;
microblock: (event: Microblock) => void;
mempool: (event: Transaction) => void;
txUpdate: (event: RpcTxUpdateNotificationParams) => any;
txUpdate: (event: Transaction | MempoolTransaction) => any;
addressTxUpdate: (event: RpcAddressTxNotificationParams) => void;
addressBalanceUpdate: (event: RpcAddressBalanceNotificationParams) => void;
}>();
@@ -93,10 +93,11 @@ export class StacksApiWebSocketClient {
const method = data.method as RpcSubscriptionType;
switch (method) {
case 'tx_update':
this.eventEmitter.emit('txUpdate', data.params as RpcTxUpdateNotificationParams);
this.eventEmitter.emit('txUpdate', data.params as (Transaction | MempoolTransaction));
break;
case 'address_tx_update':
this.eventEmitter.emit('addressTxUpdate', data.params as RpcAddressTxNotificationParams);
this.eventEmitter.emit('addressTxUpdate',
data.params as RpcAddressTxNotificationParams);
break;
case 'address_balance_update':
this.eventEmitter.emit(
@@ -171,11 +172,11 @@ export class StacksApiWebSocketClient {
async subscribeTxUpdates(
txId: string,
update: (event: RpcTxUpdateNotificationParams) => any
update: (event: Transaction | MempoolTransaction) => any
): Promise<Subscription> {
const params: RpcTxUpdateSubscriptionParams = { event: 'tx_update', tx_id: txId };
const subscribed = await this.rpcCall<{ tx_id: string }>('subscribe', params);
const listener = (event: RpcTxUpdateNotificationParams) => {
const listener = (event: Transaction | MempoolTransaction) => {
if (event.tx_id === subscribed.tx_id) {
update(event);
}

View File

@@ -1,18 +0,0 @@
{
"title": "RpcAddressBalanceNotificationParams",
"description": "",
"type": "object",
"required": [
"address",
"balance"
],
"additionalProperties": false,
"properties": {
"address": {
"type": "string"
},
"balance": {
"type": "string"
}
}
}

View File

@@ -18,7 +18,22 @@
"enum": ["address_balance_update"]
},
"params": {
"$ref": "./rpc-address-balance-notification-params.schema.json"
"title": "RpcAddressBalanceNotificationParams",
"allOf": [
{
"type": "object",
"additionalProperties": false,
"required": ["address"],
"properties": {
"address": {
"type": "string"
}
}
},
{
"$ref": "../../api/address/get-address-stx-balance.schema.json"
}
]
}
}
}

View File

@@ -1,33 +0,0 @@
{
"title": "RpcAddressTxNotificationParams",
"description": "",
"type": "object",
"required": [
"address",
"tx_id",
"tx_type",
"tx_status"
],
"additionalProperties": false,
"properties": {
"address": {
"type": "string"
},
"tx_id": {
"type": "string"
},
"tx_type": {
"$ref": "../transactions/transaction-type.schema.json"
},
"tx_status": {
"anyOf": [
{
"$ref": "../transactions/transaction-status.schema.json"
},
{
"$ref": "../mempool-transactions/transaction-status.schema.json"
}
]
}
}
}

View File

@@ -18,7 +18,31 @@
"enum": ["address_tx_update"]
},
"params": {
"$ref": "./rpc-address-tx-notification-params.schema.json"
"title": "RpcAddressTxNotificationParams",
"allOf": [
{
"type": "object",
"additionalProperties": false,
"required": ["address", "tx_id", "tx_type", "tx_status"],
"properties": {
"address": {
"type": "string"
},
"tx_id": {
"type": "string"
},
"tx_type": {
"$ref": "../transactions/transaction-type.schema.json"
},
"tx_status": {
"$ref": "../transactions/transaction-status.schema.json"
}
}
},
{
"$ref": "../../entities/address/transaction-with-transfers.schema.json"
}
]
}
}
}

View File

@@ -1,29 +0,0 @@
{
"title": "RpcTxUpdateNotificationParams",
"description": "",
"type": "object",
"required": [
"tx_id",
"tx_type",
"tx_status"
],
"additionalProperties": false,
"properties": {
"tx_id": {
"type": "string"
},
"tx_type": {
"$ref": "../transactions/transaction-type.schema.json"
},
"tx_status": {
"anyOf": [
{
"$ref": "../transactions/transaction-status.schema.json"
},
{
"$ref": "../mempool-transactions/transaction-status.schema.json"
}
]
}
}
}

View File

@@ -18,7 +18,14 @@
"enum": ["tx_update"]
},
"params": {
"$ref": "./rpc-tx-update-notification-params.schema.json"
"anyOf": [
{
"$ref": "../transactions/transaction.schema.json"
},
{
"$ref": "../mempool-transactions/transaction.schema.json"
}
]
}
}
}

29
docs/generated.d.ts vendored
View File

@@ -214,11 +214,9 @@ export type SchemaMergeRootStub =
| TransactionType
| Transaction
| InboundStxTransfer
| RpcAddressBalanceNotificationParams
| RpcAddressBalanceNotificationResponse
| RpcAddressBalanceSubscriptionParams
| RpcAddressBalanceSubscriptionRequest
| RpcAddressTxNotificationParams
| RpcAddressTxNotificationResponse
| RpcAddressTxSubscriptionParams
| RpcAddressTxSubscriptionRequest
@@ -232,7 +230,6 @@ export type SchemaMergeRootStub =
| RpcMicroblockSubscriptionParams
| RpcMicroblockSubscriptionRequest
| RpcSubscriptionType
| RpcTxUpdateNotificationParams
| RpcTxUpdateNotificationResponse
| RpcTxUpdateSubscriptionParams
| RpcTxUpdateSubscriptionRequest;
@@ -703,6 +700,15 @@ export type TransactionMetadata =
* String literal of all Stacks 2.0 transaction types
*/
export type TransactionType = "token_transfer" | "smart_contract" | "contract_call" | "poison_microblock" | "coinbase";
export type RpcAddressBalanceNotificationParams = {
address: string;
} & AddressStxBalanceResponse;
export type RpcAddressTxNotificationParams = {
address: string;
tx_id: string;
tx_type: TransactionType;
tx_status: TransactionStatus;
} & AddressTransactionWithTransfers;
export type RpcSubscriptionType =
| "tx_update"
| "address_tx_update"
@@ -3233,10 +3239,6 @@ export interface TransactionNotFound {
tx_id: string;
};
}
export interface RpcAddressBalanceNotificationParams {
address: string;
balance: string;
}
export interface RpcAddressBalanceNotificationResponse {
jsonrpc: "2.0";
method: "address_balance_update";
@@ -3252,12 +3254,6 @@ export interface RpcAddressBalanceSubscriptionRequest {
method: "address_balance_update";
params: RpcAddressBalanceSubscriptionParams;
}
export interface RpcAddressTxNotificationParams {
address: string;
tx_id: string;
tx_type: TransactionType;
tx_status: TransactionStatus | MempoolTransactionStatus;
}
export interface RpcAddressTxNotificationResponse {
jsonrpc: "2.0";
method: "address_tx_update";
@@ -3315,15 +3311,10 @@ export interface RpcMicroblockSubscriptionRequest {
method: "microblock";
params: RpcMicroblockSubscriptionParams;
}
export interface RpcTxUpdateNotificationParams {
tx_id: string;
tx_type: TransactionType;
tx_status: TransactionStatus | MempoolTransactionStatus;
}
export interface RpcTxUpdateNotificationResponse {
jsonrpc: "2.0";
method: "tx_update";
params: RpcTxUpdateNotificationParams;
params: Transaction | MempoolTransaction;
}
export interface RpcTxUpdateSubscriptionParams {
event: "tx_update";

12411
docs/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -17,18 +17,14 @@ import {
BaseTransaction,
Block,
CoinbaseTransactionMetadata,
ContractCallTransaction,
ContractCallTransactionMetadata,
MempoolContractCallTransaction,
MempoolTransaction,
MempoolTransactionStatus,
Microblock,
PoisonMicroblockTransactionMetadata,
PostCondition,
RosettaBlock,
RosettaParentBlockIdentifier,
RosettaTransaction,
SmartContractTransaction,
SmartContractTransactionMetadata,
TokenTransferTransactionMetadata,
Transaction,
@@ -58,24 +54,14 @@ import {
DbTx,
DbTxStatus,
DbTxTypeId,
DbSmartContract,
DbSearchResultWithMetadata,
BaseTx,
DbMinerReward,
StxUnlockEvent,
} from '../../datastore/common';
import {
unwrapOptional,
bufferToHexPrefixString,
ElementType,
FoundOrNot,
hexToBuffer,
logger,
unixEpochToIso,
EMPTY_HASH_256,
} from '../../helpers';
import { unwrapOptional, FoundOrNot, logger, unixEpochToIso, EMPTY_HASH_256 } from '../../helpers';
import { serializePostCondition, serializePostConditionMode } from '../serializers/post-conditions';
import { getOperations, parseTransactionMemo, processUnlockingEvents } from '../../rosetta-helpers';
import { getOperations, parseTransactionMemo } from '../../rosetta-helpers';
import { PgStore } from '../../datastore/pg-store';
export function parseTxTypeStrings(values: string[]): TransactionType[] {
@@ -140,9 +126,7 @@ export function getTxTypeId(typeString: Transaction['tx_type']): DbTxTypeId {
}
}
export function getTxStatusString(
txStatus: DbTxStatus
): TransactionStatus | MempoolTransactionStatus {
function getTxStatusString(txStatus: DbTxStatus): TransactionStatus | MempoolTransactionStatus {
switch (txStatus) {
case DbTxStatus.Pending:
return 'pending';
@@ -159,6 +143,7 @@ export function getTxStatusString(
case DbTxStatus.DroppedTooExpensive:
return 'dropped_too_expensive';
case DbTxStatus.DroppedStaleGarbageCollect:
case DbTxStatus.DroppedApiGarbageCollect:
return 'dropped_stale_garbage_collect';
default:
throw new Error(`Unexpected DbTxStatus: ${txStatus}`);

View File

@@ -25,8 +25,6 @@ import { createRosettaAccountRouter } from './routes/rosetta/account';
import { createRosettaConstructionRouter } from './routes/rosetta/construction';
import { apiDocumentationUrl, isProdEnv, logError, logger, LogLevel, waiter } from '../helpers';
import { InvalidRequestError } from '../errors';
import { createWsRpcRouter } from './routes/ws/ws-rpc';
import { createSocketIORouter } from './routes/ws/socket-io';
import { createBurnchainRouter } from './routes/burnchain';
import { createBnsNamespacesRouter } from './routes/bns/namespaces';
import { createBnsPriceRouter } from './routes/bns/pricing';
@@ -48,12 +46,12 @@ import * as path from 'path';
import * as fs from 'fs';
import { PgStore } from '../datastore/pg-store';
import { PgWriteStore } from '../datastore/pg-write-store';
import { WebSocketTransmitter } from './routes/ws/web-socket-transmitter';
export interface ApiServer {
expressApp: express.Express;
server: Server;
wss: WebSocket.Server;
io: SocketIO.Server;
ws: WebSocketTransmitter;
address: string;
datastore: PgStore;
terminate: () => Promise<void>;
@@ -347,11 +345,8 @@ export async function startApiServer(opts: {
});
});
// Setup socket.io server
const io = createSocketIORouter(datastore, server);
// Setup websockets RPC endpoint
const wss = createWsRpcRouter(datastore, server);
const ws = new WebSocketTransmitter(datastore, server);
ws.connect();
await new Promise<void>((resolve, reject) => {
try {
@@ -368,25 +363,13 @@ export async function startApiServer(opts: {
const terminate = async () => {
await new Promise<void>((resolve, reject) => {
logger.info('Closing Socket.io server...');
io.close(error => {
logger.info('Closing WebSocket channels...');
ws.close(error => {
if (error) {
logError('Failed to gracefully close Socket.io server', error);
logError('Failed to gracefully close WebSocket channels', error);
reject(error);
} else {
logger.info('API socket.io server closed.');
resolve();
}
});
});
await new Promise<void>((resolve, reject) => {
logger.info('Closing WebSocket server...');
wss.close(error => {
if (error) {
logError('Failed to gracefully close WebSocket server.');
reject(error);
} else {
logger.info('WebSocket server closed.');
logger.info('API WebSocket channels closed.');
resolve();
}
});
@@ -405,14 +388,13 @@ export async function startApiServer(opts: {
const forceKill = async () => {
logger.info('Force closing API server...');
const [ioClosePromise, wssClosePromise, serverClosePromise] = [waiter(), waiter(), waiter()];
io.close(() => ioClosePromise.finish());
wss.close(() => wssClosePromise.finish());
const [wsClosePromise, serverClosePromise] = [waiter(), waiter()];
ws.close(() => wsClosePromise.finish());
server.close(() => serverClosePromise.finish());
for (const socket of serverSockets) {
socket.destroy();
}
await Promise.allSettled([ioClosePromise, wssClosePromise, serverClosePromise]);
await Promise.allSettled([wsClosePromise, serverClosePromise]);
};
const addr = server.address();
@@ -423,8 +405,7 @@ export async function startApiServer(opts: {
return {
expressApp: app,
server: server,
wss: wss,
io: io,
ws: ws,
address: addrStr,
datastore: datastore,
terminate: terminate,

View File

@@ -0,0 +1,229 @@
import {
AddressStxBalanceTopic,
AddressTransactionTopic,
ClientToServerMessages,
ServerToClientMessages,
Topic,
} from 'docs/socket-io';
import * as http from 'http';
import { Server as SocketIOServer } from 'socket.io';
import { Adapter } from 'socket.io-adapter';
import { isValidTxId } from '../../../../api/query-helpers';
import { isProdEnv, isValidPrincipal, logger } from '../../../../helpers';
import { WebSocketPrometheus } from '../web-socket-prometheus';
import {
ListenerType,
WebSocketChannel,
WebSocketPayload,
WebSocketTopics,
} from '../web-socket-channel';
/**
* SocketIO channel for sending real time API updates.
*/
export class SocketIOChannel extends WebSocketChannel {
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
private adapter?: Adapter;
constructor(server: http.Server) {
super(server);
if (isProdEnv) {
this.prometheus = new WebSocketPrometheus('socket_io');
}
}
connect(): void {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
cors: { origin: '*' },
});
this.io = io;
io.on('connection', async socket => {
logger.info(`[socket.io] new connection: ${socket.id}`);
if (socket.handshake.headers['x-forwarded-for']) {
this.prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
} else {
this.prometheus?.connect(socket.handshake.address);
}
const subscriptions = socket.handshake.query['subscriptions'];
if (subscriptions) {
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
for (const topic of topics) {
this.prometheus?.subscribe(socket, topic);
await socket.join(topic);
}
}
socket.on('disconnect', reason => {
logger.info(`[socket.io] disconnected ${socket.id}: ${reason}`);
this.prometheus?.disconnect(socket);
});
socket.on('subscribe', async (topic, callback) => {
if (!this.getInvalidSubscriptionTopics(topic)) {
this.prometheus?.subscribe(socket, topic);
await socket.join(topic);
callback?.(null);
}
});
socket.on('unsubscribe', async (...topics) => {
for (const topic of topics) {
this.prometheus?.unsubscribe(socket, topic);
await socket.leave(topic);
}
});
});
// Validate topic subscriptions upon connection.
io.use((socket, next) => {
const subscriptions = socket.handshake.query['subscriptions'];
if (subscriptions) {
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
const invalidSubs = this.getInvalidSubscriptionTopics(topics as Topic[]);
if (invalidSubs) {
const error = new Error(`Invalid topic: ${invalidSubs.join(', ')}`);
next(error);
} else {
next();
}
} else {
next();
}
});
const adapter = io.of('/').adapter;
adapter.on('create-room', room => {
logger.info(`[socket.io] room created: ${room}`);
});
adapter.on('delete-room', room => {
logger.info(`[socket.io] room deleted: ${room}`);
});
adapter.on('join-room', (room, id) => {
logger.info(`[socket.io] socket ${id} joined room: ${room}`);
});
adapter.on('leave-room', (room, id) => {
logger.info(`[socket.io] socket ${id} left room: ${room}`);
});
this.adapter = adapter;
}
close(callback?: (err?: Error) => void): void {
if (!this.io && callback) {
callback();
}
this.io?.close(callback);
this.io = undefined;
}
hasListeners<P extends keyof WebSocketTopics>(
topic: P,
...args: ListenerType<WebSocketTopics[P]>
): boolean {
if (!this.adapter) {
return false;
}
switch (topic) {
case 'block':
return this.adapter.rooms.has('block');
case 'microblock':
return this.adapter.rooms.has('microblock');
case 'mempool':
return this.adapter.rooms.has('mempool');
case 'transaction': {
const [txId] = args as ListenerType<WebSocketTopics['transaction']>;
return this.adapter.rooms.has(`transaction:${txId}`);
}
case 'principalTransactions': {
const [principal] = args as ListenerType<WebSocketTopics['principalTransactions']>;
return this.adapter.rooms.has(`address-transaction:${principal}`);
}
case 'principalStxBalance': {
const [principal] = args as ListenerType<WebSocketTopics['principalStxBalance']>;
return this.adapter.rooms.has(`address-stx-balance:${principal}`);
}
}
return false;
}
send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): void {
if (!this.io) {
return;
}
switch (payload) {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.prometheus?.sendEvent('block');
this.io.to('block').emit('block', block);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.prometheus?.sendEvent('microblock');
this.io.to('microblock').emit('microblock', microblock);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.prometheus?.sendEvent('mempool');
this.io.to('mempool').emit('mempool', tx);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
this.io.to(`transaction:${tx.tx_id}`).emit('transaction', tx);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
this.prometheus?.sendEvent('address-transaction');
this.io.to(topic).emit('address-transaction', principal, tx);
this.io.to(topic).emit(topic, principal, tx);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
this.prometheus?.sendEvent('address-stx-balance');
this.io.to(topic).emit('address-stx-balance', principal, balance);
this.io.to(topic).emit(topic, principal, balance);
break;
}
}
}
private getInvalidSubscriptionTopics(subscriptions: Topic | Topic[]): undefined | string[] {
const isSubValid = (sub: Topic): undefined | string => {
if (sub.includes(':')) {
const txOrAddr = sub.split(':')[0];
const value = sub.split(':')[1];
switch (txOrAddr) {
case 'address-transaction':
case 'address-stx-balance':
return isValidPrincipal(value) ? undefined : sub;
case 'transaction':
return isValidTxId(value) ? undefined : sub;
default:
return sub;
}
}
switch (sub) {
case 'block':
case 'mempool':
case 'microblock':
return undefined;
default:
return sub;
}
};
if (!Array.isArray(subscriptions)) {
const invalidSub = isSubValid(subscriptions);
return invalidSub ? [invalidSub] : undefined;
}
const validatedSubs = subscriptions.map(isSubValid);
const invalidSubs = validatedSubs.filter(validSub => typeof validSub === 'string');
return invalidSubs.length === 0 ? undefined : (invalidSubs as string[]);
}
}

View File

@@ -0,0 +1,564 @@
import * as http from 'http';
import * as WebSocket from 'ws';
import * as net from 'net';
import { isProdEnv, isValidPrincipal, logError, normalizeHashString } from '../../../../helpers';
import { WebSocketPrometheus } from '../web-socket-prometheus';
import {
ListenerType,
WebSocketChannel,
WebSocketPayload,
WebSocketTopics,
} from '../web-socket-channel';
import {
JsonRpcError,
JsonRpc,
IParsedObjectRequest,
parse as parseRpcString,
error as jsonRpcError,
notification as jsonRpcNotification,
success as jsonRpcSuccess,
} from 'jsonrpc-lite';
import {
RpcTxUpdateSubscriptionParams,
RpcAddressTxSubscriptionParams,
RpcAddressBalanceSubscriptionParams,
RpcBlockSubscriptionParams,
RpcMicroblockSubscriptionParams,
RpcMempoolSubscriptionParams,
Block,
Microblock,
MempoolTransaction,
Transaction,
AddressTransactionWithTransfers,
AddressStxBalanceResponse,
} from '@stacks/stacks-blockchain-api-types';
type Subscription =
| RpcTxUpdateSubscriptionParams
| RpcAddressTxSubscriptionParams
| RpcAddressBalanceSubscriptionParams
| RpcBlockSubscriptionParams
| RpcMicroblockSubscriptionParams
| RpcMempoolSubscriptionParams;
class SubscriptionManager {
/**
* Key = subscription topic.
* Value = clients interested in the subscription topic.
*/
subscriptions: Map<string, Set<WebSocket>> = new Map();
// Sockets that are responding to ping.
liveSockets: Set<WebSocket> = new Set();
heartbeatInterval?: NodeJS.Timeout;
readonly heartbeatIntervalMs = 5_000;
addSubscription(client: WebSocket, topicId: string) {
if (this.subscriptions.size === 0) {
this.startHeartbeat();
}
let clients = this.subscriptions.get(topicId);
if (!clients) {
clients = new Set();
this.subscriptions.set(topicId, clients);
}
clients.add(client);
this.liveSockets.add(client);
client.on('close', () => {
this.removeSubscription(client, topicId);
});
client.on('pong', () => {
this.liveSockets.add(client);
});
}
removeSubscription(client: WebSocket, topicId: string) {
const clients = this.subscriptions.get(topicId);
if (clients) {
clients.delete(client);
if (clients.size === 0) {
this.subscriptions.delete(topicId);
if (this.subscriptions.size === 0) {
this.stopHeartbeat();
}
}
}
this.liveSockets.delete(client);
}
startHeartbeat() {
if (this.heartbeatInterval) {
return;
}
this.heartbeatInterval = setInterval(() => {
this.subscriptions.forEach((clients, topic) => {
clients.forEach(ws => {
// Client did not respond to a previous ping, it's dead.
if (!this.liveSockets.has(ws)) {
this.removeSubscription(ws, topic);
return;
}
// Assume client is dead until it responds to our ping.
this.liveSockets.delete(ws);
ws.ping();
});
});
}, this.heartbeatIntervalMs);
}
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = undefined;
}
}
close() {
this.subscriptions.clear();
this.liveSockets.clear();
this.stopHeartbeat();
}
}
/**
* WebSocket RPC channel for sending real time API updates.
*/
export class WsRpcChannel extends WebSocketChannel {
private subscriptions = new Map<keyof WebSocketTopics, SubscriptionManager>();
private wsServer?: WebSocket.Server;
constructor(server: http.Server) {
super(server);
if (isProdEnv) {
this.prometheus = new WebSocketPrometheus('websocket');
}
}
connect(): void {
// Use `noServer` and the `upgrade` event to prevent the ws lib from hijacking the http.Server error event
const wsPath = '/extended/v1/ws';
const wsServer = new WebSocket.Server({ noServer: true, path: wsPath });
this.server.on('upgrade', (request: http.IncomingMessage, socket, head) => {
if (request.url?.startsWith(wsPath)) {
wsServer.handleUpgrade(request, socket as net.Socket, head, ws => {
wsServer.emit('connection', ws, request);
});
}
});
this.subscriptions.set('block', new SubscriptionManager());
this.subscriptions.set('microblock', new SubscriptionManager());
this.subscriptions.set('mempool', new SubscriptionManager());
this.subscriptions.set('transaction', new SubscriptionManager());
this.subscriptions.set('principalTransactions', new SubscriptionManager());
this.subscriptions.set('principalStxBalance', new SubscriptionManager());
wsServer.on('connection', (clientSocket, req) => {
if (req.headers['x-forwarded-for']) {
this.prometheus?.connect(req.headers['x-forwarded-for'] as string);
} else if (req.socket.remoteAddress) {
this.prometheus?.connect(req.socket.remoteAddress);
}
clientSocket.on('message', data => {
this.handleClientMessage(clientSocket, data);
});
clientSocket.on('close', (_: WebSocket) => {
this.prometheus?.disconnect(clientSocket);
});
});
wsServer.on('close', (_: WebSocket.Server) => {
this.subscriptions.forEach(manager => manager.close());
});
this.wsServer = wsServer;
}
close(callback?: (err?: Error) => void): void {
if (!this.wsServer && callback) {
callback();
}
this.wsServer?.close(callback);
this.wsServer = undefined;
}
hasListeners<P extends keyof WebSocketTopics>(
topic: P,
...args: ListenerType<WebSocketTopics[P]>
): boolean {
const manager = this.subscriptions.get(topic);
if (!this.wsServer || !manager) {
return false;
}
switch (topic) {
case 'block':
return manager.subscriptions.get('block') !== undefined;
case 'microblock':
return manager.subscriptions.get('microblock') !== undefined;
case 'mempool':
return manager.subscriptions.get('mempool') !== undefined;
case 'transaction': {
const [txId] = args as ListenerType<WebSocketTopics['transaction']>;
return manager.subscriptions.get(txId) !== undefined;
}
case 'principalTransactions': {
const [principal] = args as ListenerType<WebSocketTopics['principalTransactions']>;
return manager.subscriptions.get(principal) !== undefined;
}
case 'principalStxBalance': {
const [principal] = args as ListenerType<WebSocketTopics['principalStxBalance']>;
return manager.subscriptions.get(principal) !== undefined;
}
}
return false;
}
send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): void {
if (!this.wsServer) {
return;
}
switch (payload) {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.processBlockUpdate(block);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.processMicroblockUpdate(microblock);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.processMempoolUpdate(tx);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.processTxUpdate(tx);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
this.processAddressUpdate(principal, tx);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
this.processAddressBalanceUpdate(principal, balance);
break;
}
}
}
private handleClientMessage(client: WebSocket, data: WebSocket.Data) {
try {
if (typeof data !== 'string') {
throw JsonRpcError.parseError(`unexpected data type: ${data.constructor.name}`);
}
const parsedRpcReq = parseRpcString(data);
const isBatchRequest = Array.isArray(parsedRpcReq);
let rpcReqs = Array.isArray(parsedRpcReq) ? parsedRpcReq : [parsedRpcReq];
// Ignore client notifications, spec dictates server should never respond to these.
rpcReqs = rpcReqs.filter(req => req.type !== 'notification');
const responses: JsonRpc[] = rpcReqs.map(rpcReq => {
switch (rpcReq.type) {
case 'request':
return this.handleClientRpcReq(client, rpcReq);
case 'error':
return jsonRpcError(
rpcReq.payload.id,
JsonRpcError.invalidRequest('unexpected error msg from client')
);
case 'success':
return jsonRpcError(
rpcReq.payload.id,
JsonRpcError.invalidRequest('unexpected success msg from client')
);
case 'invalid':
return jsonRpcError(null as any, rpcReq.payload);
default:
return jsonRpcError(
null as any,
JsonRpcError.invalidRequest('unexpected msg type from client')
);
}
});
if (isBatchRequest) {
client.send(JSON.stringify(responses));
} else if (responses.length === 1) {
client.send(responses[0].serialize());
}
} catch (err: any) {
// Response `id` is null for invalid JSON requests (or other errors where the request ID isn't known).
try {
const res = err instanceof JsonRpcError ? err : JsonRpcError.internalError(err.toString());
this.sendRpcResponse(client, jsonRpcError(null as any, res));
} catch (error) {
// ignore any errors here
}
}
}
private sendRpcResponse(client: WebSocket, res: JsonRpc) {
client.send(res.serialize());
}
/** Route supported RPC methods */
private handleClientRpcReq(client: WebSocket, req: IParsedObjectRequest): JsonRpc {
switch (req.payload.method) {
case 'subscribe':
return this.handleClientSubscription(client, req, true);
case 'unsubscribe':
return this.handleClientSubscription(client, req, false);
default:
return jsonRpcError(req.payload.id, JsonRpcError.methodNotFound(null));
}
}
/** Route supported subscription events */
private handleClientSubscription(
client: WebSocket,
req: IParsedObjectRequest,
subscribe: boolean
): JsonRpc {
const params = req.payload.params as Subscription;
if (!params || !params.event) {
return jsonRpcError(
req.payload.id,
JsonRpcError.invalidParams('subscription requests must include an event name')
);
}
switch (params.event) {
case 'tx_update':
return this.handleTxUpdateSubscription(client, req, params, subscribe);
case 'address_tx_update':
return this.handleAddressTxUpdateSubscription(client, req, params, subscribe);
case 'address_balance_update':
return this.handleAddressBalanceUpdateSubscription(client, req, params, subscribe);
case 'block':
return this.handleBlockUpdateSubscription(client, req, params, subscribe);
case 'microblock':
return this.handleMicroblockUpdateSubscription(client, req, params, subscribe);
case 'mempool':
return this.handleMempoolUpdateSubscription(client, req, params, subscribe);
default:
return jsonRpcError(
req.payload.id,
JsonRpcError.invalidParams('subscription request must use a valid event name')
);
}
}
/** Process client request for tx update notifications */
private handleTxUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcTxUpdateSubscriptionParams,
subscribe: boolean
): JsonRpc {
const txId = normalizeHashString(params.tx_id);
if (!txId) {
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid tx_id'));
}
if (subscribe) {
this.subscriptions.get('transaction')?.addSubscription(client, txId);
this.prometheus?.subscribe(client, `transaction:${txId}`);
} else {
this.subscriptions.get('transaction')?.removeSubscription(client, txId);
this.prometheus?.unsubscribe(client, `transaction:${txId}`);
}
return jsonRpcSuccess(req.payload.id, { tx_id: txId });
}
/** Process client request for address tx update notifications */
private handleAddressTxUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcAddressTxSubscriptionParams,
subscribe: boolean
): JsonRpc {
const address = params.address;
if (!isValidPrincipal(address)) {
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
}
if (subscribe) {
this.subscriptions.get('principalTransactions')?.addSubscription(client, address);
this.prometheus?.subscribe(client, `address-transaction:${address}`);
} else {
this.subscriptions.get('principalTransactions')?.removeSubscription(client, address);
this.prometheus?.unsubscribe(client, `address-transaction:${address}`);
}
return jsonRpcSuccess(req.payload.id, { address: address });
}
private handleAddressBalanceUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcAddressBalanceSubscriptionParams,
subscribe: boolean
): JsonRpc {
const address = params.address;
if (!isValidPrincipal(address)) {
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
}
if (subscribe) {
this.subscriptions.get('principalStxBalance')?.addSubscription(client, address);
this.prometheus?.subscribe(client, `address-stx-balance:${address}`);
} else {
this.subscriptions.get('principalStxBalance')?.removeSubscription(client, address);
this.prometheus?.unsubscribe(client, `address-stx-balance:${address}`);
}
return jsonRpcSuccess(req.payload.id, { address: address });
}
private handleBlockUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcBlockSubscriptionParams,
subscribe: boolean
) {
if (subscribe) {
this.subscriptions.get('block')?.addSubscription(client, params.event);
this.prometheus?.subscribe(client, 'block');
} else {
this.subscriptions.get('block')?.removeSubscription(client, params.event);
this.prometheus?.unsubscribe(client, 'block');
}
return jsonRpcSuccess(req.payload.id, {});
}
private handleMicroblockUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcMicroblockSubscriptionParams,
subscribe: boolean
) {
if (subscribe) {
this.subscriptions.get('microblock')?.addSubscription(client, params.event);
this.prometheus?.subscribe(client, 'microblock');
} else {
this.subscriptions.get('microblock')?.removeSubscription(client, params.event);
this.prometheus?.unsubscribe(client, 'microblock');
}
return jsonRpcSuccess(req.payload.id, {});
}
private handleMempoolUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcMempoolSubscriptionParams,
subscribe: boolean
) {
if (subscribe) {
this.subscriptions.get('mempool')?.addSubscription(client, params.event);
this.prometheus?.subscribe(client, 'mempool');
} else {
this.subscriptions.get('mempool')?.removeSubscription(client, params.event);
this.prometheus?.unsubscribe(client, 'mempool');
}
return jsonRpcSuccess(req.payload.id, {});
}
private processTxUpdate(tx: Transaction | MempoolTransaction) {
try {
const subscribers = this.subscriptions.get('transaction')?.subscriptions.get(tx.tx_id);
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('tx_update', tx).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
this.prometheus?.sendEvent('transaction');
}
} catch (error) {
logError(`error sending websocket tx update for ${tx.tx_id}`, error);
}
}
private processAddressUpdate(principal: string, tx: AddressTransactionWithTransfers) {
try {
const subscribers = this.subscriptions
.get('principalTransactions')
?.subscriptions.get(principal);
if (subscribers) {
const updateNotification = {
address: principal,
tx_id: tx.tx.tx_id,
tx_status: tx.tx.tx_status,
tx_type: tx.tx.tx_type,
...tx,
};
const rpcNotificationPayload = jsonRpcNotification(
'address_tx_update',
updateNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
this.prometheus?.sendEvent('address-transaction');
}
} catch (error) {
logError(`error sending websocket address tx updates to ${principal}`, error);
}
}
private processAddressBalanceUpdate(principal: string, balance: AddressStxBalanceResponse) {
const subscribers = this.subscriptions.get('principalStxBalance')?.subscriptions.get(principal);
if (subscribers) {
try {
const balanceNotification = {
address: principal,
...balance,
};
const rpcNotificationPayload = jsonRpcNotification(
'address_balance_update',
balanceNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
this.prometheus?.sendEvent('address-stx-balance');
} catch (error) {
logError(`error sending websocket stx balance update to ${principal}`, error);
}
}
}
private processBlockUpdate(block: Block) {
try {
const subscribers = this.subscriptions.get('block')?.subscriptions.get('block');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
this.prometheus?.sendEvent('block');
}
} catch (error) {
logError(`error sending websocket block updates`, error);
}
}
private processMicroblockUpdate(microblock: Microblock) {
try {
const subscribers = this.subscriptions.get('microblock')?.subscriptions.get('microblock');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
this.prometheus?.sendEvent('microblock');
}
} catch (error) {
logError(`error sending websocket microblock updates`, error);
}
}
private processMempoolUpdate(transaction: MempoolTransaction) {
try {
const subscribers = this.subscriptions.get('mempool')?.subscriptions.get('mempool');
if (subscribers) {
const rpcNotificationPayload = jsonRpcNotification('mempool', transaction).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
this.prometheus?.sendEvent('mempool');
}
} catch (error) {
logError(`error sending websocket mempool updates`, error);
}
}
}

View File

@@ -1,281 +0,0 @@
import { Server as SocketIOServer } from 'socket.io';
import * as http from 'http';
import {
AddressStxBalanceResponse,
AddressStxBalanceTopic,
AddressTransactionTopic,
AddressTransactionWithTransfers,
ClientToServerMessages,
Topic,
ServerToClientMessages,
} from '@stacks/stacks-blockchain-api-types';
import {
getBlockFromDataStore,
getMempoolTxsFromDataStore,
getMicroblockFromDataStore,
getTxFromDataStore,
parseDbTx,
} from '../../controllers/db-controller';
import { isProdEnv, isValidPrincipal, logError, logger } from '../../../helpers';
import { WebSocketPrometheus } from './metrics';
import { PgStore } from '../../../datastore/pg-store';
import { isValidTxId } from '../../../api/query-helpers';
function getInvalidSubscriptionTopics(subscriptions: Topic | Topic[]): undefined | string[] {
const isSubValid = (sub: Topic): undefined | string => {
if (sub.includes(':')) {
const txOrAddr = sub.split(':')[0];
const value = sub.split(':')[1];
switch (txOrAddr) {
case 'address-transaction':
case 'address-stx-balance':
return isValidPrincipal(value) ? undefined : sub;
case 'transaction':
return isValidTxId(value) ? undefined : sub;
default:
return sub;
}
}
switch (sub) {
case 'block':
case 'mempool':
case 'microblock':
return undefined;
default:
return sub;
}
};
if (!Array.isArray(subscriptions)) {
const invalidSub = isSubValid(subscriptions);
return invalidSub ? [invalidSub] : undefined;
}
const validatedSubs = subscriptions.map(isSubValid);
const invalidSubs = validatedSubs.filter(validSub => typeof validSub === 'string');
return invalidSubs.length === 0 ? undefined : (invalidSubs as string[]);
}
export function createSocketIORouter(db: PgStore, server: http.Server) {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(server, {
cors: { origin: '*' },
});
let prometheus: WebSocketPrometheus | null;
if (isProdEnv) {
prometheus = new WebSocketPrometheus('socket_io');
}
io.on('connection', async socket => {
logger.info(`[socket.io] new connection: ${socket.id}`);
if (socket.handshake.headers['x-forwarded-for']) {
prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
} else {
prometheus?.connect(socket.handshake.address);
}
const subscriptions = socket.handshake.query['subscriptions'];
if (subscriptions) {
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
for (const topic of topics) {
prometheus?.subscribe(socket, topic);
await socket.join(topic);
}
}
socket.on('disconnect', reason => {
logger.info(`[socket.io] disconnected ${socket.id}: ${reason}`);
prometheus?.disconnect(socket);
});
socket.on('subscribe', async (topic, callback) => {
if (!getInvalidSubscriptionTopics(topic)) {
prometheus?.subscribe(socket, topic);
await socket.join(topic);
callback?.(null);
}
});
socket.on('unsubscribe', async (...topics) => {
for (const topic of topics) {
prometheus?.unsubscribe(socket, topic);
await socket.leave(topic);
}
});
});
// Middleware checks for the invalid topic subscriptions and terminates connection if found any
io.use((socket, next) => {
const subscriptions = socket.handshake.query['subscriptions'];
if (subscriptions) {
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
const invalidSubs = getInvalidSubscriptionTopics(topics as Topic[]);
if (invalidSubs) {
const error = new Error(`Invalid topic: ${invalidSubs.join(', ')}`);
next(error);
} else {
next();
}
} else {
next();
}
});
const adapter = io.of('/').adapter;
adapter.on('create-room', room => {
logger.info(`[socket.io] room created: ${room}`);
});
adapter.on('delete-room', room => {
logger.info(`[socket.io] room deleted: ${room}`);
});
adapter.on('join-room', (room, id) => {
logger.info(`[socket.io] socket ${id} joined room: ${room}`);
});
adapter.on('leave-room', (room, id) => {
logger.info(`[socket.io] socket ${id} left room: ${room}`);
});
db.eventEmitter.on('blockUpdate', async blockHash => {
// Only parse and emit data if there are currently subscriptions to the blocks topic
const blockTopic: Topic = 'block';
if (adapter.rooms.has(blockTopic)) {
const blockQuery = await getBlockFromDataStore({ blockIdentifer: { hash: blockHash }, db });
if (!blockQuery.found) {
return;
}
const block = blockQuery.result;
prometheus?.sendEvent('block');
io.to(blockTopic).emit('block', block);
}
});
db.eventEmitter.on('microblockUpdate', async microblockHash => {
const microblockTopic: Topic = 'microblock';
if (adapter.rooms.has(microblockTopic)) {
const microblockQuery = await getMicroblockFromDataStore({
db: db,
microblockHash: microblockHash,
});
if (!microblockQuery.found) {
return;
}
const microblock = microblockQuery.result;
prometheus?.sendEvent('microblock');
io.to(microblockTopic).emit('microblock', microblock);
}
});
db.eventEmitter.on('txUpdate', async txId => {
// Mempool updates
const mempoolTopic: Topic = 'mempool';
if (adapter.rooms.has(mempoolTopic)) {
const mempoolTxs = await getMempoolTxsFromDataStore(db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
const mempoolTx = mempoolTxs[0];
prometheus?.sendEvent('mempool');
io.to(mempoolTopic).emit('mempool', mempoolTx);
}
}
// Individual tx updates
const txTopic: Topic = `transaction:${txId}`;
if (adapter.rooms.has(txTopic)) {
const mempoolTxs = await getMempoolTxsFromDataStore(db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
prometheus?.sendEvent('transaction');
io.to(mempoolTopic).emit('transaction', mempoolTxs[0]);
} else {
const txQuery = await getTxFromDataStore(db, {
txId: txId,
includeUnanchored: true,
});
if (txQuery.found) {
prometheus?.sendEvent('transaction');
io.to(mempoolTopic).emit('transaction', txQuery.result);
}
}
}
});
db.eventEmitter.on('addressUpdate', async (address, blockHeight) => {
const addrTxTopic: AddressTransactionTopic = `address-transaction:${address}` as const;
const addrStxBalanceTopic: AddressStxBalanceTopic = `address-stx-balance:${address}` as const;
if (!adapter.rooms.has(addrTxTopic) && !adapter.rooms.has(addrStxBalanceTopic)) {
return;
}
const dbTxsQuery = await db.getAddressTxsWithAssetTransfers({
stxAddress: address,
blockHeight: blockHeight,
atSingleBlock: true,
});
if (dbTxsQuery.total == 0) {
return;
}
const addressTxs = dbTxsQuery.results;
// Address txs updates
if (adapter.rooms.has(addrTxTopic)) {
addressTxs.forEach(addressTx => {
const parsedTx = parseDbTx(addressTx.tx);
const result: AddressTransactionWithTransfers = {
tx: parsedTx,
stx_sent: addressTx.stx_sent.toString(),
stx_received: addressTx.stx_received.toString(),
stx_transfers: addressTx.stx_transfers.map(value => {
return {
amount: value.amount.toString(),
sender: value.sender,
recipient: value.recipient,
};
}),
};
prometheus?.sendEvent('address-transaction');
io.to(addrTxTopic).emit('address-transaction', address, result);
io.to(addrTxTopic).emit(addrTxTopic, address, result);
});
}
// Address STX balance updates
if (adapter.rooms.has(addrStxBalanceTopic)) {
// Get latest balance (in case multiple txs come in from different blocks)
const blockHeights = addressTxs.map(tx => tx.tx.block_height);
const latestBlock = Math.max(...blockHeights);
getAddressStxBalance(address, latestBlock)
.then(balance => {
prometheus?.sendEvent('address-stx-balance');
io.to(addrStxBalanceTopic).emit('address-stx-balance', address, balance);
io.to(addrStxBalanceTopic).emit(addrStxBalanceTopic, address, balance);
})
.catch(error => {
logError(`[socket.io] Error querying STX balance update for ${address}`, error);
});
}
});
async function getAddressStxBalance(address: string, blockHeight: number) {
const stxBalanceResult = await db.getStxBalanceAtBlock(address, blockHeight);
const tokenOfferingLocked = await db.getTokenOfferingLocked(address, blockHeight);
const result: AddressStxBalanceResponse = {
balance: stxBalanceResult.balance.toString(),
total_sent: stxBalanceResult.totalSent.toString(),
total_received: stxBalanceResult.totalReceived.toString(),
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
lock_tx_id: stxBalanceResult.lockTxId,
locked: stxBalanceResult.locked.toString(),
lock_height: stxBalanceResult.lockHeight,
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
};
if (tokenOfferingLocked.found) {
result.token_offering_locked = tokenOfferingLocked.result;
}
return result;
}
return io;
}

View File

@@ -0,0 +1,76 @@
import * as http from 'http';
import {
AddressStxBalanceResponse,
AddressTransactionWithTransfers,
Block,
MempoolTransaction,
Microblock,
Transaction,
} from 'docs/generated';
import { WebSocketPrometheus } from './web-socket-prometheus';
/**
* Topics that external API users may subscribe to when looking for real time updates.
* Not to be confused with `PgStoreEventEmitter` internal events or `WebSocketPayload` messages.
*/
export type WebSocketTopics = {
block: () => void;
microblock: () => void;
mempool: () => void;
transaction: (txId: string) => void;
principalTransactions: (principal: string) => void;
principalStxBalance: (principal: string) => void;
};
/**
* Payloads that can be sent to external API users depending on which `WebSocketTopics` they are
* subscribed to. Each of these contains a full database object relevant to the response so each
* channel may format the output according to its needs.
*/
export type WebSocketPayload = {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempoolTransaction: (transaction: MempoolTransaction) => void;
transaction: (transaction: Transaction | MempoolTransaction) => void;
principalTransaction: (principal: string, transaction: AddressTransactionWithTransfers) => void;
principalStxBalance: (principal: string, stxBalance: AddressStxBalanceResponse) => void;
};
/**
* Type hack taken from https://github.com/bterlson/strict-event-emitter-types to dynamically set
* the argument types depending on the selected topic or payload.
*/
export type ListenerType<T> = [T] extends [(...args: infer U) => any]
? U
: [T] extends [void]
? []
: [T];
/**
* A channel that accepts user subscriptions to real time updates and responds with relevant
* payloads through WebSockets (or its flavors like Socket.IO).
*/
export abstract class WebSocketChannel {
readonly server: http.Server;
protected prometheus?: WebSocketPrometheus;
constructor(server: http.Server) {
this.server = server;
}
abstract connect(): void;
abstract close(callback?: (err?: Error | undefined) => void): void;
/** Checks if the channel has listeners for the specified topic */
abstract hasListeners<P extends keyof WebSocketTopics>(
topic: P,
...args: ListenerType<WebSocketTopics[P]>
): boolean;
/** Sends a payload through the channel */
abstract send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): void;
}

View File

@@ -0,0 +1,170 @@
import * as http from 'http';
import { AddressStxBalanceResponse, AddressTransactionWithTransfers } from 'docs/generated';
import {
getBlockFromDataStore,
getMempoolTxsFromDataStore,
getMicroblockFromDataStore,
getTxFromDataStore,
parseDbTx,
} from '../../controllers/db-controller';
import { PgStore } from '../../../datastore/pg-store';
import { WebSocketChannel } from './web-socket-channel';
import { SocketIOChannel } from './channels/socket-io-channel';
import { WsRpcChannel } from './channels/ws-rpc-channel';
/**
* This object matches real time update `WebSocketTopics` subscriptions with internal
* `PgStoreEventEmitter` notifications. If a match is found, the relevant data is queried from the
* database and returned to users using all available WebSocket channels.
*/
export class WebSocketTransmitter {
readonly db: PgStore;
readonly server: http.Server;
private channels: WebSocketChannel[] = [];
constructor(db: PgStore, server: http.Server) {
this.db = db;
this.server = server;
}
connect() {
this.db.eventEmitter.addListener('blockUpdate', blockHash => this.blockUpdate(blockHash));
this.db.eventEmitter.addListener('microblockUpdate', microblockHash =>
this.microblockUpdate(microblockHash)
);
this.db.eventEmitter.addListener('txUpdate', txId => this.txUpdate(txId));
this.db.eventEmitter.addListener('addressUpdate', (address, blockHeight) =>
this.addressUpdate(address, blockHeight)
);
this.channels.push(new SocketIOChannel(this.server));
this.channels.push(new WsRpcChannel(this.server));
this.channels.forEach(c => c.connect());
}
close(callback: (err?: Error | undefined) => void) {
Promise.all(
this.channels.map(
c =>
new Promise<void>((resolve, reject) => {
c.close(error => {
if (error) {
reject(error);
} else {
resolve();
}
});
})
)
)
.then(_ => callback())
.catch(error => callback(error));
}
private async blockUpdate(blockHash: string) {
if (this.channels.find(c => c.hasListeners('block'))) {
const blockQuery = await getBlockFromDataStore({
blockIdentifer: { hash: blockHash },
db: this.db,
});
if (blockQuery.found) {
this.channels.forEach(c => c.send('block', blockQuery.result));
}
}
}
private async microblockUpdate(microblockHash: string) {
if (this.channels.find(c => c.hasListeners('microblock'))) {
const microblockQuery = await getMicroblockFromDataStore({
db: this.db,
microblockHash: microblockHash,
});
if (microblockQuery.found) {
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
}
}
}
private async txUpdate(txId: string) {
if (this.channels.find(c => c.hasListeners('mempool'))) {
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
}
}
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
// Look at the `txs` table first so we always prefer the confirmed transaction.
const txQuery = await getTxFromDataStore(this.db, {
txId: txId,
includeUnanchored: true,
});
if (txQuery.found) {
this.channels.forEach(c => c.send('transaction', txQuery.result));
} else {
// Tx is not yet confirmed, look at `mempool_txs`.
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
}
}
}
}
private async addressUpdate(address: string, blockHeight: number) {
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
stxAddress: address,
blockHeight: blockHeight,
atSingleBlock: true,
});
if (dbTxsQuery.total == 0) {
return;
}
const addressTxs = dbTxsQuery.results;
for (const addressTx of addressTxs) {
const parsedTx = parseDbTx(addressTx.tx);
const result: AddressTransactionWithTransfers = {
tx: parsedTx,
stx_sent: addressTx.stx_sent.toString(),
stx_received: addressTx.stx_received.toString(),
stx_transfers: addressTx.stx_transfers.map(value => {
return {
amount: value.amount.toString(),
sender: value.sender,
recipient: value.recipient,
};
}),
};
this.channels.forEach(c => c.send('principalTransaction', address, result));
}
}
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
const balance: AddressStxBalanceResponse = {
balance: stxBalanceResult.balance.toString(),
total_sent: stxBalanceResult.totalSent.toString(),
total_received: stxBalanceResult.totalReceived.toString(),
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
lock_tx_id: stxBalanceResult.lockTxId,
locked: stxBalanceResult.locked.toString(),
lock_height: stxBalanceResult.lockHeight,
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
};
if (tokenOfferingLocked.found) {
balance.token_offering_locked = tokenOfferingLocked.result;
}
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
}
}
}

View File

@@ -1,559 +0,0 @@
import {
JsonRpcError,
JsonRpc,
IParsedObjectRequest,
parse as parseRpcString,
error as jsonRpcError,
notification as jsonRpcNotification,
success as jsonRpcSuccess,
} from 'jsonrpc-lite';
import * as WebSocket from 'ws';
import * as http from 'http';
import * as net from 'net';
import PQueue from 'p-queue';
import {
RpcTxUpdateSubscriptionParams,
RpcAddressTxSubscriptionParams,
RpcAddressBalanceSubscriptionParams,
RpcAddressBalanceNotificationParams,
RpcAddressTxNotificationParams,
RpcBlockSubscriptionParams,
RpcMicroblockSubscriptionParams,
RpcMempoolSubscriptionParams,
RpcTxUpdateNotificationParams,
} from '@stacks/stacks-blockchain-api-types';
import { DbTx, DbMempoolTx } from '../../../datastore/common';
import { normalizeHashString, logError, isValidPrincipal, isProdEnv } from '../../../helpers';
import {
getBlockFromDataStore,
getMempoolTxsFromDataStore,
getMicroblockFromDataStore,
getTxStatusString,
getTxTypeString,
} from '../../controllers/db-controller';
import { WebSocketPrometheus } from './metrics';
import { PgStore } from '../../../datastore/pg-store';
type Subscription =
| RpcTxUpdateSubscriptionParams
| RpcAddressTxSubscriptionParams
| RpcAddressBalanceSubscriptionParams
| RpcBlockSubscriptionParams
| RpcMicroblockSubscriptionParams
| RpcMempoolSubscriptionParams;
class SubscriptionManager {
/**
* Key = subscription topic.
* Value = clients interested in the subscription topic.
*/
subscriptions: Map<string, Set<WebSocket>> = new Map();
// Sockets that are responding to ping.
liveSockets: Set<WebSocket> = new Set();
heartbeatInterval?: NodeJS.Timeout;
readonly heartbeatIntervalMs = 5_000;
addSubscription(client: WebSocket, topicId: string) {
if (this.subscriptions.size === 0) {
this.startHeartbeat();
}
let clients = this.subscriptions.get(topicId);
if (!clients) {
clients = new Set();
this.subscriptions.set(topicId, clients);
}
clients.add(client);
this.liveSockets.add(client);
client.on('close', () => {
this.removeSubscription(client, topicId);
});
client.on('pong', () => {
this.liveSockets.add(client);
});
}
removeSubscription(client: WebSocket, topicId: string) {
const clients = this.subscriptions.get(topicId);
if (clients) {
clients.delete(client);
if (clients.size === 0) {
this.subscriptions.delete(topicId);
if (this.subscriptions.size === 0) {
this.stopHeartbeat();
}
}
}
this.liveSockets.delete(client);
}
startHeartbeat() {
if (this.heartbeatInterval) {
return;
}
this.heartbeatInterval = setInterval(() => {
this.subscriptions.forEach((clients, topic) => {
clients.forEach(ws => {
// Client did not respond to a previous ping, it's dead.
if (!this.liveSockets.has(ws)) {
this.removeSubscription(ws, topic);
return;
}
// Assume client is dead until it responds to our ping.
this.liveSockets.delete(ws);
ws.ping();
});
});
}, this.heartbeatIntervalMs);
}
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = undefined;
}
}
close() {
this.subscriptions.clear();
this.liveSockets.clear();
this.stopHeartbeat();
}
}
export function createWsRpcRouter(db: PgStore, server: http.Server): WebSocket.Server {
let prometheus: WebSocketPrometheus | null;
if (isProdEnv) {
prometheus = new WebSocketPrometheus('websocket');
}
// Use `noServer` and the `upgrade` event to prevent the ws lib from hijacking the http.Server error event
const wsPath = '/extended/v1/ws';
const wsServer = new WebSocket.Server({ noServer: true, path: wsPath });
server.on('upgrade', (request: http.IncomingMessage, socket, head) => {
if (request.url?.startsWith(wsPath)) {
wsServer.handleUpgrade(request, socket as net.Socket, head, ws => {
wsServer.emit('connection', ws, request);
});
}
});
const txUpdateSubscriptions = new SubscriptionManager();
const addressTxUpdateSubscriptions = new SubscriptionManager();
const addressBalanceUpdateSubscriptions = new SubscriptionManager();
const blockSubscriptions = new SubscriptionManager();
const microblockSubscriptions = new SubscriptionManager();
const mempoolSubscriptions = new SubscriptionManager();
function handleClientMessage(client: WebSocket, data: WebSocket.Data) {
try {
if (typeof data !== 'string') {
throw JsonRpcError.parseError(`unexpected data type: ${data.constructor.name}`);
}
const parsedRpcReq = parseRpcString(data);
const isBatchRequest = Array.isArray(parsedRpcReq);
let rpcReqs = Array.isArray(parsedRpcReq) ? parsedRpcReq : [parsedRpcReq];
// Ignore client notifications, spec dictates server should never respond to these.
rpcReqs = rpcReqs.filter(req => req.type !== 'notification');
const responses: JsonRpc[] = rpcReqs.map(rpcReq => {
switch (rpcReq.type) {
case 'request':
return handleClientRpcReq(client, rpcReq);
case 'error':
return jsonRpcError(
rpcReq.payload.id,
JsonRpcError.invalidRequest('unexpected error msg from client')
);
case 'success':
return jsonRpcError(
rpcReq.payload.id,
JsonRpcError.invalidRequest('unexpected success msg from client')
);
case 'invalid':
return jsonRpcError(null as any, rpcReq.payload);
default:
return jsonRpcError(
null as any,
JsonRpcError.invalidRequest('unexpected msg type from client')
);
}
});
if (isBatchRequest) {
client.send(JSON.stringify(responses));
} else if (responses.length === 1) {
client.send(responses[0].serialize());
}
} catch (err: any) {
// Response `id` is null for invalid JSON requests (or other errors where the request ID isn't known).
try {
const res = err instanceof JsonRpcError ? err : JsonRpcError.internalError(err.toString());
sendRpcResponse(client, jsonRpcError(null as any, res));
} catch (error) {
// ignore any errors here
}
}
}
function sendRpcResponse(client: WebSocket, res: JsonRpc) {
client.send(res.serialize());
}
/** Route supported RPC methods */
function handleClientRpcReq(client: WebSocket, req: IParsedObjectRequest): JsonRpc {
switch (req.payload.method) {
case 'subscribe':
return handleClientSubscription(client, req, true);
case 'unsubscribe':
return handleClientSubscription(client, req, false);
default:
return jsonRpcError(req.payload.id, JsonRpcError.methodNotFound(null));
}
}
/** Route supported subscription events */
function handleClientSubscription(
client: WebSocket,
req: IParsedObjectRequest,
subscribe: boolean
): JsonRpc {
const params = req.payload.params as Subscription;
if (!params || !params.event) {
return jsonRpcError(
req.payload.id,
JsonRpcError.invalidParams('subscription requests must include an event name')
);
}
switch (params.event) {
case 'tx_update':
return handleTxUpdateSubscription(client, req, params, subscribe);
case 'address_tx_update':
return handleAddressTxUpdateSubscription(client, req, params, subscribe);
case 'address_balance_update':
return handleAddressBalanceUpdateSubscription(client, req, params, subscribe);
case 'block':
return handleBlockUpdateSubscription(client, req, params, subscribe);
case 'microblock':
return handleMicroblockUpdateSubscription(client, req, params, subscribe);
case 'mempool':
return handleMempoolUpdateSubscription(client, req, params, subscribe);
default:
return jsonRpcError(
req.payload.id,
JsonRpcError.invalidParams('subscription request must use a valid event name')
);
}
}
/** Process client request for tx update notifications */
function handleTxUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcTxUpdateSubscriptionParams,
subscribe: boolean
): JsonRpc {
const txId = normalizeHashString(params.tx_id);
if (!txId) {
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid tx_id'));
}
if (subscribe) {
txUpdateSubscriptions.addSubscription(client, txId);
prometheus?.subscribe(client, `transaction:${txId}`);
} else {
txUpdateSubscriptions.removeSubscription(client, txId);
prometheus?.unsubscribe(client, `transaction:${txId}`);
}
return jsonRpcSuccess(req.payload.id, { tx_id: txId });
}
/** Process client request for address tx update notifications */
function handleAddressTxUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcAddressTxSubscriptionParams,
subscribe: boolean
): JsonRpc {
const address = params.address;
if (!isValidPrincipal(address)) {
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
}
if (subscribe) {
addressTxUpdateSubscriptions.addSubscription(client, address);
prometheus?.subscribe(client, `address-transaction:${address}`);
} else {
addressTxUpdateSubscriptions.removeSubscription(client, address);
prometheus?.unsubscribe(client, `address-transaction:${address}`);
}
return jsonRpcSuccess(req.payload.id, { address: address });
}
function handleAddressBalanceUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcAddressBalanceSubscriptionParams,
subscribe: boolean
): JsonRpc {
const address = params.address;
if (!isValidPrincipal(address)) {
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
}
if (subscribe) {
addressBalanceUpdateSubscriptions.addSubscription(client, address);
prometheus?.subscribe(client, `address-stx-balance:${address}`);
} else {
addressBalanceUpdateSubscriptions.removeSubscription(client, address);
prometheus?.unsubscribe(client, `address-stx-balance:${address}`);
}
return jsonRpcSuccess(req.payload.id, { address: address });
}
function handleBlockUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcBlockSubscriptionParams,
subscribe: boolean
) {
if (subscribe) {
blockSubscriptions.addSubscription(client, params.event);
prometheus?.subscribe(client, 'block');
} else {
blockSubscriptions.removeSubscription(client, params.event);
prometheus?.unsubscribe(client, 'block');
}
return jsonRpcSuccess(req.payload.id, {});
}
function handleMicroblockUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcMicroblockSubscriptionParams,
subscribe: boolean
) {
if (subscribe) {
microblockSubscriptions.addSubscription(client, params.event);
prometheus?.subscribe(client, 'microblock');
} else {
microblockSubscriptions.removeSubscription(client, params.event);
prometheus?.unsubscribe(client, 'microblock');
}
return jsonRpcSuccess(req.payload.id, {});
}
function handleMempoolUpdateSubscription(
client: WebSocket,
req: IParsedObjectRequest,
params: RpcMempoolSubscriptionParams,
subscribe: boolean
) {
if (subscribe) {
mempoolSubscriptions.addSubscription(client, params.event);
prometheus?.subscribe(client, 'mempool');
} else {
mempoolSubscriptions.removeSubscription(client, params.event);
prometheus?.unsubscribe(client, 'mempool');
}
return jsonRpcSuccess(req.payload.id, {});
}
async function processTxUpdate(txId: string) {
try {
const subscribers = txUpdateSubscriptions.subscriptions.get(txId);
if (subscribers) {
let tx: DbTx | DbMempoolTx; // Tx updates can come from both mempool and mined txs.
const dbMempoolTxQuery = await db.getMempoolTx({
txId: txId,
includeUnanchored: true,
includePruned: true,
});
if (dbMempoolTxQuery.found) {
tx = dbMempoolTxQuery.result;
} else {
const dbTxQuery = await db.getTx({ txId: txId, includeUnanchored: true });
if (dbTxQuery.found) {
tx = dbTxQuery.result;
} else {
return;
}
}
const updateNotification: RpcTxUpdateNotificationParams = {
tx_id: tx.tx_id,
tx_status: getTxStatusString(tx.status),
tx_type: getTxTypeString(tx.type_id),
};
const rpcNotificationPayload = jsonRpcNotification(
'tx_update',
updateNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('transaction');
}
} catch (error) {
logError(`error sending websocket tx update for ${txId}`, error);
}
}
async function processAddressUpdate(address: string, blockHeight: number) {
try {
const subscribers = addressTxUpdateSubscriptions.subscriptions.get(address);
if (subscribers) {
const dbTxsQuery = await db.getAddressTxsWithAssetTransfers({
stxAddress: address,
blockHeight: blockHeight,
atSingleBlock: true,
});
if (dbTxsQuery.total == 0) {
return;
}
const addressTxs = dbTxsQuery.results;
addressTxs.forEach(tx => {
const updateNotification: RpcAddressTxNotificationParams = {
address: address,
tx_id: tx.tx.tx_id,
tx_status: getTxStatusString(tx.tx.status),
tx_type: getTxTypeString(tx.tx.type_id),
};
const rpcNotificationPayload = jsonRpcNotification(
'address_tx_update',
updateNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('address-transaction');
});
}
} catch (error) {
logError(`error sending websocket address tx updates to ${address}`, error);
}
}
// Queue to process balance update notifications
const addrBalanceProcessorQueue = new PQueue({ concurrency: 1 });
async function processAddressBalanceUpdate(address: string) {
const subscribers = addressBalanceUpdateSubscriptions.subscriptions.get(address);
if (subscribers) {
await addrBalanceProcessorQueue.add(async () => {
try {
const balance = await db.getStxBalance({
stxAddress: address,
includeUnanchored: true,
});
const balanceNotification: RpcAddressBalanceNotificationParams = {
address: address,
balance: balance.balance.toString(),
};
const rpcNotificationPayload = jsonRpcNotification(
'address_balance_update',
balanceNotification
).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('address-stx-balance');
} catch (error) {
logError(`error sending websocket stx balance update to ${address}`, error);
}
});
}
}
async function processBlockUpdate(blockHash: string) {
try {
const subscribers = blockSubscriptions.subscriptions.get('block');
if (subscribers) {
const blockQuery = await getBlockFromDataStore({ blockIdentifer: { hash: blockHash }, db });
if (blockQuery.found) {
const block = blockQuery.result;
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('block');
}
}
} catch (error) {
logError(`error sending websocket block updates`, error);
}
}
async function processMicroblockUpdate(microblockHash: string) {
try {
const subscribers = microblockSubscriptions.subscriptions.get('microblock');
if (subscribers) {
const microblockQuery = await getMicroblockFromDataStore({
microblockHash: microblockHash,
db,
});
if (microblockQuery.found) {
const microblock = microblockQuery.result;
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('microblock');
}
}
} catch (error) {
logError(`error sending websocket microblock updates`, error);
}
}
async function processMempoolUpdate(txId: string) {
try {
const subscribers = mempoolSubscriptions.subscriptions.get('mempool');
if (subscribers) {
const mempoolTxs = await getMempoolTxsFromDataStore(db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
const mempoolTx = mempoolTxs[0];
const rpcNotificationPayload = jsonRpcNotification('mempool', mempoolTx).serialize();
subscribers.forEach(client => client.send(rpcNotificationPayload));
prometheus?.sendEvent('mempool');
}
}
} catch (error) {
logError(`error sending websocket mempool updates`, error);
}
}
db.eventEmitter.addListener('txUpdate', async txId => {
await processTxUpdate(txId);
await processMempoolUpdate(txId);
});
db.eventEmitter.addListener('addressUpdate', async (address, blockHeight) => {
await processAddressUpdate(address, blockHeight);
await processAddressBalanceUpdate(address);
});
db.eventEmitter.addListener('blockUpdate', async blockHash => {
await processBlockUpdate(blockHash);
});
db.eventEmitter.addListener('microblockUpdate', async microblockHash => {
await processMicroblockUpdate(microblockHash);
});
wsServer.on('connection', (clientSocket, req) => {
if (req.headers['x-forwarded-for']) {
prometheus?.connect(req.headers['x-forwarded-for'] as string);
} else if (req.socket.remoteAddress) {
prometheus?.connect(req.socket.remoteAddress);
}
clientSocket.on('message', data => {
handleClientMessage(clientSocket, data);
});
clientSocket.on('close', (_: WebSocket) => {
prometheus?.disconnect(clientSocket);
});
});
wsServer.on('close', (_: WebSocket.Server) => {
txUpdateSubscriptions.close();
addressTxUpdateSubscriptions.close();
addressBalanceUpdateSubscriptions.close();
blockSubscriptions.close();
microblockSubscriptions.close();
mempoolSubscriptions.close();
});
return wsServer;
}

View File

@@ -938,6 +938,7 @@ export class PgStore {
DbTxStatus.DroppedReplaceAcrossFork,
DbTxStatus.DroppedTooExpensive,
DbTxStatus.DroppedStaleGarbageCollect,
DbTxStatus.DroppedApiGarbageCollect,
];
const resultQuery = await sql<(MempoolTxQueryResult & { count: number })[]>`
SELECT ${unsafeCols(sql, [

View File

@@ -187,6 +187,7 @@ export class PgWriteStore extends PgStore {
async update(data: DataStoreBlockUpdateData): Promise<void> {
const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = [];
let garbageCollectedMempoolTxs: string[] = [];
await this.sql.begin(async sql => {
const chainTip = await this.getChainTip(sql);
await this.handleReorg(sql, data.block, chainTip.blockHeight);
@@ -359,10 +360,13 @@ export class PgWriteStore extends PgStore {
}
await this.refreshNftCustody(sql, batchedTxData);
await this.refreshMaterializedView(sql, 'chain_tip');
const deletedMempoolTxs = await this.deleteGarbageCollectedMempoolTxs(sql);
if (deletedMempoolTxs.deletedTxs.length > 0) {
logger.verbose(`Garbage collected ${deletedMempoolTxs.deletedTxs.length} mempool txs`);
const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
if (mempoolGarbageResults.deletedTxs.length > 0) {
logger.verbose(
`Garbage collected ${mempoolGarbageResults.deletedTxs.length} mempool txs`
);
}
garbageCollectedMempoolTxs = mempoolGarbageResults.deletedTxs;
const tokenContractDeployments = data.txs
.filter(entry => entry.tx.type_id === DbTxTypeId.SmartContract)
@@ -397,6 +401,9 @@ export class PgWriteStore extends PgStore {
for (const tx of data.txs) {
await this.notifier.sendTx({ txId: tx.tx.tx_id });
}
for (const txId of garbageCollectedMempoolTxs) {
await this.notifier?.sendTx({ txId: txId });
}
await this.emitAddressTxUpdates(data.txs);
for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) {
await this.notifier.sendTokenMetadata({ queueId: tokenMetadataQueueEntry.queueId });

View File

@@ -671,9 +671,6 @@ export async function getOrAddAsync<K, V>(
return val;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ElementType<T extends any[]> = T extends (infer U)[] ? U : never;
export type FoundOrNot<T> = { found: true; result: T } | { found: false; result?: T };
export function timeout(ms: number, abortController?: AbortController): Promise<void> {

View File

@@ -158,7 +158,7 @@ export async function getOperations(
return operations;
}
export function processUnlockingEvents(events: StxUnlockEvent[], operations: RosettaOperation[]) {
function processUnlockingEvents(events: StxUnlockEvent[], operations: RosettaOperation[]) {
events.forEach(event => {
operations.push(makeStakeUnlockOperation(event, operations.length));
});

View File

@@ -216,7 +216,7 @@ function testTx(args?: TestTxArgs): DataStoreTxEventData {
microblock_hash: args?.microblock_hash ?? MICROBLOCK_HASH,
token_transfer_amount: args?.token_transfer_amount ?? TOKEN_TRANSFER_AMOUNT,
token_transfer_recipient_address: args?.token_transfer_recipient_address ?? RECIPIENT_ADDRESS,
token_transfer_memo: args?.token_transfer_memo,
token_transfer_memo: args?.token_transfer_memo ?? '',
smart_contract_contract_id: args?.smart_contract_contract_id,
smart_contract_source_code: args?.smart_contract_source_code,
execution_cost_read_count: 0,

View File

@@ -103,12 +103,15 @@ describe('socket-io', () => {
});
const mempoolWaiter: Waiter<MempoolTransaction> = waiter();
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];
let txWaiterIndex = 0;
socket.on('mempool', tx => {
mempoolWaiter.finish(tx);
});
socket.on('transaction', tx => {
txWaiters[txWaiterIndex++].finish(tx);
if (tx.tx_status === 'pending') {
txWaiters[0].finish(tx);
} else {
txWaiters[1].finish(tx);
}
});
const block = new TestBlockBuilder().addTx().build();
@@ -116,23 +119,23 @@ describe('socket-io', () => {
const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
const mempoolResult = await mempoolWaiter;
const txResult = await txWaiters[0];
const microblock = new TestMicroblockStreamBuilder()
.addMicroblock()
.addTx({ tx_id: '0x01' })
.build();
await db.updateMicroblocks(microblock);
const mempoolResult = await mempoolWaiter;
const txResult = await txWaiters[0];
const txMicroblockResult = await txWaiters[1];
try {
expect(mempoolResult.tx_status).toEqual('pending');
expect(mempoolResult.tx_id).toEqual('0x01');
expect(txResult.tx_status).toEqual('pending');
expect(txResult.tx_id).toEqual('0x01');
expect(txMicroblockResult.tx_id).toEqual('0x01');
expect(txMicroblockResult.tx_status).toEqual('pending');
expect(txMicroblockResult.tx_status).toEqual('success');
} finally {
socket.emit('unsubscribe', 'mempool');
socket.emit('unsubscribe', 'transaction:0x01');
@@ -140,6 +143,52 @@ describe('socket-io', () => {
}
});
test('socket-io > mempool txs', async () => {
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,
query: { subscriptions: 'mempool' },
});
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];
socket.on('mempool', tx => {
if (tx.tx_status === 'pending') {
txWaiters[0].finish(tx);
} else {
txWaiters[1].finish(tx);
}
});
const block1 = new TestBlockBuilder({ block_height: 1, index_block_hash: '0x01' })
.addTx({ tx_id: '0x0101' })
.build();
await db.update(block1);
const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
const pendingResult = await txWaiters[0];
const block2 = new TestBlockBuilder({
block_height: 2,
index_block_hash: '0x02',
parent_index_block_hash: '0x01',
})
.addTx({ tx_id: '0x0201' })
.build();
await db.update(block2);
const droppedResult = await txWaiters[1];
try {
expect(pendingResult.tx_id).toEqual('0x01');
expect(pendingResult.tx_status).toEqual('pending');
expect(droppedResult.tx_id).toEqual('0x01');
expect(droppedResult.tx_status).toEqual('dropped_stale_garbage_collect');
} finally {
socket.emit('unsubscribe', 'mempool');
socket.close();
}
});
test('socket-io > address tx updates', async () => {
const addr1 = 'ST28D4Q6RCQSJ6F7TEYWQDS4N1RXYEP9YBWMYSB97';
const socket = io(`http://${apiServer.address}`, {

View File

@@ -6,7 +6,6 @@ import { once } from 'events';
import { RpcWebSocketClient } from 'rpc-websocket-client';
import {
RpcTxUpdateSubscriptionParams,
RpcTxUpdateNotificationParams,
RpcAddressTxSubscriptionParams,
RpcAddressTxNotificationParams,
RpcAddressBalanceSubscriptionParams,
@@ -84,7 +83,7 @@ describe('websocket notifications', () => {
const mempoolWaiter: Waiter<MempoolTransaction> = waiter();
client.onNotification.push(msg => {
if (msg.method === 'tx_update') {
const txUpdate: RpcTxUpdateNotificationParams = msg.params;
const txUpdate: RpcAddressTxNotificationParams = msg.params;
txUpdates[updateIndex++]?.finish(txUpdate.tx_status);
}
if (msg.method === 'mempool') {
@@ -115,14 +114,14 @@ describe('websocket notifications', () => {
// check for microblock tx update notification
const txStatus2 = await txUpdates[1];
expect(txStatus2).toBe('pending');
expect(txStatus2).toBe('success');
// update DB with TX after WS server is sent txid to monitor
db.eventEmitter.emit('txUpdate', txId);
// check for tx update notification
const txStatus3 = await txUpdates[2];
expect(txStatus3).toBe('pending');
expect(txStatus3).toBe('success');
// unsubscribe from notifications for this tx
const unsubscribeResult = await client.call('unsubscribe', subParams1);
@@ -272,6 +271,56 @@ describe('websocket notifications', () => {
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
tx_status: 'success',
tx_type: 'token_transfer',
stx_received: '100',
stx_sent: '150',
stx_transfers: [
{
amount: '100',
recipient: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
sender: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
},
],
tx: {
anchor_mode: 'any',
block_hash: '0x01',
block_height: 1,
burn_block_time: 94869286,
burn_block_time_iso: '1973-01-03T00:34:46.000Z',
canonical: true,
event_count: 0,
events: [],
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
fee_rate: '50',
is_unanchored: false,
microblock_canonical: true,
microblock_hash: '0x123466',
microblock_sequence: 0,
nonce: 0,
parent_block_hash: '0x123456',
parent_burn_block_time: 94869286,
parent_burn_block_time_iso: '1973-01-03T00:34:46.000Z',
post_condition_mode: 'allow',
post_conditions: [],
sender_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
sponsored: false,
token_transfer: {
amount: '100',
memo: '0x',
recipient_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
},
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
tx_index: 0,
tx_result: {
hex: '0x0703',
repr: '(ok true)',
},
tx_status: 'success',
tx_type: 'token_transfer',
},
});
const microblock = new TestMicroblockStreamBuilder()
@@ -295,6 +344,56 @@ describe('websocket notifications', () => {
tx_id: '0x8913',
tx_status: 'success',
tx_type: 'token_transfer',
stx_received: '150',
stx_sent: '200',
stx_transfers: [
{
amount: '150',
recipient: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
sender: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
},
],
tx: {
anchor_mode: 'any',
block_hash: '0x123456',
block_height: 2,
burn_block_time: 94869286,
burn_block_time_iso: '1973-01-03T00:34:46.000Z',
canonical: true,
event_count: 0,
events: [],
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
fee_rate: '50',
is_unanchored: false,
microblock_canonical: true,
microblock_hash: '0x11',
microblock_sequence: 0,
nonce: 0,
parent_block_hash: '0x01',
parent_burn_block_time: 94869286,
parent_burn_block_time_iso: '1973-01-03T00:34:46.000Z',
post_condition_mode: 'allow',
post_conditions: [],
sender_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
sponsored: false,
token_transfer: {
amount: '150',
memo: '0x',
recipient_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
},
tx_id: '0x8913',
tx_index: 0,
tx_result: {
hex: '0x0703',
repr: '(ok true)',
},
tx_status: 'success',
tx_type: 'token_transfer',
},
});
} finally {
await client.call('unsubscribe', subParams);
@@ -349,6 +448,15 @@ describe('websocket notifications', () => {
expect(txUpdate1).toEqual({
address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
balance: '100',
burnchain_lock_height: 0,
burnchain_unlock_height: 0,
lock_height: 0,
lock_tx_id: '',
locked: '0',
total_fees_sent: '0',
total_miner_rewards_received: '0',
total_received: '100',
total_sent: '0',
});
const unsubscribeResult = await client.call('unsubscribe', subParams1);
@@ -387,6 +495,50 @@ describe('websocket notifications', () => {
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
tx_status: 'success',
tx_type: 'token_transfer',
stx_received: '0',
stx_sent: '50',
stx_transfers: [],
tx: {
anchor_mode: 'any',
block_hash: '0x123456',
block_height: 1,
burn_block_time: 94869286,
burn_block_time_iso: '1973-01-03T00:34:46.000Z',
canonical: true,
event_count: 0,
events: [],
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
fee_rate: '50',
is_unanchored: false,
microblock_canonical: true,
microblock_hash: '0x123466',
microblock_sequence: 0,
nonce: 0,
parent_block_hash: '0x123456',
parent_burn_block_time: 94869286,
parent_burn_block_time_iso: '1973-01-03T00:34:46.000Z',
post_condition_mode: 'allow',
post_conditions: [],
sender_address: 'ST3GQB6WGCWKDNFNPSQRV8DY93JN06XPZ2ZE9EVMA',
sponsored: false,
token_transfer: {
amount: '100',
memo: '0x',
recipient_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
},
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
tx_index: 0,
tx_result: {
hex: '0x0703',
repr: '(ok true)',
},
tx_status: 'success',
tx_type: 'token_transfer',
},
});
await subscription.unsubscribe();
} finally {