mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-05-13 20:16:45 +08:00
fix: incorrect websocket/socket.io transaction updates (#1197)
* feat: draft listener class * refactor: interface draft * feat: socket.io channel * docs: comments * feat: ws-rpc channel * refactor: rename metrics.ts * fix: socket.io tests * fix: websocket tests * fix: unused exports * fix: update on stale mempool tx drop * feat: return complete objects in ws-rpc * test: complete ws-rpc responses * docs: update address tx example * fix: also take client and docs pacakge.json * test: try to fix socket flakyness * test: expect tx status explicitly * fix: notify gc mempool txs outside sql tx
This commit is contained in:
116
client/README.md
116
client/README.md
@@ -287,61 +287,77 @@ Sent every time a transaction is sent or received by a specific Stacks address.
|
||||
Example message if subscribed to updates for an address `SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q`:
|
||||
```json
|
||||
{
|
||||
"tx_id": "0x1f9e737dfbebcb57f0879a44518c1cc909be0ceb8ab0bc9b38ce63e3b6847917",
|
||||
"nonce": 6,
|
||||
"fee_rate": "277600",
|
||||
"sender_address": "SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q",
|
||||
"sponsored": false,
|
||||
"post_condition_mode": "deny",
|
||||
"post_conditions": [
|
||||
{
|
||||
"type": "stx",
|
||||
"condition_code": "sent_less_than_or_equal_to",
|
||||
"amount": "25000000",
|
||||
"principal": {
|
||||
"type_id": "principal_standard",
|
||||
"address": "SP3C5SSYVKPAWTR8Y63CVYBR65GD3MG7K80526D1Q"
|
||||
"tx": {
|
||||
"tx_id": "0x0c818b9af6356a2eb4d64ee1b2490193d97a82392c02e7264e006ae5979aa726",
|
||||
"nonce": 32,
|
||||
"fee_rate": "3000",
|
||||
"sender_address": "SP3BK1NNSWN719Z6KDW05RBGVS940YCN6X84STYPR",
|
||||
"sponsored": false,
|
||||
"post_condition_mode": "deny",
|
||||
"post_conditions": [
|
||||
{
|
||||
"type": "stx",
|
||||
"condition_code": "sent_equal_to",
|
||||
"amount": "4375722",
|
||||
"principal": {
|
||||
"type_id": "principal_contract",
|
||||
"contract_name": "newyorkcitycoin-core-v1",
|
||||
"address": "SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5"
|
||||
}
|
||||
}
|
||||
],
|
||||
"anchor_mode": "any",
|
||||
"is_unanchored": false,
|
||||
"block_hash": "0xe2a811451fed35331cf462a9107e3453fdebba1682dfad83cbbcdc603f644ed3",
|
||||
"parent_block_hash": "0x6d8653da23188d4d78ab9b6448229be68abe1bca001f8c574c094289107bce15",
|
||||
"block_height": 58775,
|
||||
"burn_block_time": 1651720813,
|
||||
"burn_block_time_iso": "2022-05-05T03:20:13.000Z",
|
||||
"parent_burn_block_time": 1651720368,
|
||||
"parent_burn_block_time_iso": "2022-05-05T03:12:48.000Z",
|
||||
"canonical": true,
|
||||
"tx_index": 36,
|
||||
"tx_status": "success",
|
||||
"tx_result": {
|
||||
"hex": "0x0703",
|
||||
"repr": "(ok true)"
|
||||
},
|
||||
"microblock_hash": "",
|
||||
"microblock_sequence": 2147483647,
|
||||
"microblock_canonical": true,
|
||||
"event_count": 1,
|
||||
"events": [],
|
||||
"execution_cost_read_count": 15,
|
||||
"execution_cost_read_length": 31147,
|
||||
"execution_cost_runtime": 81975,
|
||||
"execution_cost_write_count": 2,
|
||||
"execution_cost_write_length": 123,
|
||||
"tx_type": "contract_call",
|
||||
"contract_call": {
|
||||
"contract_id": "SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-core-v1",
|
||||
"function_name": "claim-stacking-reward",
|
||||
"function_signature": "(define-public (claim-stacking-reward (targetCycle uint)))",
|
||||
"function_args": [
|
||||
{
|
||||
"hex": "0x0100000000000000000000000000000008",
|
||||
"repr": "u8",
|
||||
"name": "targetCycle",
|
||||
"type": "uint"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"stx_sent": "3000",
|
||||
"stx_received": "4375722",
|
||||
"stx_transfers": [
|
||||
{
|
||||
"type": "non_fungible",
|
||||
"condition_code": "not_sent",
|
||||
"principal": {
|
||||
"type_id": "principal_contract",
|
||||
"contract_name": "stacks-skaters",
|
||||
"address": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG"
|
||||
},
|
||||
"asset": {
|
||||
"contract_name": "stacks-skaters",
|
||||
"asset_name": "stacks-skaters",
|
||||
"contract_address": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG"
|
||||
},
|
||||
"asset_value": {
|
||||
"hex": "0x0100000000000000000000000000000000",
|
||||
"repr": "u0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "stx",
|
||||
"condition_code": "sent_less_than_or_equal_to",
|
||||
"amount": "20000000",
|
||||
"principal": {
|
||||
"type_id": "principal_contract",
|
||||
"contract_name": "stacks-skaters",
|
||||
"address": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG"
|
||||
}
|
||||
"amount": "4375722",
|
||||
"sender": "SP2H8PY27SEZ03MWRKS5XABZYQN17ETGQS3527SA5.newyorkcitycoin-core-v1",
|
||||
"recipient": "SP3BK1NNSWN719Z6KDW05RBGVS940YCN6X84STYPR"
|
||||
}
|
||||
],
|
||||
"anchor_mode": "any",
|
||||
"tx_status": "pending",
|
||||
"receipt_time": 1637172946,
|
||||
"receipt_time_iso": "2021-11-17T18:15:46.000Z",
|
||||
"tx_type": "contract_call",
|
||||
"contract_call": {
|
||||
"contract_id": "SPJW1XE278YMCEYMXB8ZFGJMH8ZVAAEDP2S2PJYG.stacks-skaters",
|
||||
"function_name": "mint",
|
||||
"function_signature": ""
|
||||
}
|
||||
"ft_transfers": [],
|
||||
"nft_transfers": []
|
||||
}
|
||||
```
|
||||
Subscribe via WebSockets:
|
||||
|
||||
@@ -2,11 +2,8 @@ import * as JsonRpcLite from 'jsonrpc-lite';
|
||||
import { EventEmitter } from 'eventemitter3';
|
||||
import {
|
||||
RpcTxUpdateSubscriptionParams,
|
||||
RpcTxUpdateNotificationParams,
|
||||
RpcAddressTxSubscriptionParams,
|
||||
RpcAddressTxNotificationParams,
|
||||
RpcAddressBalanceSubscriptionParams,
|
||||
RpcAddressBalanceNotificationParams,
|
||||
RpcSubscriptionType,
|
||||
Block,
|
||||
RpcBlockSubscriptionParams,
|
||||
@@ -14,6 +11,9 @@ import {
|
||||
Transaction,
|
||||
RpcMicroblockSubscriptionParams,
|
||||
RpcMempoolSubscriptionParams,
|
||||
MempoolTransaction,
|
||||
RpcAddressBalanceNotificationParams,
|
||||
RpcAddressTxNotificationParams,
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
import { BASE_PATH } from '../generated/runtime';
|
||||
|
||||
@@ -35,7 +35,7 @@ export class StacksApiWebSocketClient {
|
||||
block: (event: Block) => void;
|
||||
microblock: (event: Microblock) => void;
|
||||
mempool: (event: Transaction) => void;
|
||||
txUpdate: (event: RpcTxUpdateNotificationParams) => any;
|
||||
txUpdate: (event: Transaction | MempoolTransaction) => any;
|
||||
addressTxUpdate: (event: RpcAddressTxNotificationParams) => void;
|
||||
addressBalanceUpdate: (event: RpcAddressBalanceNotificationParams) => void;
|
||||
}>();
|
||||
@@ -93,10 +93,11 @@ export class StacksApiWebSocketClient {
|
||||
const method = data.method as RpcSubscriptionType;
|
||||
switch (method) {
|
||||
case 'tx_update':
|
||||
this.eventEmitter.emit('txUpdate', data.params as RpcTxUpdateNotificationParams);
|
||||
this.eventEmitter.emit('txUpdate', data.params as (Transaction | MempoolTransaction));
|
||||
break;
|
||||
case 'address_tx_update':
|
||||
this.eventEmitter.emit('addressTxUpdate', data.params as RpcAddressTxNotificationParams);
|
||||
this.eventEmitter.emit('addressTxUpdate',
|
||||
data.params as RpcAddressTxNotificationParams);
|
||||
break;
|
||||
case 'address_balance_update':
|
||||
this.eventEmitter.emit(
|
||||
@@ -171,11 +172,11 @@ export class StacksApiWebSocketClient {
|
||||
|
||||
async subscribeTxUpdates(
|
||||
txId: string,
|
||||
update: (event: RpcTxUpdateNotificationParams) => any
|
||||
update: (event: Transaction | MempoolTransaction) => any
|
||||
): Promise<Subscription> {
|
||||
const params: RpcTxUpdateSubscriptionParams = { event: 'tx_update', tx_id: txId };
|
||||
const subscribed = await this.rpcCall<{ tx_id: string }>('subscribe', params);
|
||||
const listener = (event: RpcTxUpdateNotificationParams) => {
|
||||
const listener = (event: Transaction | MempoolTransaction) => {
|
||||
if (event.tx_id === subscribed.tx_id) {
|
||||
update(event);
|
||||
}
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
{
|
||||
"title": "RpcAddressBalanceNotificationParams",
|
||||
"description": "",
|
||||
"type": "object",
|
||||
"required": [
|
||||
"address",
|
||||
"balance"
|
||||
],
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"address": {
|
||||
"type": "string"
|
||||
},
|
||||
"balance": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,22 @@
|
||||
"enum": ["address_balance_update"]
|
||||
},
|
||||
"params": {
|
||||
"$ref": "./rpc-address-balance-notification-params.schema.json"
|
||||
"title": "RpcAddressBalanceNotificationParams",
|
||||
"allOf": [
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": ["address"],
|
||||
"properties": {
|
||||
"address": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"$ref": "../../api/address/get-address-stx-balance.schema.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
{
|
||||
"title": "RpcAddressTxNotificationParams",
|
||||
"description": "",
|
||||
"type": "object",
|
||||
"required": [
|
||||
"address",
|
||||
"tx_id",
|
||||
"tx_type",
|
||||
"tx_status"
|
||||
],
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"address": {
|
||||
"type": "string"
|
||||
},
|
||||
"tx_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"tx_type": {
|
||||
"$ref": "../transactions/transaction-type.schema.json"
|
||||
},
|
||||
"tx_status": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "../transactions/transaction-status.schema.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../mempool-transactions/transaction-status.schema.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,31 @@
|
||||
"enum": ["address_tx_update"]
|
||||
},
|
||||
"params": {
|
||||
"$ref": "./rpc-address-tx-notification-params.schema.json"
|
||||
"title": "RpcAddressTxNotificationParams",
|
||||
"allOf": [
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": ["address", "tx_id", "tx_type", "tx_status"],
|
||||
"properties": {
|
||||
"address": {
|
||||
"type": "string"
|
||||
},
|
||||
"tx_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"tx_type": {
|
||||
"$ref": "../transactions/transaction-type.schema.json"
|
||||
},
|
||||
"tx_status": {
|
||||
"$ref": "../transactions/transaction-status.schema.json"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"$ref": "../../entities/address/transaction-with-transfers.schema.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
{
|
||||
"title": "RpcTxUpdateNotificationParams",
|
||||
"description": "",
|
||||
"type": "object",
|
||||
"required": [
|
||||
"tx_id",
|
||||
"tx_type",
|
||||
"tx_status"
|
||||
],
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"tx_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"tx_type": {
|
||||
"$ref": "../transactions/transaction-type.schema.json"
|
||||
},
|
||||
"tx_status": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "../transactions/transaction-status.schema.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../mempool-transactions/transaction-status.schema.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,14 @@
|
||||
"enum": ["tx_update"]
|
||||
},
|
||||
"params": {
|
||||
"$ref": "./rpc-tx-update-notification-params.schema.json"
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "../transactions/transaction.schema.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../mempool-transactions/transaction.schema.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
29
docs/generated.d.ts
vendored
29
docs/generated.d.ts
vendored
@@ -214,11 +214,9 @@ export type SchemaMergeRootStub =
|
||||
| TransactionType
|
||||
| Transaction
|
||||
| InboundStxTransfer
|
||||
| RpcAddressBalanceNotificationParams
|
||||
| RpcAddressBalanceNotificationResponse
|
||||
| RpcAddressBalanceSubscriptionParams
|
||||
| RpcAddressBalanceSubscriptionRequest
|
||||
| RpcAddressTxNotificationParams
|
||||
| RpcAddressTxNotificationResponse
|
||||
| RpcAddressTxSubscriptionParams
|
||||
| RpcAddressTxSubscriptionRequest
|
||||
@@ -232,7 +230,6 @@ export type SchemaMergeRootStub =
|
||||
| RpcMicroblockSubscriptionParams
|
||||
| RpcMicroblockSubscriptionRequest
|
||||
| RpcSubscriptionType
|
||||
| RpcTxUpdateNotificationParams
|
||||
| RpcTxUpdateNotificationResponse
|
||||
| RpcTxUpdateSubscriptionParams
|
||||
| RpcTxUpdateSubscriptionRequest;
|
||||
@@ -703,6 +700,15 @@ export type TransactionMetadata =
|
||||
* String literal of all Stacks 2.0 transaction types
|
||||
*/
|
||||
export type TransactionType = "token_transfer" | "smart_contract" | "contract_call" | "poison_microblock" | "coinbase";
|
||||
export type RpcAddressBalanceNotificationParams = {
|
||||
address: string;
|
||||
} & AddressStxBalanceResponse;
|
||||
export type RpcAddressTxNotificationParams = {
|
||||
address: string;
|
||||
tx_id: string;
|
||||
tx_type: TransactionType;
|
||||
tx_status: TransactionStatus;
|
||||
} & AddressTransactionWithTransfers;
|
||||
export type RpcSubscriptionType =
|
||||
| "tx_update"
|
||||
| "address_tx_update"
|
||||
@@ -3233,10 +3239,6 @@ export interface TransactionNotFound {
|
||||
tx_id: string;
|
||||
};
|
||||
}
|
||||
export interface RpcAddressBalanceNotificationParams {
|
||||
address: string;
|
||||
balance: string;
|
||||
}
|
||||
export interface RpcAddressBalanceNotificationResponse {
|
||||
jsonrpc: "2.0";
|
||||
method: "address_balance_update";
|
||||
@@ -3252,12 +3254,6 @@ export interface RpcAddressBalanceSubscriptionRequest {
|
||||
method: "address_balance_update";
|
||||
params: RpcAddressBalanceSubscriptionParams;
|
||||
}
|
||||
export interface RpcAddressTxNotificationParams {
|
||||
address: string;
|
||||
tx_id: string;
|
||||
tx_type: TransactionType;
|
||||
tx_status: TransactionStatus | MempoolTransactionStatus;
|
||||
}
|
||||
export interface RpcAddressTxNotificationResponse {
|
||||
jsonrpc: "2.0";
|
||||
method: "address_tx_update";
|
||||
@@ -3315,15 +3311,10 @@ export interface RpcMicroblockSubscriptionRequest {
|
||||
method: "microblock";
|
||||
params: RpcMicroblockSubscriptionParams;
|
||||
}
|
||||
export interface RpcTxUpdateNotificationParams {
|
||||
tx_id: string;
|
||||
tx_type: TransactionType;
|
||||
tx_status: TransactionStatus | MempoolTransactionStatus;
|
||||
}
|
||||
export interface RpcTxUpdateNotificationResponse {
|
||||
jsonrpc: "2.0";
|
||||
method: "tx_update";
|
||||
params: RpcTxUpdateNotificationParams;
|
||||
params: Transaction | MempoolTransaction;
|
||||
}
|
||||
export interface RpcTxUpdateSubscriptionParams {
|
||||
event: "tx_update";
|
||||
|
||||
12411
docs/package-lock.json
generated
12411
docs/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -17,18 +17,14 @@ import {
|
||||
BaseTransaction,
|
||||
Block,
|
||||
CoinbaseTransactionMetadata,
|
||||
ContractCallTransaction,
|
||||
ContractCallTransactionMetadata,
|
||||
MempoolContractCallTransaction,
|
||||
MempoolTransaction,
|
||||
MempoolTransactionStatus,
|
||||
Microblock,
|
||||
PoisonMicroblockTransactionMetadata,
|
||||
PostCondition,
|
||||
RosettaBlock,
|
||||
RosettaParentBlockIdentifier,
|
||||
RosettaTransaction,
|
||||
SmartContractTransaction,
|
||||
SmartContractTransactionMetadata,
|
||||
TokenTransferTransactionMetadata,
|
||||
Transaction,
|
||||
@@ -58,24 +54,14 @@ import {
|
||||
DbTx,
|
||||
DbTxStatus,
|
||||
DbTxTypeId,
|
||||
DbSmartContract,
|
||||
DbSearchResultWithMetadata,
|
||||
BaseTx,
|
||||
DbMinerReward,
|
||||
StxUnlockEvent,
|
||||
} from '../../datastore/common';
|
||||
import {
|
||||
unwrapOptional,
|
||||
bufferToHexPrefixString,
|
||||
ElementType,
|
||||
FoundOrNot,
|
||||
hexToBuffer,
|
||||
logger,
|
||||
unixEpochToIso,
|
||||
EMPTY_HASH_256,
|
||||
} from '../../helpers';
|
||||
import { unwrapOptional, FoundOrNot, logger, unixEpochToIso, EMPTY_HASH_256 } from '../../helpers';
|
||||
import { serializePostCondition, serializePostConditionMode } from '../serializers/post-conditions';
|
||||
import { getOperations, parseTransactionMemo, processUnlockingEvents } from '../../rosetta-helpers';
|
||||
import { getOperations, parseTransactionMemo } from '../../rosetta-helpers';
|
||||
import { PgStore } from '../../datastore/pg-store';
|
||||
|
||||
export function parseTxTypeStrings(values: string[]): TransactionType[] {
|
||||
@@ -140,9 +126,7 @@ export function getTxTypeId(typeString: Transaction['tx_type']): DbTxTypeId {
|
||||
}
|
||||
}
|
||||
|
||||
export function getTxStatusString(
|
||||
txStatus: DbTxStatus
|
||||
): TransactionStatus | MempoolTransactionStatus {
|
||||
function getTxStatusString(txStatus: DbTxStatus): TransactionStatus | MempoolTransactionStatus {
|
||||
switch (txStatus) {
|
||||
case DbTxStatus.Pending:
|
||||
return 'pending';
|
||||
@@ -159,6 +143,7 @@ export function getTxStatusString(
|
||||
case DbTxStatus.DroppedTooExpensive:
|
||||
return 'dropped_too_expensive';
|
||||
case DbTxStatus.DroppedStaleGarbageCollect:
|
||||
case DbTxStatus.DroppedApiGarbageCollect:
|
||||
return 'dropped_stale_garbage_collect';
|
||||
default:
|
||||
throw new Error(`Unexpected DbTxStatus: ${txStatus}`);
|
||||
|
||||
@@ -25,8 +25,6 @@ import { createRosettaAccountRouter } from './routes/rosetta/account';
|
||||
import { createRosettaConstructionRouter } from './routes/rosetta/construction';
|
||||
import { apiDocumentationUrl, isProdEnv, logError, logger, LogLevel, waiter } from '../helpers';
|
||||
import { InvalidRequestError } from '../errors';
|
||||
import { createWsRpcRouter } from './routes/ws/ws-rpc';
|
||||
import { createSocketIORouter } from './routes/ws/socket-io';
|
||||
import { createBurnchainRouter } from './routes/burnchain';
|
||||
import { createBnsNamespacesRouter } from './routes/bns/namespaces';
|
||||
import { createBnsPriceRouter } from './routes/bns/pricing';
|
||||
@@ -48,12 +46,12 @@ import * as path from 'path';
|
||||
import * as fs from 'fs';
|
||||
import { PgStore } from '../datastore/pg-store';
|
||||
import { PgWriteStore } from '../datastore/pg-write-store';
|
||||
import { WebSocketTransmitter } from './routes/ws/web-socket-transmitter';
|
||||
|
||||
export interface ApiServer {
|
||||
expressApp: express.Express;
|
||||
server: Server;
|
||||
wss: WebSocket.Server;
|
||||
io: SocketIO.Server;
|
||||
ws: WebSocketTransmitter;
|
||||
address: string;
|
||||
datastore: PgStore;
|
||||
terminate: () => Promise<void>;
|
||||
@@ -347,11 +345,8 @@ export async function startApiServer(opts: {
|
||||
});
|
||||
});
|
||||
|
||||
// Setup socket.io server
|
||||
const io = createSocketIORouter(datastore, server);
|
||||
|
||||
// Setup websockets RPC endpoint
|
||||
const wss = createWsRpcRouter(datastore, server);
|
||||
const ws = new WebSocketTransmitter(datastore, server);
|
||||
ws.connect();
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
try {
|
||||
@@ -368,25 +363,13 @@ export async function startApiServer(opts: {
|
||||
|
||||
const terminate = async () => {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
logger.info('Closing Socket.io server...');
|
||||
io.close(error => {
|
||||
logger.info('Closing WebSocket channels...');
|
||||
ws.close(error => {
|
||||
if (error) {
|
||||
logError('Failed to gracefully close Socket.io server', error);
|
||||
logError('Failed to gracefully close WebSocket channels', error);
|
||||
reject(error);
|
||||
} else {
|
||||
logger.info('API socket.io server closed.');
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
logger.info('Closing WebSocket server...');
|
||||
wss.close(error => {
|
||||
if (error) {
|
||||
logError('Failed to gracefully close WebSocket server.');
|
||||
reject(error);
|
||||
} else {
|
||||
logger.info('WebSocket server closed.');
|
||||
logger.info('API WebSocket channels closed.');
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
@@ -405,14 +388,13 @@ export async function startApiServer(opts: {
|
||||
|
||||
const forceKill = async () => {
|
||||
logger.info('Force closing API server...');
|
||||
const [ioClosePromise, wssClosePromise, serverClosePromise] = [waiter(), waiter(), waiter()];
|
||||
io.close(() => ioClosePromise.finish());
|
||||
wss.close(() => wssClosePromise.finish());
|
||||
const [wsClosePromise, serverClosePromise] = [waiter(), waiter()];
|
||||
ws.close(() => wsClosePromise.finish());
|
||||
server.close(() => serverClosePromise.finish());
|
||||
for (const socket of serverSockets) {
|
||||
socket.destroy();
|
||||
}
|
||||
await Promise.allSettled([ioClosePromise, wssClosePromise, serverClosePromise]);
|
||||
await Promise.allSettled([wsClosePromise, serverClosePromise]);
|
||||
};
|
||||
|
||||
const addr = server.address();
|
||||
@@ -423,8 +405,7 @@ export async function startApiServer(opts: {
|
||||
return {
|
||||
expressApp: app,
|
||||
server: server,
|
||||
wss: wss,
|
||||
io: io,
|
||||
ws: ws,
|
||||
address: addrStr,
|
||||
datastore: datastore,
|
||||
terminate: terminate,
|
||||
|
||||
229
src/api/routes/ws/channels/socket-io-channel.ts
Normal file
229
src/api/routes/ws/channels/socket-io-channel.ts
Normal file
@@ -0,0 +1,229 @@
|
||||
import {
|
||||
AddressStxBalanceTopic,
|
||||
AddressTransactionTopic,
|
||||
ClientToServerMessages,
|
||||
ServerToClientMessages,
|
||||
Topic,
|
||||
} from 'docs/socket-io';
|
||||
import * as http from 'http';
|
||||
import { Server as SocketIOServer } from 'socket.io';
|
||||
import { Adapter } from 'socket.io-adapter';
|
||||
import { isValidTxId } from '../../../../api/query-helpers';
|
||||
import { isProdEnv, isValidPrincipal, logger } from '../../../../helpers';
|
||||
import { WebSocketPrometheus } from '../web-socket-prometheus';
|
||||
import {
|
||||
ListenerType,
|
||||
WebSocketChannel,
|
||||
WebSocketPayload,
|
||||
WebSocketTopics,
|
||||
} from '../web-socket-channel';
|
||||
|
||||
/**
|
||||
* SocketIO channel for sending real time API updates.
|
||||
*/
|
||||
export class SocketIOChannel extends WebSocketChannel {
|
||||
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
|
||||
private adapter?: Adapter;
|
||||
|
||||
constructor(server: http.Server) {
|
||||
super(server);
|
||||
if (isProdEnv) {
|
||||
this.prometheus = new WebSocketPrometheus('socket_io');
|
||||
}
|
||||
}
|
||||
|
||||
connect(): void {
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
|
||||
cors: { origin: '*' },
|
||||
});
|
||||
this.io = io;
|
||||
|
||||
io.on('connection', async socket => {
|
||||
logger.info(`[socket.io] new connection: ${socket.id}`);
|
||||
if (socket.handshake.headers['x-forwarded-for']) {
|
||||
this.prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
|
||||
} else {
|
||||
this.prometheus?.connect(socket.handshake.address);
|
||||
}
|
||||
const subscriptions = socket.handshake.query['subscriptions'];
|
||||
if (subscriptions) {
|
||||
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
|
||||
for (const topic of topics) {
|
||||
this.prometheus?.subscribe(socket, topic);
|
||||
await socket.join(topic);
|
||||
}
|
||||
}
|
||||
socket.on('disconnect', reason => {
|
||||
logger.info(`[socket.io] disconnected ${socket.id}: ${reason}`);
|
||||
this.prometheus?.disconnect(socket);
|
||||
});
|
||||
socket.on('subscribe', async (topic, callback) => {
|
||||
if (!this.getInvalidSubscriptionTopics(topic)) {
|
||||
this.prometheus?.subscribe(socket, topic);
|
||||
await socket.join(topic);
|
||||
callback?.(null);
|
||||
}
|
||||
});
|
||||
socket.on('unsubscribe', async (...topics) => {
|
||||
for (const topic of topics) {
|
||||
this.prometheus?.unsubscribe(socket, topic);
|
||||
await socket.leave(topic);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Validate topic subscriptions upon connection.
|
||||
io.use((socket, next) => {
|
||||
const subscriptions = socket.handshake.query['subscriptions'];
|
||||
if (subscriptions) {
|
||||
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
|
||||
const invalidSubs = this.getInvalidSubscriptionTopics(topics as Topic[]);
|
||||
if (invalidSubs) {
|
||||
const error = new Error(`Invalid topic: ${invalidSubs.join(', ')}`);
|
||||
next(error);
|
||||
} else {
|
||||
next();
|
||||
}
|
||||
} else {
|
||||
next();
|
||||
}
|
||||
});
|
||||
|
||||
const adapter = io.of('/').adapter;
|
||||
adapter.on('create-room', room => {
|
||||
logger.info(`[socket.io] room created: ${room}`);
|
||||
});
|
||||
adapter.on('delete-room', room => {
|
||||
logger.info(`[socket.io] room deleted: ${room}`);
|
||||
});
|
||||
adapter.on('join-room', (room, id) => {
|
||||
logger.info(`[socket.io] socket ${id} joined room: ${room}`);
|
||||
});
|
||||
adapter.on('leave-room', (room, id) => {
|
||||
logger.info(`[socket.io] socket ${id} left room: ${room}`);
|
||||
});
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
close(callback?: (err?: Error) => void): void {
|
||||
if (!this.io && callback) {
|
||||
callback();
|
||||
}
|
||||
this.io?.close(callback);
|
||||
this.io = undefined;
|
||||
}
|
||||
|
||||
hasListeners<P extends keyof WebSocketTopics>(
|
||||
topic: P,
|
||||
...args: ListenerType<WebSocketTopics[P]>
|
||||
): boolean {
|
||||
if (!this.adapter) {
|
||||
return false;
|
||||
}
|
||||
switch (topic) {
|
||||
case 'block':
|
||||
return this.adapter.rooms.has('block');
|
||||
case 'microblock':
|
||||
return this.adapter.rooms.has('microblock');
|
||||
case 'mempool':
|
||||
return this.adapter.rooms.has('mempool');
|
||||
case 'transaction': {
|
||||
const [txId] = args as ListenerType<WebSocketTopics['transaction']>;
|
||||
return this.adapter.rooms.has(`transaction:${txId}`);
|
||||
}
|
||||
case 'principalTransactions': {
|
||||
const [principal] = args as ListenerType<WebSocketTopics['principalTransactions']>;
|
||||
return this.adapter.rooms.has(`address-transaction:${principal}`);
|
||||
}
|
||||
case 'principalStxBalance': {
|
||||
const [principal] = args as ListenerType<WebSocketTopics['principalStxBalance']>;
|
||||
return this.adapter.rooms.has(`address-stx-balance:${principal}`);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
): void {
|
||||
if (!this.io) {
|
||||
return;
|
||||
}
|
||||
switch (payload) {
|
||||
case 'block': {
|
||||
const [block] = args as ListenerType<WebSocketPayload['block']>;
|
||||
this.prometheus?.sendEvent('block');
|
||||
this.io.to('block').emit('block', block);
|
||||
break;
|
||||
}
|
||||
case 'microblock': {
|
||||
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
|
||||
this.prometheus?.sendEvent('microblock');
|
||||
this.io.to('microblock').emit('microblock', microblock);
|
||||
break;
|
||||
}
|
||||
case 'mempoolTransaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
|
||||
this.prometheus?.sendEvent('mempool');
|
||||
this.io.to('mempool').emit('mempool', tx);
|
||||
break;
|
||||
}
|
||||
case 'transaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
|
||||
this.prometheus?.sendEvent('transaction');
|
||||
this.io.to(`transaction:${tx.tx_id}`).emit('transaction', tx);
|
||||
break;
|
||||
}
|
||||
case 'principalTransaction': {
|
||||
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
|
||||
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
|
||||
this.prometheus?.sendEvent('address-transaction');
|
||||
this.io.to(topic).emit('address-transaction', principal, tx);
|
||||
this.io.to(topic).emit(topic, principal, tx);
|
||||
break;
|
||||
}
|
||||
case 'principalStxBalance': {
|
||||
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
|
||||
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
|
||||
this.prometheus?.sendEvent('address-stx-balance');
|
||||
this.io.to(topic).emit('address-stx-balance', principal, balance);
|
||||
this.io.to(topic).emit(topic, principal, balance);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private getInvalidSubscriptionTopics(subscriptions: Topic | Topic[]): undefined | string[] {
|
||||
const isSubValid = (sub: Topic): undefined | string => {
|
||||
if (sub.includes(':')) {
|
||||
const txOrAddr = sub.split(':')[0];
|
||||
const value = sub.split(':')[1];
|
||||
switch (txOrAddr) {
|
||||
case 'address-transaction':
|
||||
case 'address-stx-balance':
|
||||
return isValidPrincipal(value) ? undefined : sub;
|
||||
case 'transaction':
|
||||
return isValidTxId(value) ? undefined : sub;
|
||||
default:
|
||||
return sub;
|
||||
}
|
||||
}
|
||||
switch (sub) {
|
||||
case 'block':
|
||||
case 'mempool':
|
||||
case 'microblock':
|
||||
return undefined;
|
||||
default:
|
||||
return sub;
|
||||
}
|
||||
};
|
||||
if (!Array.isArray(subscriptions)) {
|
||||
const invalidSub = isSubValid(subscriptions);
|
||||
return invalidSub ? [invalidSub] : undefined;
|
||||
}
|
||||
const validatedSubs = subscriptions.map(isSubValid);
|
||||
const invalidSubs = validatedSubs.filter(validSub => typeof validSub === 'string');
|
||||
return invalidSubs.length === 0 ? undefined : (invalidSubs as string[]);
|
||||
}
|
||||
}
|
||||
564
src/api/routes/ws/channels/ws-rpc-channel.ts
Normal file
564
src/api/routes/ws/channels/ws-rpc-channel.ts
Normal file
@@ -0,0 +1,564 @@
|
||||
import * as http from 'http';
|
||||
import * as WebSocket from 'ws';
|
||||
import * as net from 'net';
|
||||
import { isProdEnv, isValidPrincipal, logError, normalizeHashString } from '../../../../helpers';
|
||||
import { WebSocketPrometheus } from '../web-socket-prometheus';
|
||||
import {
|
||||
ListenerType,
|
||||
WebSocketChannel,
|
||||
WebSocketPayload,
|
||||
WebSocketTopics,
|
||||
} from '../web-socket-channel';
|
||||
import {
|
||||
JsonRpcError,
|
||||
JsonRpc,
|
||||
IParsedObjectRequest,
|
||||
parse as parseRpcString,
|
||||
error as jsonRpcError,
|
||||
notification as jsonRpcNotification,
|
||||
success as jsonRpcSuccess,
|
||||
} from 'jsonrpc-lite';
|
||||
import {
|
||||
RpcTxUpdateSubscriptionParams,
|
||||
RpcAddressTxSubscriptionParams,
|
||||
RpcAddressBalanceSubscriptionParams,
|
||||
RpcBlockSubscriptionParams,
|
||||
RpcMicroblockSubscriptionParams,
|
||||
RpcMempoolSubscriptionParams,
|
||||
Block,
|
||||
Microblock,
|
||||
MempoolTransaction,
|
||||
Transaction,
|
||||
AddressTransactionWithTransfers,
|
||||
AddressStxBalanceResponse,
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
|
||||
type Subscription =
|
||||
| RpcTxUpdateSubscriptionParams
|
||||
| RpcAddressTxSubscriptionParams
|
||||
| RpcAddressBalanceSubscriptionParams
|
||||
| RpcBlockSubscriptionParams
|
||||
| RpcMicroblockSubscriptionParams
|
||||
| RpcMempoolSubscriptionParams;
|
||||
|
||||
class SubscriptionManager {
|
||||
/**
|
||||
* Key = subscription topic.
|
||||
* Value = clients interested in the subscription topic.
|
||||
*/
|
||||
subscriptions: Map<string, Set<WebSocket>> = new Map();
|
||||
|
||||
// Sockets that are responding to ping.
|
||||
liveSockets: Set<WebSocket> = new Set();
|
||||
heartbeatInterval?: NodeJS.Timeout;
|
||||
readonly heartbeatIntervalMs = 5_000;
|
||||
|
||||
addSubscription(client: WebSocket, topicId: string) {
|
||||
if (this.subscriptions.size === 0) {
|
||||
this.startHeartbeat();
|
||||
}
|
||||
let clients = this.subscriptions.get(topicId);
|
||||
if (!clients) {
|
||||
clients = new Set();
|
||||
this.subscriptions.set(topicId, clients);
|
||||
}
|
||||
clients.add(client);
|
||||
this.liveSockets.add(client);
|
||||
client.on('close', () => {
|
||||
this.removeSubscription(client, topicId);
|
||||
});
|
||||
client.on('pong', () => {
|
||||
this.liveSockets.add(client);
|
||||
});
|
||||
}
|
||||
|
||||
removeSubscription(client: WebSocket, topicId: string) {
|
||||
const clients = this.subscriptions.get(topicId);
|
||||
if (clients) {
|
||||
clients.delete(client);
|
||||
if (clients.size === 0) {
|
||||
this.subscriptions.delete(topicId);
|
||||
if (this.subscriptions.size === 0) {
|
||||
this.stopHeartbeat();
|
||||
}
|
||||
}
|
||||
}
|
||||
this.liveSockets.delete(client);
|
||||
}
|
||||
|
||||
startHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
return;
|
||||
}
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
this.subscriptions.forEach((clients, topic) => {
|
||||
clients.forEach(ws => {
|
||||
// Client did not respond to a previous ping, it's dead.
|
||||
if (!this.liveSockets.has(ws)) {
|
||||
this.removeSubscription(ws, topic);
|
||||
return;
|
||||
}
|
||||
// Assume client is dead until it responds to our ping.
|
||||
this.liveSockets.delete(ws);
|
||||
ws.ping();
|
||||
});
|
||||
});
|
||||
}, this.heartbeatIntervalMs);
|
||||
}
|
||||
|
||||
stopHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
this.subscriptions.clear();
|
||||
this.liveSockets.clear();
|
||||
this.stopHeartbeat();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket RPC channel for sending real time API updates.
|
||||
*/
|
||||
export class WsRpcChannel extends WebSocketChannel {
|
||||
private subscriptions = new Map<keyof WebSocketTopics, SubscriptionManager>();
|
||||
private wsServer?: WebSocket.Server;
|
||||
|
||||
constructor(server: http.Server) {
|
||||
super(server);
|
||||
if (isProdEnv) {
|
||||
this.prometheus = new WebSocketPrometheus('websocket');
|
||||
}
|
||||
}
|
||||
|
||||
connect(): void {
|
||||
// Use `noServer` and the `upgrade` event to prevent the ws lib from hijacking the http.Server error event
|
||||
const wsPath = '/extended/v1/ws';
|
||||
const wsServer = new WebSocket.Server({ noServer: true, path: wsPath });
|
||||
this.server.on('upgrade', (request: http.IncomingMessage, socket, head) => {
|
||||
if (request.url?.startsWith(wsPath)) {
|
||||
wsServer.handleUpgrade(request, socket as net.Socket, head, ws => {
|
||||
wsServer.emit('connection', ws, request);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
this.subscriptions.set('block', new SubscriptionManager());
|
||||
this.subscriptions.set('microblock', new SubscriptionManager());
|
||||
this.subscriptions.set('mempool', new SubscriptionManager());
|
||||
this.subscriptions.set('transaction', new SubscriptionManager());
|
||||
this.subscriptions.set('principalTransactions', new SubscriptionManager());
|
||||
this.subscriptions.set('principalStxBalance', new SubscriptionManager());
|
||||
|
||||
wsServer.on('connection', (clientSocket, req) => {
|
||||
if (req.headers['x-forwarded-for']) {
|
||||
this.prometheus?.connect(req.headers['x-forwarded-for'] as string);
|
||||
} else if (req.socket.remoteAddress) {
|
||||
this.prometheus?.connect(req.socket.remoteAddress);
|
||||
}
|
||||
clientSocket.on('message', data => {
|
||||
this.handleClientMessage(clientSocket, data);
|
||||
});
|
||||
clientSocket.on('close', (_: WebSocket) => {
|
||||
this.prometheus?.disconnect(clientSocket);
|
||||
});
|
||||
});
|
||||
wsServer.on('close', (_: WebSocket.Server) => {
|
||||
this.subscriptions.forEach(manager => manager.close());
|
||||
});
|
||||
|
||||
this.wsServer = wsServer;
|
||||
}
|
||||
|
||||
close(callback?: (err?: Error) => void): void {
|
||||
if (!this.wsServer && callback) {
|
||||
callback();
|
||||
}
|
||||
this.wsServer?.close(callback);
|
||||
this.wsServer = undefined;
|
||||
}
|
||||
|
||||
hasListeners<P extends keyof WebSocketTopics>(
|
||||
topic: P,
|
||||
...args: ListenerType<WebSocketTopics[P]>
|
||||
): boolean {
|
||||
const manager = this.subscriptions.get(topic);
|
||||
if (!this.wsServer || !manager) {
|
||||
return false;
|
||||
}
|
||||
switch (topic) {
|
||||
case 'block':
|
||||
return manager.subscriptions.get('block') !== undefined;
|
||||
case 'microblock':
|
||||
return manager.subscriptions.get('microblock') !== undefined;
|
||||
case 'mempool':
|
||||
return manager.subscriptions.get('mempool') !== undefined;
|
||||
case 'transaction': {
|
||||
const [txId] = args as ListenerType<WebSocketTopics['transaction']>;
|
||||
return manager.subscriptions.get(txId) !== undefined;
|
||||
}
|
||||
case 'principalTransactions': {
|
||||
const [principal] = args as ListenerType<WebSocketTopics['principalTransactions']>;
|
||||
return manager.subscriptions.get(principal) !== undefined;
|
||||
}
|
||||
case 'principalStxBalance': {
|
||||
const [principal] = args as ListenerType<WebSocketTopics['principalStxBalance']>;
|
||||
return manager.subscriptions.get(principal) !== undefined;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
): void {
|
||||
if (!this.wsServer) {
|
||||
return;
|
||||
}
|
||||
switch (payload) {
|
||||
case 'block': {
|
||||
const [block] = args as ListenerType<WebSocketPayload['block']>;
|
||||
this.processBlockUpdate(block);
|
||||
break;
|
||||
}
|
||||
case 'microblock': {
|
||||
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
|
||||
this.processMicroblockUpdate(microblock);
|
||||
break;
|
||||
}
|
||||
case 'mempoolTransaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
|
||||
this.processMempoolUpdate(tx);
|
||||
break;
|
||||
}
|
||||
case 'transaction': {
|
||||
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
|
||||
this.processTxUpdate(tx);
|
||||
break;
|
||||
}
|
||||
case 'principalTransaction': {
|
||||
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
|
||||
this.processAddressUpdate(principal, tx);
|
||||
break;
|
||||
}
|
||||
case 'principalStxBalance': {
|
||||
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
|
||||
this.processAddressBalanceUpdate(principal, balance);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleClientMessage(client: WebSocket, data: WebSocket.Data) {
|
||||
try {
|
||||
if (typeof data !== 'string') {
|
||||
throw JsonRpcError.parseError(`unexpected data type: ${data.constructor.name}`);
|
||||
}
|
||||
const parsedRpcReq = parseRpcString(data);
|
||||
const isBatchRequest = Array.isArray(parsedRpcReq);
|
||||
let rpcReqs = Array.isArray(parsedRpcReq) ? parsedRpcReq : [parsedRpcReq];
|
||||
|
||||
// Ignore client notifications, spec dictates server should never respond to these.
|
||||
rpcReqs = rpcReqs.filter(req => req.type !== 'notification');
|
||||
|
||||
const responses: JsonRpc[] = rpcReqs.map(rpcReq => {
|
||||
switch (rpcReq.type) {
|
||||
case 'request':
|
||||
return this.handleClientRpcReq(client, rpcReq);
|
||||
case 'error':
|
||||
return jsonRpcError(
|
||||
rpcReq.payload.id,
|
||||
JsonRpcError.invalidRequest('unexpected error msg from client')
|
||||
);
|
||||
case 'success':
|
||||
return jsonRpcError(
|
||||
rpcReq.payload.id,
|
||||
JsonRpcError.invalidRequest('unexpected success msg from client')
|
||||
);
|
||||
case 'invalid':
|
||||
return jsonRpcError(null as any, rpcReq.payload);
|
||||
default:
|
||||
return jsonRpcError(
|
||||
null as any,
|
||||
JsonRpcError.invalidRequest('unexpected msg type from client')
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
if (isBatchRequest) {
|
||||
client.send(JSON.stringify(responses));
|
||||
} else if (responses.length === 1) {
|
||||
client.send(responses[0].serialize());
|
||||
}
|
||||
} catch (err: any) {
|
||||
// Response `id` is null for invalid JSON requests (or other errors where the request ID isn't known).
|
||||
try {
|
||||
const res = err instanceof JsonRpcError ? err : JsonRpcError.internalError(err.toString());
|
||||
this.sendRpcResponse(client, jsonRpcError(null as any, res));
|
||||
} catch (error) {
|
||||
// ignore any errors here
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sendRpcResponse(client: WebSocket, res: JsonRpc) {
|
||||
client.send(res.serialize());
|
||||
}
|
||||
|
||||
/** Route supported RPC methods */
|
||||
private handleClientRpcReq(client: WebSocket, req: IParsedObjectRequest): JsonRpc {
|
||||
switch (req.payload.method) {
|
||||
case 'subscribe':
|
||||
return this.handleClientSubscription(client, req, true);
|
||||
case 'unsubscribe':
|
||||
return this.handleClientSubscription(client, req, false);
|
||||
default:
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.methodNotFound(null));
|
||||
}
|
||||
}
|
||||
|
||||
/** Route supported subscription events */
|
||||
private handleClientSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const params = req.payload.params as Subscription;
|
||||
if (!params || !params.event) {
|
||||
return jsonRpcError(
|
||||
req.payload.id,
|
||||
JsonRpcError.invalidParams('subscription requests must include an event name')
|
||||
);
|
||||
}
|
||||
switch (params.event) {
|
||||
case 'tx_update':
|
||||
return this.handleTxUpdateSubscription(client, req, params, subscribe);
|
||||
case 'address_tx_update':
|
||||
return this.handleAddressTxUpdateSubscription(client, req, params, subscribe);
|
||||
case 'address_balance_update':
|
||||
return this.handleAddressBalanceUpdateSubscription(client, req, params, subscribe);
|
||||
case 'block':
|
||||
return this.handleBlockUpdateSubscription(client, req, params, subscribe);
|
||||
case 'microblock':
|
||||
return this.handleMicroblockUpdateSubscription(client, req, params, subscribe);
|
||||
case 'mempool':
|
||||
return this.handleMempoolUpdateSubscription(client, req, params, subscribe);
|
||||
default:
|
||||
return jsonRpcError(
|
||||
req.payload.id,
|
||||
JsonRpcError.invalidParams('subscription request must use a valid event name')
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/** Process client request for tx update notifications */
|
||||
private handleTxUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcTxUpdateSubscriptionParams,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const txId = normalizeHashString(params.tx_id);
|
||||
if (!txId) {
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid tx_id'));
|
||||
}
|
||||
if (subscribe) {
|
||||
this.subscriptions.get('transaction')?.addSubscription(client, txId);
|
||||
this.prometheus?.subscribe(client, `transaction:${txId}`);
|
||||
} else {
|
||||
this.subscriptions.get('transaction')?.removeSubscription(client, txId);
|
||||
this.prometheus?.unsubscribe(client, `transaction:${txId}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { tx_id: txId });
|
||||
}
|
||||
|
||||
/** Process client request for address tx update notifications */
|
||||
private handleAddressTxUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcAddressTxSubscriptionParams,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const address = params.address;
|
||||
if (!isValidPrincipal(address)) {
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
|
||||
}
|
||||
if (subscribe) {
|
||||
this.subscriptions.get('principalTransactions')?.addSubscription(client, address);
|
||||
this.prometheus?.subscribe(client, `address-transaction:${address}`);
|
||||
} else {
|
||||
this.subscriptions.get('principalTransactions')?.removeSubscription(client, address);
|
||||
this.prometheus?.unsubscribe(client, `address-transaction:${address}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { address: address });
|
||||
}
|
||||
|
||||
private handleAddressBalanceUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcAddressBalanceSubscriptionParams,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const address = params.address;
|
||||
if (!isValidPrincipal(address)) {
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
|
||||
}
|
||||
if (subscribe) {
|
||||
this.subscriptions.get('principalStxBalance')?.addSubscription(client, address);
|
||||
this.prometheus?.subscribe(client, `address-stx-balance:${address}`);
|
||||
} else {
|
||||
this.subscriptions.get('principalStxBalance')?.removeSubscription(client, address);
|
||||
this.prometheus?.unsubscribe(client, `address-stx-balance:${address}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { address: address });
|
||||
}
|
||||
|
||||
private handleBlockUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcBlockSubscriptionParams,
|
||||
subscribe: boolean
|
||||
) {
|
||||
if (subscribe) {
|
||||
this.subscriptions.get('block')?.addSubscription(client, params.event);
|
||||
this.prometheus?.subscribe(client, 'block');
|
||||
} else {
|
||||
this.subscriptions.get('block')?.removeSubscription(client, params.event);
|
||||
this.prometheus?.unsubscribe(client, 'block');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
|
||||
private handleMicroblockUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcMicroblockSubscriptionParams,
|
||||
subscribe: boolean
|
||||
) {
|
||||
if (subscribe) {
|
||||
this.subscriptions.get('microblock')?.addSubscription(client, params.event);
|
||||
this.prometheus?.subscribe(client, 'microblock');
|
||||
} else {
|
||||
this.subscriptions.get('microblock')?.removeSubscription(client, params.event);
|
||||
this.prometheus?.unsubscribe(client, 'microblock');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
|
||||
private handleMempoolUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcMempoolSubscriptionParams,
|
||||
subscribe: boolean
|
||||
) {
|
||||
if (subscribe) {
|
||||
this.subscriptions.get('mempool')?.addSubscription(client, params.event);
|
||||
this.prometheus?.subscribe(client, 'mempool');
|
||||
} else {
|
||||
this.subscriptions.get('mempool')?.removeSubscription(client, params.event);
|
||||
this.prometheus?.unsubscribe(client, 'mempool');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
|
||||
private processTxUpdate(tx: Transaction | MempoolTransaction) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('transaction')?.subscriptions.get(tx.tx_id);
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('tx_update', tx).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
this.prometheus?.sendEvent('transaction');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket tx update for ${tx.tx_id}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private processAddressUpdate(principal: string, tx: AddressTransactionWithTransfers) {
|
||||
try {
|
||||
const subscribers = this.subscriptions
|
||||
.get('principalTransactions')
|
||||
?.subscriptions.get(principal);
|
||||
if (subscribers) {
|
||||
const updateNotification = {
|
||||
address: principal,
|
||||
tx_id: tx.tx.tx_id,
|
||||
tx_status: tx.tx.tx_status,
|
||||
tx_type: tx.tx.tx_type,
|
||||
...tx,
|
||||
};
|
||||
const rpcNotificationPayload = jsonRpcNotification(
|
||||
'address_tx_update',
|
||||
updateNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
this.prometheus?.sendEvent('address-transaction');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket address tx updates to ${principal}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private processAddressBalanceUpdate(principal: string, balance: AddressStxBalanceResponse) {
|
||||
const subscribers = this.subscriptions.get('principalStxBalance')?.subscriptions.get(principal);
|
||||
if (subscribers) {
|
||||
try {
|
||||
const balanceNotification = {
|
||||
address: principal,
|
||||
...balance,
|
||||
};
|
||||
const rpcNotificationPayload = jsonRpcNotification(
|
||||
'address_balance_update',
|
||||
balanceNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
this.prometheus?.sendEvent('address-stx-balance');
|
||||
} catch (error) {
|
||||
logError(`error sending websocket stx balance update to ${principal}`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private processBlockUpdate(block: Block) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('block')?.subscriptions.get('block');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
this.prometheus?.sendEvent('block');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket block updates`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private processMicroblockUpdate(microblock: Microblock) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('microblock')?.subscriptions.get('microblock');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
this.prometheus?.sendEvent('microblock');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket microblock updates`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private processMempoolUpdate(transaction: MempoolTransaction) {
|
||||
try {
|
||||
const subscribers = this.subscriptions.get('mempool')?.subscriptions.get('mempool');
|
||||
if (subscribers) {
|
||||
const rpcNotificationPayload = jsonRpcNotification('mempool', transaction).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
this.prometheus?.sendEvent('mempool');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket mempool updates`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,281 +0,0 @@
|
||||
import { Server as SocketIOServer } from 'socket.io';
|
||||
import * as http from 'http';
|
||||
import {
|
||||
AddressStxBalanceResponse,
|
||||
AddressStxBalanceTopic,
|
||||
AddressTransactionTopic,
|
||||
AddressTransactionWithTransfers,
|
||||
ClientToServerMessages,
|
||||
Topic,
|
||||
ServerToClientMessages,
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
import {
|
||||
getBlockFromDataStore,
|
||||
getMempoolTxsFromDataStore,
|
||||
getMicroblockFromDataStore,
|
||||
getTxFromDataStore,
|
||||
parseDbTx,
|
||||
} from '../../controllers/db-controller';
|
||||
import { isProdEnv, isValidPrincipal, logError, logger } from '../../../helpers';
|
||||
import { WebSocketPrometheus } from './metrics';
|
||||
import { PgStore } from '../../../datastore/pg-store';
|
||||
import { isValidTxId } from '../../../api/query-helpers';
|
||||
|
||||
function getInvalidSubscriptionTopics(subscriptions: Topic | Topic[]): undefined | string[] {
|
||||
const isSubValid = (sub: Topic): undefined | string => {
|
||||
if (sub.includes(':')) {
|
||||
const txOrAddr = sub.split(':')[0];
|
||||
const value = sub.split(':')[1];
|
||||
switch (txOrAddr) {
|
||||
case 'address-transaction':
|
||||
case 'address-stx-balance':
|
||||
return isValidPrincipal(value) ? undefined : sub;
|
||||
case 'transaction':
|
||||
return isValidTxId(value) ? undefined : sub;
|
||||
default:
|
||||
return sub;
|
||||
}
|
||||
}
|
||||
switch (sub) {
|
||||
case 'block':
|
||||
case 'mempool':
|
||||
case 'microblock':
|
||||
return undefined;
|
||||
default:
|
||||
return sub;
|
||||
}
|
||||
};
|
||||
if (!Array.isArray(subscriptions)) {
|
||||
const invalidSub = isSubValid(subscriptions);
|
||||
return invalidSub ? [invalidSub] : undefined;
|
||||
}
|
||||
const validatedSubs = subscriptions.map(isSubValid);
|
||||
const invalidSubs = validatedSubs.filter(validSub => typeof validSub === 'string');
|
||||
return invalidSubs.length === 0 ? undefined : (invalidSubs as string[]);
|
||||
}
|
||||
|
||||
export function createSocketIORouter(db: PgStore, server: http.Server) {
|
||||
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(server, {
|
||||
cors: { origin: '*' },
|
||||
});
|
||||
let prometheus: WebSocketPrometheus | null;
|
||||
if (isProdEnv) {
|
||||
prometheus = new WebSocketPrometheus('socket_io');
|
||||
}
|
||||
|
||||
io.on('connection', async socket => {
|
||||
logger.info(`[socket.io] new connection: ${socket.id}`);
|
||||
if (socket.handshake.headers['x-forwarded-for']) {
|
||||
prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
|
||||
} else {
|
||||
prometheus?.connect(socket.handshake.address);
|
||||
}
|
||||
const subscriptions = socket.handshake.query['subscriptions'];
|
||||
if (subscriptions) {
|
||||
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
|
||||
for (const topic of topics) {
|
||||
prometheus?.subscribe(socket, topic);
|
||||
await socket.join(topic);
|
||||
}
|
||||
}
|
||||
|
||||
socket.on('disconnect', reason => {
|
||||
logger.info(`[socket.io] disconnected ${socket.id}: ${reason}`);
|
||||
prometheus?.disconnect(socket);
|
||||
});
|
||||
socket.on('subscribe', async (topic, callback) => {
|
||||
if (!getInvalidSubscriptionTopics(topic)) {
|
||||
prometheus?.subscribe(socket, topic);
|
||||
await socket.join(topic);
|
||||
callback?.(null);
|
||||
}
|
||||
});
|
||||
socket.on('unsubscribe', async (...topics) => {
|
||||
for (const topic of topics) {
|
||||
prometheus?.unsubscribe(socket, topic);
|
||||
await socket.leave(topic);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Middleware checks for the invalid topic subscriptions and terminates connection if found any
|
||||
io.use((socket, next) => {
|
||||
const subscriptions = socket.handshake.query['subscriptions'];
|
||||
if (subscriptions) {
|
||||
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
|
||||
const invalidSubs = getInvalidSubscriptionTopics(topics as Topic[]);
|
||||
if (invalidSubs) {
|
||||
const error = new Error(`Invalid topic: ${invalidSubs.join(', ')}`);
|
||||
next(error);
|
||||
} else {
|
||||
next();
|
||||
}
|
||||
} else {
|
||||
next();
|
||||
}
|
||||
});
|
||||
|
||||
const adapter = io.of('/').adapter;
|
||||
|
||||
adapter.on('create-room', room => {
|
||||
logger.info(`[socket.io] room created: ${room}`);
|
||||
});
|
||||
|
||||
adapter.on('delete-room', room => {
|
||||
logger.info(`[socket.io] room deleted: ${room}`);
|
||||
});
|
||||
|
||||
adapter.on('join-room', (room, id) => {
|
||||
logger.info(`[socket.io] socket ${id} joined room: ${room}`);
|
||||
});
|
||||
|
||||
adapter.on('leave-room', (room, id) => {
|
||||
logger.info(`[socket.io] socket ${id} left room: ${room}`);
|
||||
});
|
||||
|
||||
db.eventEmitter.on('blockUpdate', async blockHash => {
|
||||
// Only parse and emit data if there are currently subscriptions to the blocks topic
|
||||
const blockTopic: Topic = 'block';
|
||||
if (adapter.rooms.has(blockTopic)) {
|
||||
const blockQuery = await getBlockFromDataStore({ blockIdentifer: { hash: blockHash }, db });
|
||||
if (!blockQuery.found) {
|
||||
return;
|
||||
}
|
||||
const block = blockQuery.result;
|
||||
prometheus?.sendEvent('block');
|
||||
io.to(blockTopic).emit('block', block);
|
||||
}
|
||||
});
|
||||
|
||||
db.eventEmitter.on('microblockUpdate', async microblockHash => {
|
||||
const microblockTopic: Topic = 'microblock';
|
||||
if (adapter.rooms.has(microblockTopic)) {
|
||||
const microblockQuery = await getMicroblockFromDataStore({
|
||||
db: db,
|
||||
microblockHash: microblockHash,
|
||||
});
|
||||
if (!microblockQuery.found) {
|
||||
return;
|
||||
}
|
||||
const microblock = microblockQuery.result;
|
||||
prometheus?.sendEvent('microblock');
|
||||
io.to(microblockTopic).emit('microblock', microblock);
|
||||
}
|
||||
});
|
||||
|
||||
db.eventEmitter.on('txUpdate', async txId => {
|
||||
// Mempool updates
|
||||
const mempoolTopic: Topic = 'mempool';
|
||||
if (adapter.rooms.has(mempoolTopic)) {
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
const mempoolTx = mempoolTxs[0];
|
||||
prometheus?.sendEvent('mempool');
|
||||
io.to(mempoolTopic).emit('mempool', mempoolTx);
|
||||
}
|
||||
}
|
||||
|
||||
// Individual tx updates
|
||||
const txTopic: Topic = `transaction:${txId}`;
|
||||
if (adapter.rooms.has(txTopic)) {
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
prometheus?.sendEvent('transaction');
|
||||
io.to(mempoolTopic).emit('transaction', mempoolTxs[0]);
|
||||
} else {
|
||||
const txQuery = await getTxFromDataStore(db, {
|
||||
txId: txId,
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (txQuery.found) {
|
||||
prometheus?.sendEvent('transaction');
|
||||
io.to(mempoolTopic).emit('transaction', txQuery.result);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
db.eventEmitter.on('addressUpdate', async (address, blockHeight) => {
|
||||
const addrTxTopic: AddressTransactionTopic = `address-transaction:${address}` as const;
|
||||
const addrStxBalanceTopic: AddressStxBalanceTopic = `address-stx-balance:${address}` as const;
|
||||
if (!adapter.rooms.has(addrTxTopic) && !adapter.rooms.has(addrStxBalanceTopic)) {
|
||||
return;
|
||||
}
|
||||
const dbTxsQuery = await db.getAddressTxsWithAssetTransfers({
|
||||
stxAddress: address,
|
||||
blockHeight: blockHeight,
|
||||
atSingleBlock: true,
|
||||
});
|
||||
if (dbTxsQuery.total == 0) {
|
||||
return;
|
||||
}
|
||||
const addressTxs = dbTxsQuery.results;
|
||||
|
||||
// Address txs updates
|
||||
if (adapter.rooms.has(addrTxTopic)) {
|
||||
addressTxs.forEach(addressTx => {
|
||||
const parsedTx = parseDbTx(addressTx.tx);
|
||||
const result: AddressTransactionWithTransfers = {
|
||||
tx: parsedTx,
|
||||
stx_sent: addressTx.stx_sent.toString(),
|
||||
stx_received: addressTx.stx_received.toString(),
|
||||
stx_transfers: addressTx.stx_transfers.map(value => {
|
||||
return {
|
||||
amount: value.amount.toString(),
|
||||
sender: value.sender,
|
||||
recipient: value.recipient,
|
||||
};
|
||||
}),
|
||||
};
|
||||
prometheus?.sendEvent('address-transaction');
|
||||
io.to(addrTxTopic).emit('address-transaction', address, result);
|
||||
io.to(addrTxTopic).emit(addrTxTopic, address, result);
|
||||
});
|
||||
}
|
||||
|
||||
// Address STX balance updates
|
||||
if (adapter.rooms.has(addrStxBalanceTopic)) {
|
||||
// Get latest balance (in case multiple txs come in from different blocks)
|
||||
const blockHeights = addressTxs.map(tx => tx.tx.block_height);
|
||||
const latestBlock = Math.max(...blockHeights);
|
||||
getAddressStxBalance(address, latestBlock)
|
||||
.then(balance => {
|
||||
prometheus?.sendEvent('address-stx-balance');
|
||||
io.to(addrStxBalanceTopic).emit('address-stx-balance', address, balance);
|
||||
io.to(addrStxBalanceTopic).emit(addrStxBalanceTopic, address, balance);
|
||||
})
|
||||
.catch(error => {
|
||||
logError(`[socket.io] Error querying STX balance update for ${address}`, error);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
async function getAddressStxBalance(address: string, blockHeight: number) {
|
||||
const stxBalanceResult = await db.getStxBalanceAtBlock(address, blockHeight);
|
||||
const tokenOfferingLocked = await db.getTokenOfferingLocked(address, blockHeight);
|
||||
const result: AddressStxBalanceResponse = {
|
||||
balance: stxBalanceResult.balance.toString(),
|
||||
total_sent: stxBalanceResult.totalSent.toString(),
|
||||
total_received: stxBalanceResult.totalReceived.toString(),
|
||||
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
|
||||
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
|
||||
lock_tx_id: stxBalanceResult.lockTxId,
|
||||
locked: stxBalanceResult.locked.toString(),
|
||||
lock_height: stxBalanceResult.lockHeight,
|
||||
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
|
||||
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
|
||||
};
|
||||
if (tokenOfferingLocked.found) {
|
||||
result.token_offering_locked = tokenOfferingLocked.result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
return io;
|
||||
}
|
||||
76
src/api/routes/ws/web-socket-channel.ts
Normal file
76
src/api/routes/ws/web-socket-channel.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import * as http from 'http';
|
||||
import {
|
||||
AddressStxBalanceResponse,
|
||||
AddressTransactionWithTransfers,
|
||||
Block,
|
||||
MempoolTransaction,
|
||||
Microblock,
|
||||
Transaction,
|
||||
} from 'docs/generated';
|
||||
import { WebSocketPrometheus } from './web-socket-prometheus';
|
||||
|
||||
/**
|
||||
* Topics that external API users may subscribe to when looking for real time updates.
|
||||
* Not to be confused with `PgStoreEventEmitter` internal events or `WebSocketPayload` messages.
|
||||
*/
|
||||
export type WebSocketTopics = {
|
||||
block: () => void;
|
||||
microblock: () => void;
|
||||
mempool: () => void;
|
||||
transaction: (txId: string) => void;
|
||||
principalTransactions: (principal: string) => void;
|
||||
principalStxBalance: (principal: string) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Payloads that can be sent to external API users depending on which `WebSocketTopics` they are
|
||||
* subscribed to. Each of these contains a full database object relevant to the response so each
|
||||
* channel may format the output according to its needs.
|
||||
*/
|
||||
export type WebSocketPayload = {
|
||||
block: (block: Block) => void;
|
||||
microblock: (microblock: Microblock) => void;
|
||||
mempoolTransaction: (transaction: MempoolTransaction) => void;
|
||||
transaction: (transaction: Transaction | MempoolTransaction) => void;
|
||||
principalTransaction: (principal: string, transaction: AddressTransactionWithTransfers) => void;
|
||||
principalStxBalance: (principal: string, stxBalance: AddressStxBalanceResponse) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Type hack taken from https://github.com/bterlson/strict-event-emitter-types to dynamically set
|
||||
* the argument types depending on the selected topic or payload.
|
||||
*/
|
||||
export type ListenerType<T> = [T] extends [(...args: infer U) => any]
|
||||
? U
|
||||
: [T] extends [void]
|
||||
? []
|
||||
: [T];
|
||||
|
||||
/**
|
||||
* A channel that accepts user subscriptions to real time updates and responds with relevant
|
||||
* payloads through WebSockets (or its flavors like Socket.IO).
|
||||
*/
|
||||
export abstract class WebSocketChannel {
|
||||
readonly server: http.Server;
|
||||
protected prometheus?: WebSocketPrometheus;
|
||||
|
||||
constructor(server: http.Server) {
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
abstract connect(): void;
|
||||
|
||||
abstract close(callback?: (err?: Error | undefined) => void): void;
|
||||
|
||||
/** Checks if the channel has listeners for the specified topic */
|
||||
abstract hasListeners<P extends keyof WebSocketTopics>(
|
||||
topic: P,
|
||||
...args: ListenerType<WebSocketTopics[P]>
|
||||
): boolean;
|
||||
|
||||
/** Sends a payload through the channel */
|
||||
abstract send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
): void;
|
||||
}
|
||||
170
src/api/routes/ws/web-socket-transmitter.ts
Normal file
170
src/api/routes/ws/web-socket-transmitter.ts
Normal file
@@ -0,0 +1,170 @@
|
||||
import * as http from 'http';
|
||||
import { AddressStxBalanceResponse, AddressTransactionWithTransfers } from 'docs/generated';
|
||||
import {
|
||||
getBlockFromDataStore,
|
||||
getMempoolTxsFromDataStore,
|
||||
getMicroblockFromDataStore,
|
||||
getTxFromDataStore,
|
||||
parseDbTx,
|
||||
} from '../../controllers/db-controller';
|
||||
import { PgStore } from '../../../datastore/pg-store';
|
||||
import { WebSocketChannel } from './web-socket-channel';
|
||||
import { SocketIOChannel } from './channels/socket-io-channel';
|
||||
import { WsRpcChannel } from './channels/ws-rpc-channel';
|
||||
|
||||
/**
|
||||
* This object matches real time update `WebSocketTopics` subscriptions with internal
|
||||
* `PgStoreEventEmitter` notifications. If a match is found, the relevant data is queried from the
|
||||
* database and returned to users using all available WebSocket channels.
|
||||
*/
|
||||
export class WebSocketTransmitter {
|
||||
readonly db: PgStore;
|
||||
readonly server: http.Server;
|
||||
private channels: WebSocketChannel[] = [];
|
||||
|
||||
constructor(db: PgStore, server: http.Server) {
|
||||
this.db = db;
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
connect() {
|
||||
this.db.eventEmitter.addListener('blockUpdate', blockHash => this.blockUpdate(blockHash));
|
||||
this.db.eventEmitter.addListener('microblockUpdate', microblockHash =>
|
||||
this.microblockUpdate(microblockHash)
|
||||
);
|
||||
this.db.eventEmitter.addListener('txUpdate', txId => this.txUpdate(txId));
|
||||
this.db.eventEmitter.addListener('addressUpdate', (address, blockHeight) =>
|
||||
this.addressUpdate(address, blockHeight)
|
||||
);
|
||||
|
||||
this.channels.push(new SocketIOChannel(this.server));
|
||||
this.channels.push(new WsRpcChannel(this.server));
|
||||
this.channels.forEach(c => c.connect());
|
||||
}
|
||||
|
||||
close(callback: (err?: Error | undefined) => void) {
|
||||
Promise.all(
|
||||
this.channels.map(
|
||||
c =>
|
||||
new Promise<void>((resolve, reject) => {
|
||||
c.close(error => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
})
|
||||
)
|
||||
)
|
||||
.then(_ => callback())
|
||||
.catch(error => callback(error));
|
||||
}
|
||||
|
||||
private async blockUpdate(blockHash: string) {
|
||||
if (this.channels.find(c => c.hasListeners('block'))) {
|
||||
const blockQuery = await getBlockFromDataStore({
|
||||
blockIdentifer: { hash: blockHash },
|
||||
db: this.db,
|
||||
});
|
||||
if (blockQuery.found) {
|
||||
this.channels.forEach(c => c.send('block', blockQuery.result));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async microblockUpdate(microblockHash: string) {
|
||||
if (this.channels.find(c => c.hasListeners('microblock'))) {
|
||||
const microblockQuery = await getMicroblockFromDataStore({
|
||||
db: this.db,
|
||||
microblockHash: microblockHash,
|
||||
});
|
||||
if (microblockQuery.found) {
|
||||
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async txUpdate(txId: string) {
|
||||
if (this.channels.find(c => c.hasListeners('mempool'))) {
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
|
||||
// Look at the `txs` table first so we always prefer the confirmed transaction.
|
||||
const txQuery = await getTxFromDataStore(this.db, {
|
||||
txId: txId,
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (txQuery.found) {
|
||||
this.channels.forEach(c => c.send('transaction', txQuery.result));
|
||||
} else {
|
||||
// Tx is not yet confirmed, look at `mempool_txs`.
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async addressUpdate(address: string, blockHeight: number) {
|
||||
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
|
||||
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
|
||||
stxAddress: address,
|
||||
blockHeight: blockHeight,
|
||||
atSingleBlock: true,
|
||||
});
|
||||
if (dbTxsQuery.total == 0) {
|
||||
return;
|
||||
}
|
||||
const addressTxs = dbTxsQuery.results;
|
||||
for (const addressTx of addressTxs) {
|
||||
const parsedTx = parseDbTx(addressTx.tx);
|
||||
const result: AddressTransactionWithTransfers = {
|
||||
tx: parsedTx,
|
||||
stx_sent: addressTx.stx_sent.toString(),
|
||||
stx_received: addressTx.stx_received.toString(),
|
||||
stx_transfers: addressTx.stx_transfers.map(value => {
|
||||
return {
|
||||
amount: value.amount.toString(),
|
||||
sender: value.sender,
|
||||
recipient: value.recipient,
|
||||
};
|
||||
}),
|
||||
};
|
||||
this.channels.forEach(c => c.send('principalTransaction', address, result));
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
|
||||
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
|
||||
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
|
||||
const balance: AddressStxBalanceResponse = {
|
||||
balance: stxBalanceResult.balance.toString(),
|
||||
total_sent: stxBalanceResult.totalSent.toString(),
|
||||
total_received: stxBalanceResult.totalReceived.toString(),
|
||||
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
|
||||
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
|
||||
lock_tx_id: stxBalanceResult.lockTxId,
|
||||
locked: stxBalanceResult.locked.toString(),
|
||||
lock_height: stxBalanceResult.lockHeight,
|
||||
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
|
||||
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
|
||||
};
|
||||
if (tokenOfferingLocked.found) {
|
||||
balance.token_offering_locked = tokenOfferingLocked.result;
|
||||
}
|
||||
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,559 +0,0 @@
|
||||
import {
|
||||
JsonRpcError,
|
||||
JsonRpc,
|
||||
IParsedObjectRequest,
|
||||
parse as parseRpcString,
|
||||
error as jsonRpcError,
|
||||
notification as jsonRpcNotification,
|
||||
success as jsonRpcSuccess,
|
||||
} from 'jsonrpc-lite';
|
||||
import * as WebSocket from 'ws';
|
||||
import * as http from 'http';
|
||||
import * as net from 'net';
|
||||
import PQueue from 'p-queue';
|
||||
import {
|
||||
RpcTxUpdateSubscriptionParams,
|
||||
RpcAddressTxSubscriptionParams,
|
||||
RpcAddressBalanceSubscriptionParams,
|
||||
RpcAddressBalanceNotificationParams,
|
||||
RpcAddressTxNotificationParams,
|
||||
RpcBlockSubscriptionParams,
|
||||
RpcMicroblockSubscriptionParams,
|
||||
RpcMempoolSubscriptionParams,
|
||||
RpcTxUpdateNotificationParams,
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
|
||||
import { DbTx, DbMempoolTx } from '../../../datastore/common';
|
||||
import { normalizeHashString, logError, isValidPrincipal, isProdEnv } from '../../../helpers';
|
||||
import {
|
||||
getBlockFromDataStore,
|
||||
getMempoolTxsFromDataStore,
|
||||
getMicroblockFromDataStore,
|
||||
getTxStatusString,
|
||||
getTxTypeString,
|
||||
} from '../../controllers/db-controller';
|
||||
import { WebSocketPrometheus } from './metrics';
|
||||
import { PgStore } from '../../../datastore/pg-store';
|
||||
|
||||
type Subscription =
|
||||
| RpcTxUpdateSubscriptionParams
|
||||
| RpcAddressTxSubscriptionParams
|
||||
| RpcAddressBalanceSubscriptionParams
|
||||
| RpcBlockSubscriptionParams
|
||||
| RpcMicroblockSubscriptionParams
|
||||
| RpcMempoolSubscriptionParams;
|
||||
|
||||
class SubscriptionManager {
|
||||
/**
|
||||
* Key = subscription topic.
|
||||
* Value = clients interested in the subscription topic.
|
||||
*/
|
||||
subscriptions: Map<string, Set<WebSocket>> = new Map();
|
||||
|
||||
// Sockets that are responding to ping.
|
||||
liveSockets: Set<WebSocket> = new Set();
|
||||
heartbeatInterval?: NodeJS.Timeout;
|
||||
readonly heartbeatIntervalMs = 5_000;
|
||||
|
||||
addSubscription(client: WebSocket, topicId: string) {
|
||||
if (this.subscriptions.size === 0) {
|
||||
this.startHeartbeat();
|
||||
}
|
||||
let clients = this.subscriptions.get(topicId);
|
||||
if (!clients) {
|
||||
clients = new Set();
|
||||
this.subscriptions.set(topicId, clients);
|
||||
}
|
||||
clients.add(client);
|
||||
this.liveSockets.add(client);
|
||||
client.on('close', () => {
|
||||
this.removeSubscription(client, topicId);
|
||||
});
|
||||
client.on('pong', () => {
|
||||
this.liveSockets.add(client);
|
||||
});
|
||||
}
|
||||
|
||||
removeSubscription(client: WebSocket, topicId: string) {
|
||||
const clients = this.subscriptions.get(topicId);
|
||||
if (clients) {
|
||||
clients.delete(client);
|
||||
if (clients.size === 0) {
|
||||
this.subscriptions.delete(topicId);
|
||||
if (this.subscriptions.size === 0) {
|
||||
this.stopHeartbeat();
|
||||
}
|
||||
}
|
||||
}
|
||||
this.liveSockets.delete(client);
|
||||
}
|
||||
|
||||
startHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
return;
|
||||
}
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
this.subscriptions.forEach((clients, topic) => {
|
||||
clients.forEach(ws => {
|
||||
// Client did not respond to a previous ping, it's dead.
|
||||
if (!this.liveSockets.has(ws)) {
|
||||
this.removeSubscription(ws, topic);
|
||||
return;
|
||||
}
|
||||
// Assume client is dead until it responds to our ping.
|
||||
this.liveSockets.delete(ws);
|
||||
ws.ping();
|
||||
});
|
||||
});
|
||||
}, this.heartbeatIntervalMs);
|
||||
}
|
||||
|
||||
stopHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
this.subscriptions.clear();
|
||||
this.liveSockets.clear();
|
||||
this.stopHeartbeat();
|
||||
}
|
||||
}
|
||||
|
||||
export function createWsRpcRouter(db: PgStore, server: http.Server): WebSocket.Server {
|
||||
let prometheus: WebSocketPrometheus | null;
|
||||
if (isProdEnv) {
|
||||
prometheus = new WebSocketPrometheus('websocket');
|
||||
}
|
||||
|
||||
// Use `noServer` and the `upgrade` event to prevent the ws lib from hijacking the http.Server error event
|
||||
const wsPath = '/extended/v1/ws';
|
||||
const wsServer = new WebSocket.Server({ noServer: true, path: wsPath });
|
||||
server.on('upgrade', (request: http.IncomingMessage, socket, head) => {
|
||||
if (request.url?.startsWith(wsPath)) {
|
||||
wsServer.handleUpgrade(request, socket as net.Socket, head, ws => {
|
||||
wsServer.emit('connection', ws, request);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const txUpdateSubscriptions = new SubscriptionManager();
|
||||
const addressTxUpdateSubscriptions = new SubscriptionManager();
|
||||
const addressBalanceUpdateSubscriptions = new SubscriptionManager();
|
||||
const blockSubscriptions = new SubscriptionManager();
|
||||
const microblockSubscriptions = new SubscriptionManager();
|
||||
const mempoolSubscriptions = new SubscriptionManager();
|
||||
|
||||
function handleClientMessage(client: WebSocket, data: WebSocket.Data) {
|
||||
try {
|
||||
if (typeof data !== 'string') {
|
||||
throw JsonRpcError.parseError(`unexpected data type: ${data.constructor.name}`);
|
||||
}
|
||||
const parsedRpcReq = parseRpcString(data);
|
||||
const isBatchRequest = Array.isArray(parsedRpcReq);
|
||||
let rpcReqs = Array.isArray(parsedRpcReq) ? parsedRpcReq : [parsedRpcReq];
|
||||
|
||||
// Ignore client notifications, spec dictates server should never respond to these.
|
||||
rpcReqs = rpcReqs.filter(req => req.type !== 'notification');
|
||||
|
||||
const responses: JsonRpc[] = rpcReqs.map(rpcReq => {
|
||||
switch (rpcReq.type) {
|
||||
case 'request':
|
||||
return handleClientRpcReq(client, rpcReq);
|
||||
case 'error':
|
||||
return jsonRpcError(
|
||||
rpcReq.payload.id,
|
||||
JsonRpcError.invalidRequest('unexpected error msg from client')
|
||||
);
|
||||
case 'success':
|
||||
return jsonRpcError(
|
||||
rpcReq.payload.id,
|
||||
JsonRpcError.invalidRequest('unexpected success msg from client')
|
||||
);
|
||||
case 'invalid':
|
||||
return jsonRpcError(null as any, rpcReq.payload);
|
||||
default:
|
||||
return jsonRpcError(
|
||||
null as any,
|
||||
JsonRpcError.invalidRequest('unexpected msg type from client')
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
if (isBatchRequest) {
|
||||
client.send(JSON.stringify(responses));
|
||||
} else if (responses.length === 1) {
|
||||
client.send(responses[0].serialize());
|
||||
}
|
||||
} catch (err: any) {
|
||||
// Response `id` is null for invalid JSON requests (or other errors where the request ID isn't known).
|
||||
try {
|
||||
const res = err instanceof JsonRpcError ? err : JsonRpcError.internalError(err.toString());
|
||||
sendRpcResponse(client, jsonRpcError(null as any, res));
|
||||
} catch (error) {
|
||||
// ignore any errors here
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function sendRpcResponse(client: WebSocket, res: JsonRpc) {
|
||||
client.send(res.serialize());
|
||||
}
|
||||
|
||||
/** Route supported RPC methods */
|
||||
function handleClientRpcReq(client: WebSocket, req: IParsedObjectRequest): JsonRpc {
|
||||
switch (req.payload.method) {
|
||||
case 'subscribe':
|
||||
return handleClientSubscription(client, req, true);
|
||||
case 'unsubscribe':
|
||||
return handleClientSubscription(client, req, false);
|
||||
default:
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.methodNotFound(null));
|
||||
}
|
||||
}
|
||||
|
||||
/** Route supported subscription events */
|
||||
function handleClientSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const params = req.payload.params as Subscription;
|
||||
if (!params || !params.event) {
|
||||
return jsonRpcError(
|
||||
req.payload.id,
|
||||
JsonRpcError.invalidParams('subscription requests must include an event name')
|
||||
);
|
||||
}
|
||||
switch (params.event) {
|
||||
case 'tx_update':
|
||||
return handleTxUpdateSubscription(client, req, params, subscribe);
|
||||
case 'address_tx_update':
|
||||
return handleAddressTxUpdateSubscription(client, req, params, subscribe);
|
||||
case 'address_balance_update':
|
||||
return handleAddressBalanceUpdateSubscription(client, req, params, subscribe);
|
||||
case 'block':
|
||||
return handleBlockUpdateSubscription(client, req, params, subscribe);
|
||||
case 'microblock':
|
||||
return handleMicroblockUpdateSubscription(client, req, params, subscribe);
|
||||
case 'mempool':
|
||||
return handleMempoolUpdateSubscription(client, req, params, subscribe);
|
||||
default:
|
||||
return jsonRpcError(
|
||||
req.payload.id,
|
||||
JsonRpcError.invalidParams('subscription request must use a valid event name')
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/** Process client request for tx update notifications */
|
||||
function handleTxUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcTxUpdateSubscriptionParams,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const txId = normalizeHashString(params.tx_id);
|
||||
if (!txId) {
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid tx_id'));
|
||||
}
|
||||
if (subscribe) {
|
||||
txUpdateSubscriptions.addSubscription(client, txId);
|
||||
prometheus?.subscribe(client, `transaction:${txId}`);
|
||||
} else {
|
||||
txUpdateSubscriptions.removeSubscription(client, txId);
|
||||
prometheus?.unsubscribe(client, `transaction:${txId}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { tx_id: txId });
|
||||
}
|
||||
|
||||
/** Process client request for address tx update notifications */
|
||||
function handleAddressTxUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcAddressTxSubscriptionParams,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const address = params.address;
|
||||
if (!isValidPrincipal(address)) {
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
|
||||
}
|
||||
if (subscribe) {
|
||||
addressTxUpdateSubscriptions.addSubscription(client, address);
|
||||
prometheus?.subscribe(client, `address-transaction:${address}`);
|
||||
} else {
|
||||
addressTxUpdateSubscriptions.removeSubscription(client, address);
|
||||
prometheus?.unsubscribe(client, `address-transaction:${address}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { address: address });
|
||||
}
|
||||
|
||||
function handleAddressBalanceUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcAddressBalanceSubscriptionParams,
|
||||
subscribe: boolean
|
||||
): JsonRpc {
|
||||
const address = params.address;
|
||||
if (!isValidPrincipal(address)) {
|
||||
return jsonRpcError(req.payload.id, JsonRpcError.invalidParams('invalid address'));
|
||||
}
|
||||
if (subscribe) {
|
||||
addressBalanceUpdateSubscriptions.addSubscription(client, address);
|
||||
prometheus?.subscribe(client, `address-stx-balance:${address}`);
|
||||
} else {
|
||||
addressBalanceUpdateSubscriptions.removeSubscription(client, address);
|
||||
prometheus?.unsubscribe(client, `address-stx-balance:${address}`);
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, { address: address });
|
||||
}
|
||||
|
||||
function handleBlockUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcBlockSubscriptionParams,
|
||||
subscribe: boolean
|
||||
) {
|
||||
if (subscribe) {
|
||||
blockSubscriptions.addSubscription(client, params.event);
|
||||
prometheus?.subscribe(client, 'block');
|
||||
} else {
|
||||
blockSubscriptions.removeSubscription(client, params.event);
|
||||
prometheus?.unsubscribe(client, 'block');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
|
||||
function handleMicroblockUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcMicroblockSubscriptionParams,
|
||||
subscribe: boolean
|
||||
) {
|
||||
if (subscribe) {
|
||||
microblockSubscriptions.addSubscription(client, params.event);
|
||||
prometheus?.subscribe(client, 'microblock');
|
||||
} else {
|
||||
microblockSubscriptions.removeSubscription(client, params.event);
|
||||
prometheus?.unsubscribe(client, 'microblock');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
|
||||
function handleMempoolUpdateSubscription(
|
||||
client: WebSocket,
|
||||
req: IParsedObjectRequest,
|
||||
params: RpcMempoolSubscriptionParams,
|
||||
subscribe: boolean
|
||||
) {
|
||||
if (subscribe) {
|
||||
mempoolSubscriptions.addSubscription(client, params.event);
|
||||
prometheus?.subscribe(client, 'mempool');
|
||||
} else {
|
||||
mempoolSubscriptions.removeSubscription(client, params.event);
|
||||
prometheus?.unsubscribe(client, 'mempool');
|
||||
}
|
||||
return jsonRpcSuccess(req.payload.id, {});
|
||||
}
|
||||
|
||||
async function processTxUpdate(txId: string) {
|
||||
try {
|
||||
const subscribers = txUpdateSubscriptions.subscriptions.get(txId);
|
||||
if (subscribers) {
|
||||
let tx: DbTx | DbMempoolTx; // Tx updates can come from both mempool and mined txs.
|
||||
const dbMempoolTxQuery = await db.getMempoolTx({
|
||||
txId: txId,
|
||||
includeUnanchored: true,
|
||||
includePruned: true,
|
||||
});
|
||||
if (dbMempoolTxQuery.found) {
|
||||
tx = dbMempoolTxQuery.result;
|
||||
} else {
|
||||
const dbTxQuery = await db.getTx({ txId: txId, includeUnanchored: true });
|
||||
if (dbTxQuery.found) {
|
||||
tx = dbTxQuery.result;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const updateNotification: RpcTxUpdateNotificationParams = {
|
||||
tx_id: tx.tx_id,
|
||||
tx_status: getTxStatusString(tx.status),
|
||||
tx_type: getTxTypeString(tx.type_id),
|
||||
};
|
||||
const rpcNotificationPayload = jsonRpcNotification(
|
||||
'tx_update',
|
||||
updateNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('transaction');
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket tx update for ${txId}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
async function processAddressUpdate(address: string, blockHeight: number) {
|
||||
try {
|
||||
const subscribers = addressTxUpdateSubscriptions.subscriptions.get(address);
|
||||
if (subscribers) {
|
||||
const dbTxsQuery = await db.getAddressTxsWithAssetTransfers({
|
||||
stxAddress: address,
|
||||
blockHeight: blockHeight,
|
||||
atSingleBlock: true,
|
||||
});
|
||||
if (dbTxsQuery.total == 0) {
|
||||
return;
|
||||
}
|
||||
const addressTxs = dbTxsQuery.results;
|
||||
addressTxs.forEach(tx => {
|
||||
const updateNotification: RpcAddressTxNotificationParams = {
|
||||
address: address,
|
||||
tx_id: tx.tx.tx_id,
|
||||
tx_status: getTxStatusString(tx.tx.status),
|
||||
tx_type: getTxTypeString(tx.tx.type_id),
|
||||
};
|
||||
const rpcNotificationPayload = jsonRpcNotification(
|
||||
'address_tx_update',
|
||||
updateNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('address-transaction');
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket address tx updates to ${address}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
// Queue to process balance update notifications
|
||||
const addrBalanceProcessorQueue = new PQueue({ concurrency: 1 });
|
||||
|
||||
async function processAddressBalanceUpdate(address: string) {
|
||||
const subscribers = addressBalanceUpdateSubscriptions.subscriptions.get(address);
|
||||
if (subscribers) {
|
||||
await addrBalanceProcessorQueue.add(async () => {
|
||||
try {
|
||||
const balance = await db.getStxBalance({
|
||||
stxAddress: address,
|
||||
includeUnanchored: true,
|
||||
});
|
||||
const balanceNotification: RpcAddressBalanceNotificationParams = {
|
||||
address: address,
|
||||
balance: balance.balance.toString(),
|
||||
};
|
||||
const rpcNotificationPayload = jsonRpcNotification(
|
||||
'address_balance_update',
|
||||
balanceNotification
|
||||
).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('address-stx-balance');
|
||||
} catch (error) {
|
||||
logError(`error sending websocket stx balance update to ${address}`, error);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function processBlockUpdate(blockHash: string) {
|
||||
try {
|
||||
const subscribers = blockSubscriptions.subscriptions.get('block');
|
||||
if (subscribers) {
|
||||
const blockQuery = await getBlockFromDataStore({ blockIdentifer: { hash: blockHash }, db });
|
||||
if (blockQuery.found) {
|
||||
const block = blockQuery.result;
|
||||
const rpcNotificationPayload = jsonRpcNotification('block', block).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('block');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket block updates`, error);
|
||||
}
|
||||
}
|
||||
|
||||
async function processMicroblockUpdate(microblockHash: string) {
|
||||
try {
|
||||
const subscribers = microblockSubscriptions.subscriptions.get('microblock');
|
||||
if (subscribers) {
|
||||
const microblockQuery = await getMicroblockFromDataStore({
|
||||
microblockHash: microblockHash,
|
||||
db,
|
||||
});
|
||||
if (microblockQuery.found) {
|
||||
const microblock = microblockQuery.result;
|
||||
const rpcNotificationPayload = jsonRpcNotification('microblock', microblock).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('microblock');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket microblock updates`, error);
|
||||
}
|
||||
}
|
||||
|
||||
async function processMempoolUpdate(txId: string) {
|
||||
try {
|
||||
const subscribers = mempoolSubscriptions.subscriptions.get('mempool');
|
||||
if (subscribers) {
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
const mempoolTx = mempoolTxs[0];
|
||||
const rpcNotificationPayload = jsonRpcNotification('mempool', mempoolTx).serialize();
|
||||
subscribers.forEach(client => client.send(rpcNotificationPayload));
|
||||
prometheus?.sendEvent('mempool');
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logError(`error sending websocket mempool updates`, error);
|
||||
}
|
||||
}
|
||||
|
||||
db.eventEmitter.addListener('txUpdate', async txId => {
|
||||
await processTxUpdate(txId);
|
||||
await processMempoolUpdate(txId);
|
||||
});
|
||||
|
||||
db.eventEmitter.addListener('addressUpdate', async (address, blockHeight) => {
|
||||
await processAddressUpdate(address, blockHeight);
|
||||
await processAddressBalanceUpdate(address);
|
||||
});
|
||||
|
||||
db.eventEmitter.addListener('blockUpdate', async blockHash => {
|
||||
await processBlockUpdate(blockHash);
|
||||
});
|
||||
|
||||
db.eventEmitter.addListener('microblockUpdate', async microblockHash => {
|
||||
await processMicroblockUpdate(microblockHash);
|
||||
});
|
||||
|
||||
wsServer.on('connection', (clientSocket, req) => {
|
||||
if (req.headers['x-forwarded-for']) {
|
||||
prometheus?.connect(req.headers['x-forwarded-for'] as string);
|
||||
} else if (req.socket.remoteAddress) {
|
||||
prometheus?.connect(req.socket.remoteAddress);
|
||||
}
|
||||
clientSocket.on('message', data => {
|
||||
handleClientMessage(clientSocket, data);
|
||||
});
|
||||
clientSocket.on('close', (_: WebSocket) => {
|
||||
prometheus?.disconnect(clientSocket);
|
||||
});
|
||||
});
|
||||
|
||||
wsServer.on('close', (_: WebSocket.Server) => {
|
||||
txUpdateSubscriptions.close();
|
||||
addressTxUpdateSubscriptions.close();
|
||||
addressBalanceUpdateSubscriptions.close();
|
||||
blockSubscriptions.close();
|
||||
microblockSubscriptions.close();
|
||||
mempoolSubscriptions.close();
|
||||
});
|
||||
|
||||
return wsServer;
|
||||
}
|
||||
@@ -938,6 +938,7 @@ export class PgStore {
|
||||
DbTxStatus.DroppedReplaceAcrossFork,
|
||||
DbTxStatus.DroppedTooExpensive,
|
||||
DbTxStatus.DroppedStaleGarbageCollect,
|
||||
DbTxStatus.DroppedApiGarbageCollect,
|
||||
];
|
||||
const resultQuery = await sql<(MempoolTxQueryResult & { count: number })[]>`
|
||||
SELECT ${unsafeCols(sql, [
|
||||
|
||||
@@ -187,6 +187,7 @@ export class PgWriteStore extends PgStore {
|
||||
|
||||
async update(data: DataStoreBlockUpdateData): Promise<void> {
|
||||
const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = [];
|
||||
let garbageCollectedMempoolTxs: string[] = [];
|
||||
await this.sql.begin(async sql => {
|
||||
const chainTip = await this.getChainTip(sql);
|
||||
await this.handleReorg(sql, data.block, chainTip.blockHeight);
|
||||
@@ -359,10 +360,13 @@ export class PgWriteStore extends PgStore {
|
||||
}
|
||||
await this.refreshNftCustody(sql, batchedTxData);
|
||||
await this.refreshMaterializedView(sql, 'chain_tip');
|
||||
const deletedMempoolTxs = await this.deleteGarbageCollectedMempoolTxs(sql);
|
||||
if (deletedMempoolTxs.deletedTxs.length > 0) {
|
||||
logger.verbose(`Garbage collected ${deletedMempoolTxs.deletedTxs.length} mempool txs`);
|
||||
const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
|
||||
if (mempoolGarbageResults.deletedTxs.length > 0) {
|
||||
logger.verbose(
|
||||
`Garbage collected ${mempoolGarbageResults.deletedTxs.length} mempool txs`
|
||||
);
|
||||
}
|
||||
garbageCollectedMempoolTxs = mempoolGarbageResults.deletedTxs;
|
||||
|
||||
const tokenContractDeployments = data.txs
|
||||
.filter(entry => entry.tx.type_id === DbTxTypeId.SmartContract)
|
||||
@@ -397,6 +401,9 @@ export class PgWriteStore extends PgStore {
|
||||
for (const tx of data.txs) {
|
||||
await this.notifier.sendTx({ txId: tx.tx.tx_id });
|
||||
}
|
||||
for (const txId of garbageCollectedMempoolTxs) {
|
||||
await this.notifier?.sendTx({ txId: txId });
|
||||
}
|
||||
await this.emitAddressTxUpdates(data.txs);
|
||||
for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) {
|
||||
await this.notifier.sendTokenMetadata({ queueId: tokenMetadataQueueEntry.queueId });
|
||||
|
||||
@@ -671,9 +671,6 @@ export async function getOrAddAsync<K, V>(
|
||||
return val;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
export type ElementType<T extends any[]> = T extends (infer U)[] ? U : never;
|
||||
|
||||
export type FoundOrNot<T> = { found: true; result: T } | { found: false; result?: T };
|
||||
|
||||
export function timeout(ms: number, abortController?: AbortController): Promise<void> {
|
||||
|
||||
@@ -158,7 +158,7 @@ export async function getOperations(
|
||||
return operations;
|
||||
}
|
||||
|
||||
export function processUnlockingEvents(events: StxUnlockEvent[], operations: RosettaOperation[]) {
|
||||
function processUnlockingEvents(events: StxUnlockEvent[], operations: RosettaOperation[]) {
|
||||
events.forEach(event => {
|
||||
operations.push(makeStakeUnlockOperation(event, operations.length));
|
||||
});
|
||||
|
||||
@@ -216,7 +216,7 @@ function testTx(args?: TestTxArgs): DataStoreTxEventData {
|
||||
microblock_hash: args?.microblock_hash ?? MICROBLOCK_HASH,
|
||||
token_transfer_amount: args?.token_transfer_amount ?? TOKEN_TRANSFER_AMOUNT,
|
||||
token_transfer_recipient_address: args?.token_transfer_recipient_address ?? RECIPIENT_ADDRESS,
|
||||
token_transfer_memo: args?.token_transfer_memo,
|
||||
token_transfer_memo: args?.token_transfer_memo ?? '',
|
||||
smart_contract_contract_id: args?.smart_contract_contract_id,
|
||||
smart_contract_source_code: args?.smart_contract_source_code,
|
||||
execution_cost_read_count: 0,
|
||||
|
||||
@@ -103,12 +103,15 @@ describe('socket-io', () => {
|
||||
});
|
||||
const mempoolWaiter: Waiter<MempoolTransaction> = waiter();
|
||||
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];
|
||||
let txWaiterIndex = 0;
|
||||
socket.on('mempool', tx => {
|
||||
mempoolWaiter.finish(tx);
|
||||
});
|
||||
socket.on('transaction', tx => {
|
||||
txWaiters[txWaiterIndex++].finish(tx);
|
||||
if (tx.tx_status === 'pending') {
|
||||
txWaiters[0].finish(tx);
|
||||
} else {
|
||||
txWaiters[1].finish(tx);
|
||||
}
|
||||
});
|
||||
|
||||
const block = new TestBlockBuilder().addTx().build();
|
||||
@@ -116,23 +119,23 @@ describe('socket-io', () => {
|
||||
|
||||
const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending });
|
||||
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
|
||||
const mempoolResult = await mempoolWaiter;
|
||||
const txResult = await txWaiters[0];
|
||||
|
||||
const microblock = new TestMicroblockStreamBuilder()
|
||||
.addMicroblock()
|
||||
.addTx({ tx_id: '0x01' })
|
||||
.build();
|
||||
await db.updateMicroblocks(microblock);
|
||||
|
||||
const mempoolResult = await mempoolWaiter;
|
||||
const txResult = await txWaiters[0];
|
||||
const txMicroblockResult = await txWaiters[1];
|
||||
|
||||
try {
|
||||
expect(mempoolResult.tx_status).toEqual('pending');
|
||||
expect(mempoolResult.tx_id).toEqual('0x01');
|
||||
expect(txResult.tx_status).toEqual('pending');
|
||||
expect(txResult.tx_id).toEqual('0x01');
|
||||
expect(txMicroblockResult.tx_id).toEqual('0x01');
|
||||
expect(txMicroblockResult.tx_status).toEqual('pending');
|
||||
expect(txMicroblockResult.tx_status).toEqual('success');
|
||||
} finally {
|
||||
socket.emit('unsubscribe', 'mempool');
|
||||
socket.emit('unsubscribe', 'transaction:0x01');
|
||||
@@ -140,6 +143,52 @@ describe('socket-io', () => {
|
||||
}
|
||||
});
|
||||
|
||||
test('socket-io > mempool txs', async () => {
|
||||
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
|
||||
|
||||
const address = apiServer.address;
|
||||
const socket = io(`http://${address}`, {
|
||||
reconnection: false,
|
||||
query: { subscriptions: 'mempool' },
|
||||
});
|
||||
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];
|
||||
socket.on('mempool', tx => {
|
||||
if (tx.tx_status === 'pending') {
|
||||
txWaiters[0].finish(tx);
|
||||
} else {
|
||||
txWaiters[1].finish(tx);
|
||||
}
|
||||
});
|
||||
|
||||
const block1 = new TestBlockBuilder({ block_height: 1, index_block_hash: '0x01' })
|
||||
.addTx({ tx_id: '0x0101' })
|
||||
.build();
|
||||
await db.update(block1);
|
||||
const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending });
|
||||
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
|
||||
const pendingResult = await txWaiters[0];
|
||||
|
||||
const block2 = new TestBlockBuilder({
|
||||
block_height: 2,
|
||||
index_block_hash: '0x02',
|
||||
parent_index_block_hash: '0x01',
|
||||
})
|
||||
.addTx({ tx_id: '0x0201' })
|
||||
.build();
|
||||
await db.update(block2);
|
||||
const droppedResult = await txWaiters[1];
|
||||
|
||||
try {
|
||||
expect(pendingResult.tx_id).toEqual('0x01');
|
||||
expect(pendingResult.tx_status).toEqual('pending');
|
||||
expect(droppedResult.tx_id).toEqual('0x01');
|
||||
expect(droppedResult.tx_status).toEqual('dropped_stale_garbage_collect');
|
||||
} finally {
|
||||
socket.emit('unsubscribe', 'mempool');
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
|
||||
test('socket-io > address tx updates', async () => {
|
||||
const addr1 = 'ST28D4Q6RCQSJ6F7TEYWQDS4N1RXYEP9YBWMYSB97';
|
||||
const socket = io(`http://${apiServer.address}`, {
|
||||
|
||||
@@ -6,7 +6,6 @@ import { once } from 'events';
|
||||
import { RpcWebSocketClient } from 'rpc-websocket-client';
|
||||
import {
|
||||
RpcTxUpdateSubscriptionParams,
|
||||
RpcTxUpdateNotificationParams,
|
||||
RpcAddressTxSubscriptionParams,
|
||||
RpcAddressTxNotificationParams,
|
||||
RpcAddressBalanceSubscriptionParams,
|
||||
@@ -84,7 +83,7 @@ describe('websocket notifications', () => {
|
||||
const mempoolWaiter: Waiter<MempoolTransaction> = waiter();
|
||||
client.onNotification.push(msg => {
|
||||
if (msg.method === 'tx_update') {
|
||||
const txUpdate: RpcTxUpdateNotificationParams = msg.params;
|
||||
const txUpdate: RpcAddressTxNotificationParams = msg.params;
|
||||
txUpdates[updateIndex++]?.finish(txUpdate.tx_status);
|
||||
}
|
||||
if (msg.method === 'mempool') {
|
||||
@@ -115,14 +114,14 @@ describe('websocket notifications', () => {
|
||||
|
||||
// check for microblock tx update notification
|
||||
const txStatus2 = await txUpdates[1];
|
||||
expect(txStatus2).toBe('pending');
|
||||
expect(txStatus2).toBe('success');
|
||||
|
||||
// update DB with TX after WS server is sent txid to monitor
|
||||
db.eventEmitter.emit('txUpdate', txId);
|
||||
|
||||
// check for tx update notification
|
||||
const txStatus3 = await txUpdates[2];
|
||||
expect(txStatus3).toBe('pending');
|
||||
expect(txStatus3).toBe('success');
|
||||
|
||||
// unsubscribe from notifications for this tx
|
||||
const unsubscribeResult = await client.call('unsubscribe', subParams1);
|
||||
@@ -272,6 +271,56 @@ describe('websocket notifications', () => {
|
||||
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
|
||||
tx_status: 'success',
|
||||
tx_type: 'token_transfer',
|
||||
stx_received: '100',
|
||||
stx_sent: '150',
|
||||
stx_transfers: [
|
||||
{
|
||||
amount: '100',
|
||||
recipient: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
sender: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
},
|
||||
],
|
||||
tx: {
|
||||
anchor_mode: 'any',
|
||||
block_hash: '0x01',
|
||||
block_height: 1,
|
||||
burn_block_time: 94869286,
|
||||
burn_block_time_iso: '1973-01-03T00:34:46.000Z',
|
||||
canonical: true,
|
||||
event_count: 0,
|
||||
events: [],
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
fee_rate: '50',
|
||||
is_unanchored: false,
|
||||
microblock_canonical: true,
|
||||
microblock_hash: '0x123466',
|
||||
microblock_sequence: 0,
|
||||
nonce: 0,
|
||||
parent_block_hash: '0x123456',
|
||||
parent_burn_block_time: 94869286,
|
||||
parent_burn_block_time_iso: '1973-01-03T00:34:46.000Z',
|
||||
post_condition_mode: 'allow',
|
||||
post_conditions: [],
|
||||
sender_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
sponsored: false,
|
||||
token_transfer: {
|
||||
amount: '100',
|
||||
memo: '0x',
|
||||
recipient_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
},
|
||||
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
|
||||
tx_index: 0,
|
||||
tx_result: {
|
||||
hex: '0x0703',
|
||||
repr: '(ok true)',
|
||||
},
|
||||
tx_status: 'success',
|
||||
tx_type: 'token_transfer',
|
||||
},
|
||||
});
|
||||
|
||||
const microblock = new TestMicroblockStreamBuilder()
|
||||
@@ -295,6 +344,56 @@ describe('websocket notifications', () => {
|
||||
tx_id: '0x8913',
|
||||
tx_status: 'success',
|
||||
tx_type: 'token_transfer',
|
||||
stx_received: '150',
|
||||
stx_sent: '200',
|
||||
stx_transfers: [
|
||||
{
|
||||
amount: '150',
|
||||
recipient: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
sender: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
},
|
||||
],
|
||||
tx: {
|
||||
anchor_mode: 'any',
|
||||
block_hash: '0x123456',
|
||||
block_height: 2,
|
||||
burn_block_time: 94869286,
|
||||
burn_block_time_iso: '1973-01-03T00:34:46.000Z',
|
||||
canonical: true,
|
||||
event_count: 0,
|
||||
events: [],
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
fee_rate: '50',
|
||||
is_unanchored: false,
|
||||
microblock_canonical: true,
|
||||
microblock_hash: '0x11',
|
||||
microblock_sequence: 0,
|
||||
nonce: 0,
|
||||
parent_block_hash: '0x01',
|
||||
parent_burn_block_time: 94869286,
|
||||
parent_burn_block_time_iso: '1973-01-03T00:34:46.000Z',
|
||||
post_condition_mode: 'allow',
|
||||
post_conditions: [],
|
||||
sender_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
sponsored: false,
|
||||
token_transfer: {
|
||||
amount: '150',
|
||||
memo: '0x',
|
||||
recipient_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
},
|
||||
tx_id: '0x8913',
|
||||
tx_index: 0,
|
||||
tx_result: {
|
||||
hex: '0x0703',
|
||||
repr: '(ok true)',
|
||||
},
|
||||
tx_status: 'success',
|
||||
tx_type: 'token_transfer',
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
await client.call('unsubscribe', subParams);
|
||||
@@ -349,6 +448,15 @@ describe('websocket notifications', () => {
|
||||
expect(txUpdate1).toEqual({
|
||||
address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
balance: '100',
|
||||
burnchain_lock_height: 0,
|
||||
burnchain_unlock_height: 0,
|
||||
lock_height: 0,
|
||||
lock_tx_id: '',
|
||||
locked: '0',
|
||||
total_fees_sent: '0',
|
||||
total_miner_rewards_received: '0',
|
||||
total_received: '100',
|
||||
total_sent: '0',
|
||||
});
|
||||
|
||||
const unsubscribeResult = await client.call('unsubscribe', subParams1);
|
||||
@@ -387,6 +495,50 @@ describe('websocket notifications', () => {
|
||||
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
|
||||
tx_status: 'success',
|
||||
tx_type: 'token_transfer',
|
||||
stx_received: '0',
|
||||
stx_sent: '50',
|
||||
stx_transfers: [],
|
||||
tx: {
|
||||
anchor_mode: 'any',
|
||||
block_hash: '0x123456',
|
||||
block_height: 1,
|
||||
burn_block_time: 94869286,
|
||||
burn_block_time_iso: '1973-01-03T00:34:46.000Z',
|
||||
canonical: true,
|
||||
event_count: 0,
|
||||
events: [],
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
fee_rate: '50',
|
||||
is_unanchored: false,
|
||||
microblock_canonical: true,
|
||||
microblock_hash: '0x123466',
|
||||
microblock_sequence: 0,
|
||||
nonce: 0,
|
||||
parent_block_hash: '0x123456',
|
||||
parent_burn_block_time: 94869286,
|
||||
parent_burn_block_time_iso: '1973-01-03T00:34:46.000Z',
|
||||
post_condition_mode: 'allow',
|
||||
post_conditions: [],
|
||||
sender_address: 'ST3GQB6WGCWKDNFNPSQRV8DY93JN06XPZ2ZE9EVMA',
|
||||
sponsored: false,
|
||||
token_transfer: {
|
||||
amount: '100',
|
||||
memo: '0x',
|
||||
recipient_address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
|
||||
},
|
||||
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
|
||||
tx_index: 0,
|
||||
tx_result: {
|
||||
hex: '0x0703',
|
||||
repr: '(ok true)',
|
||||
},
|
||||
tx_status: 'success',
|
||||
tx_type: 'token_transfer',
|
||||
},
|
||||
});
|
||||
await subscription.unsubscribe();
|
||||
} finally {
|
||||
|
||||
Reference in New Issue
Block a user