From d1adca4d5001b7b592fb917e27e1cadceb73d567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Wed, 29 Sep 2021 12:11:49 -0500 Subject: [PATCH] feat: add read-only mode * feat: add readonly vscode launch config and .env var * feat: add pg-listen dependency * feat: first successful messages * chore: modify event emitter types to use only ids * chore: apply changes to ws rpc * chore: reduce size of address tx updates * fix: build tests * chore: move pg-listen logic to separate class * fix: incorrect message forwarding * chore: remove special treatment for test env * fix: rosetta, bns, tokens tests * fix: ws-rpc tests and serialization * fix: block height on ws rpc test * fix: remove manual notifier connection * fix: remove circular dependency on PgNotifier * fix: move away from Map and Set * fix: close pg channels correctly before disconnecting * fix: update socket-io with latest messages * chore: reduce size of token metadata messages * chore: remove logs on readonly task * chore: restore old style on rosetta tests * chore: remove tx_id array from block update message * feat: don't send tx events on address updates * fix: remove logs from devenv docker * chore: remove unused interface * fix: skip db migrations during read-only mode * chore: compound debug for dual reader-writer mode * chore: restore removed line * chore: delete unused docker config * chore: update ts string management * chore: remove readonly npm script * chore: don't use notifier on event replay * chore: make notifications be non-blocking * feat: ignore limit/offset when getting asset transfers for a single block * chore: make limit and offset optional args too * chore: fix websocket unit test * fix: fetch block txs correctly Co-authored-by: Matthew Little --- .env | 4 + .vscode/launch.json | 34 +++++ .vscode/tasks.json | 2 +- package-lock.json | 35 +++++ package.json | 1 + src/api/routes/socket-io.ts | 114 +++++++------- src/api/routes/tx.ts | 4 +- src/api/routes/ws-rpc.ts | 68 ++++++--- src/datastore/common.ts | 24 +-- src/datastore/memory-store.ts | 16 +- src/datastore/postgres-notifier.ts | 106 +++++++++++++ src/datastore/postgres-store.ts | 144 +++++++++++++----- src/event-stream/tokens-contract-handler.ts | 18 ++- src/helpers.ts | 1 + src/index.ts | 100 ++++++------ src/tests-bns/bns-integration-tests.ts | 17 ++- .../validate-rosetta-construction.ts | 2 +- src/tests-rosetta-cli/validate-rosetta.ts | 2 +- src/tests-rosetta/api.ts | 37 +++-- src/tests-tokens/tokens-metadata-tests.ts | 19 ++- src/tests/websocket-tests.ts | 8 +- 21 files changed, 526 insertions(+), 230 deletions(-) create mode 100644 src/datastore/postgres-notifier.ts diff --git a/.env b/.env index 07b21566..5c7a36fe 100644 --- a/.env +++ b/.env @@ -17,6 +17,10 @@ PG_SSL=false # Enable to have stacks-node events streamed to a file while the application is running # STACKS_EXPORT_EVENTS_FILE=/tmp/stacks-events.tsv +# Initializes the API in read-only mode, i.e. without an event server that listens to events from a node and writes them +# to the local database. The API will only read data from the PG database specified above to respond to requests.
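For reference, `src/helpers.ts` further down in this diff derives `isReadOnlyMode` from this variable, and `src/index.ts` gates the event server on it. A minimal sketch of that startup flow follows; the diff references a `parseArgBoolean` helper but does not define it, so its body here is illustrative:

```ts
// Assumed implementation: parseArgBoolean is referenced by this diff but not
// defined in it, so this stand-in is illustrative only.
function parseArgBoolean(val: string | undefined): boolean {
  if (!val) {
    return false;
  }
  return ['true', '1', 'yes', 'on'].includes(val.trim().toLowerCase());
}

const isReadOnlyMode = parseArgBoolean(process.env['STACKS_READ_ONLY_MODE']);

if (isReadOnlyMode) {
  // Read-only instance: serve HTTP requests from the shared Postgres database,
  // skipping migrations, the event server, and the BNS/token-offering imports.
} else {
  // Full writer instance: run migrations and start the event server as before.
}
```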
+# STACKS_READ_ONLY_MODE=true + STACKS_CORE_EVENT_PORT=3700 STACKS_CORE_EVENT_HOST=127.0.0.1 diff --git a/.vscode/launch.json b/.vscode/launch.json index 66daaf4e..e3a22867 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -49,6 +49,34 @@ }, "killBehavior": "polite", }, + { + "type": "node", + "request": "launch", + "name": "Launch: read-only", + "skipFiles": [ + "<node_internals>/**" + ], + "runtimeArgs": [ + "-r", + "ts-node/register/transpile-only", + "-r", + "tsconfig-paths/register" + ], + "args": [ + "${workspaceFolder}/src/index.ts" + ], + "outputCapture": "std", + "internalConsoleOptions": "openOnSessionStart", + "env": { + "STACKS_BLOCKCHAIN_API_PORT": "3998", + "STACKS_READ_ONLY_MODE": "true", + "STACKS_BLOCKCHAIN_API_DB": "pg", + "STACKS_CHAIN_ID": "0x80000000", + "NODE_ENV": "development", + "TS_NODE_SKIP_IGNORE": "true" + }, + "killBehavior": "polite", + }, { "type": "node", "request": "launch", @@ -182,5 +210,11 @@ "cwd": "${workspaceFolder}/docs", "args": ["${workspaceFolder}/docs/scripts/generate-types.ts"] } + ], + "compounds": [ + { + "name": "Launch: mocknet reader-writer", + "configurations": ["Launch: mocknet", "Launch: read-only"] + } ] } diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 3ec0e88b..a77f137d 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -14,7 +14,7 @@ { "label": "stacks-node:deploy-dev", "type": "shell", - "command": "npm run devenv:deploy -- -d && npm run devenv:logs", + "command": "npm run devenv:deploy -- -d", "isBackground": true, "problemMatcher": [ { diff --git a/package-lock.json b/package-lock.json index 6f702525..a2dddfc9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14139,11 +14139,41 @@ "resolved": "https://registry.npmjs.org/pg-cursor/-/pg-cursor-2.6.0.tgz", "integrity": "sha512-BFLg40CTgBJ+LX9EwqjztUYaKxpxLffMmDTmlQNMCustX/JxMTYimxRkdhZvPYZGp++/2LjuqkKtO5DVVq0FNg==" }, + "pg-format": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/pg-format/-/pg-format-1.0.4.tgz", + "integrity": "sha1-J3NCNsKtP05QZJFaWTNOIAQKgo4=" + }, "pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==" }, + "pg-listen": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-listen/-/pg-listen-1.7.0.tgz", + "integrity": "sha512-MKDwKLm4ryhy7iq1yw1K1MvUzBdTkaT16HZToddX9QaT8XSdt3Kins5mYH6DLECGFzFWG09VdXvWOIYogjXrsg==", + "requires": { + "debug": "^4.1.1", + "pg-format": "^1.0.4", + "typed-emitter": "^0.1.0" + }, + "dependencies": { + "debug": { + "version": "4.3.2", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.2.tgz", + "integrity": "sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw==", + "requires": { + "ms": "2.1.2" + } + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + } + } + }, "pg-pool": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.3.0.tgz", @@ -17101,6 +17131,11 @@ "mime-types": "~2.1.24" } }, + "typed-emitter": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/typed-emitter/-/typed-emitter-0.1.0.tgz", + "integrity": "sha512-Tfay0l6gJMP5rkil8CzGbLthukn+9BN/VXWcABVFPjOoelJ+koW8BuPZYk+h/L+lEeIp1fSzVRiWRPIjKVjPdg==" + }, "typedarray": { "version": "0.0.6", "resolved":
"https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz", diff --git a/package.json b/package.json index 6e32abdf..9185d153 100644 --- a/package.json +++ b/package.json @@ -140,6 +140,7 @@ "pg": "^8.2.1", "pg-copy-streams": "^5.1.1", "pg-cursor": "^2.6.0", + "pg-listen": "^1.7.0", "prom-client": "^12.0.0", "rpc-bitcoin": "^2.0.0", "smart-buffer": "^4.1.0", diff --git a/src/api/routes/socket-io.ts b/src/api/routes/socket-io.ts index be62080b..51ab6e11 100644 --- a/src/api/routes/socket-io.ts +++ b/src/api/routes/socket-io.ts @@ -125,93 +125,95 @@ export function createSocketIORouter(db: DataStore, server: http.Server) { logger.info(`[socket.io] socket ${id} left room "${room}"`); }); - db.on('blockUpdate', (dbBlock, txIds, microblocksAccepted, microblocksStreamed) => { + db.on('blockUpdate', async (blockHash, microblocksAccepted, microblocksStreamed) => { // Only parse and emit data if there are currently subscriptions to the blocks topic const blockTopic: Topic = 'block'; if (adapter.rooms.has(blockTopic)) { + const dbBlockQuery = await db.getBlock({ hash: blockHash }); + if (!dbBlockQuery.found) { + return; + } + const dbBlock = dbBlockQuery.result; + let txIds: string[] = []; + const dbTxsQuery = await db.getBlockTxsRows(blockHash); + if (dbTxsQuery.found) { + txIds = dbTxsQuery.result.map(dbTx => dbTx.tx_id); + } const block = parseDbBlock(dbBlock, txIds, microblocksAccepted, microblocksStreamed); prometheus?.sendEvent('block'); io.to(blockTopic).emit('block', block); } }); - db.on('txUpdate', dbTx => { + db.on('txUpdate', async txId => { // Only parse and emit data if there are currently subscriptions to the mempool topic const mempoolTopic: Topic = 'mempool'; if (adapter.rooms.has(mempoolTopic)) { - // only watch for mempool txs - if ('receipt_time' in dbTx) { - // do not send updates for dropped/pruned mempool txs - if (!dbTx.pruned) { - const tx = parseDbMempoolTx(dbTx); - prometheus?.sendEvent('mempool'); - io.to(mempoolTopic).emit('mempool', tx); - } + const dbTxQuery = await db.getMempoolTx({ + txId: txId, + includeUnanchored: true, + includePruned: false, + }); + if (!dbTxQuery.found) { + return; } + const dbMempoolTx = dbTxQuery.result; + const tx = parseDbMempoolTx(dbMempoolTx); + prometheus?.sendEvent('mempool'); + io.to(mempoolTopic).emit('mempool', tx); } }); - db.on('addressUpdate', info => { - // Check for any subscribers to tx updates related to this address - const addrTxTopic: AddressTransactionTopic = `address-transaction:${info.address}` as const; + db.on('addressUpdate', async (address, blockHeight) => { + const addrTxTopic: AddressTransactionTopic = `address-transaction:${address}` as const; + const addrStxBalanceTopic: AddressStxBalanceTopic = `address-stx-balance:${address}` as const; + if (!adapter.rooms.has(addrTxTopic) && !adapter.rooms.has(addrStxBalanceTopic)) { + return; + } + const dbTxsQuery = await db.getAddressTxsWithAssetTransfers({ + stxAddress: address, + blockHeight: blockHeight, + }); + if (dbTxsQuery.total == 0) { + return; + } + const addressTxs = dbTxsQuery.results; + + // Address txs updates if (adapter.rooms.has(addrTxTopic)) { - info.txs.forEach((stxEvents, dbTx) => { - const parsedTx = parseDbTx(dbTx); - let stxSent = 0n; - let stxReceived = 0n; - const stxTransfers: AddressTransactionWithTransfers['stx_transfers'] = []; - Array.from(stxEvents).forEach(event => { - if (event.recipient === info.address) { - stxReceived += event.amount; - } - if (event.sender === info.address) { - stxSent += event.amount; - } - stxTransfers.push({ - 
amount: event.amount.toString(), - sender: event.sender, - recipient: event.recipient, - }); - }); - if (dbTx.sender_address === info.address) { - stxSent += dbTx.fee_rate; - } + addressTxs.forEach(addressTx => { + const parsedTx = parseDbTx(addressTx.tx); const result: AddressTransactionWithTransfers = { tx: parsedTx, - stx_sent: stxSent.toString(), - stx_received: stxReceived.toString(), - stx_transfers: stxTransfers, + stx_sent: addressTx.stx_sent.toString(), + stx_received: addressTx.stx_received.toString(), + stx_transfers: addressTx.stx_transfers.map(value => { + return { + amount: value.amount.toString(), + sender: value.sender, + recipient: value.recipient, + }; + }), }; prometheus?.sendEvent('address-transaction'); - io.to(addrTxTopic).emit('address-transaction', info.address, result); - // TODO: force type until template literal index signatures are supported https://github.com/microsoft/TypeScript/pull/26797 - io.to(addrTxTopic).emit( - (addrTxTopic as unknown) as 'address-transaction', - info.address, - result - ); + io.to(addrTxTopic).emit('address-transaction', address, result); + io.to(addrTxTopic).emit(addrTxTopic, address, result); }); } - // Check for any subscribers to STX balance updates for this address - const addrStxBalanceTopic: AddressStxBalanceTopic = `address-stx-balance:${info.address}` as const; + // Address STX balance updates if (adapter.rooms.has(addrStxBalanceTopic)) { // Get latest balance (in case multiple txs come in from different blocks) - const blockHeights = Array.from(info.txs.keys()).map(tx => tx.block_height); + const blockHeights = addressTxs.map(tx => tx.tx.block_height); const latestBlock = Math.max(...blockHeights); - void getAddressStxBalance(info.address, latestBlock) + void getAddressStxBalance(address, latestBlock) .then(balance => { prometheus?.sendEvent('address-stx-balance'); - io.to(addrStxBalanceTopic).emit('address-stx-balance', info.address, balance); - // TODO: force type until template literal index signatures are supported https://github.com/microsoft/TypeScript/pull/26797 - io.to(addrStxBalanceTopic).emit( - (addrStxBalanceTopic as unknown) as 'address-stx-balance', - info.address, - balance - ); + io.to(addrStxBalanceTopic).emit('address-stx-balance', address, balance); + io.to(addrStxBalanceTopic).emit(addrStxBalanceTopic, address, balance); }) .catch(error => { - logError(`[socket.io] Error querying STX balance update for ${info.address}`, error); + logError(`[socket.io] Error querying STX balance update for ${address}`, error); }); } }); diff --git a/src/api/routes/tx.ts b/src/api/routes/tx.ts index 100ecd2d..caf15b4c 100644 --- a/src/api/routes/tx.ts +++ b/src/api/routes/tx.ts @@ -194,8 +194,8 @@ export function createTxRouter(db: DataStore): RouterWithAsync { }; // EventEmitters don't like being passed Promise functions so wrap the async handler - const onTxUpdate = (txInfo: DbTx | DbMempoolTx): void => { - void dbTxUpdate(txInfo.tx_id); + const onTxUpdate = (txId: string): void => { + void dbTxUpdate(txId); }; const endWaiter = waiter(); diff --git a/src/api/routes/ws-rpc.ts b/src/api/routes/ws-rpc.ts index ec657b63..9976a819 100644 --- a/src/api/routes/ws-rpc.ts +++ b/src/api/routes/ws-rpc.ts @@ -19,7 +19,7 @@ import { RpcTxUpdateNotificationParams, } from '@stacks/stacks-blockchain-api-types'; -import { DataStore, AddressTxUpdateInfo, DbTx, DbMempoolTx } from '../../datastore/common'; +import { DataStore, DbTx, DbMempoolTx } from '../../datastore/common'; import { normalizeHashString, logError, isValidPrincipal } from 
'../../helpers'; import { getTxStatusString, getTxTypeString } from '../controllers/db-controller'; @@ -226,10 +226,26 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket return jsonRpcSuccess(req.payload.id, { address: address }); } - function processTxUpdate(tx: DbTx | DbMempoolTx) { + async function processTxUpdate(txId: string) { try { - const subscribers = txUpdateSubscriptions.subscriptions.get(tx.tx_id); + const subscribers = txUpdateSubscriptions.subscriptions.get(txId); if (subscribers) { + let tx: DbTx | DbMempoolTx; // Tx updates can come from both mempool and mined txs. + const dbMempoolTxQuery = await db.getMempoolTx({ + txId: txId, + includeUnanchored: true, + includePruned: true, + }); + if (dbMempoolTxQuery.found) { + tx = dbMempoolTxQuery.result; + } else { + const dbTxQuery = await db.getTx({ txId: txId, includeUnanchored: true }); + if (dbTxQuery.found) { + tx = dbTxQuery.result; + } else { + return; + } + } const updateNotification: RpcTxUpdateNotificationParams = { tx_id: tx.tx_id, tx_status: getTxStatusString(tx.status), @@ -242,20 +258,28 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket subscribers.forEach(client => client.send(rpcNotificationPayload)); } } catch (error) { - logError(`error sending websocket tx update for ${tx.tx_id}`, error); + logError(`error sending websocket tx update for ${txId}`, error); } } - function processAddressUpdate(addressInfo: AddressTxUpdateInfo) { + async function processAddressUpdate(address: string, blockHeight: number) { try { - const subscribers = addressTxUpdateSubscriptions.subscriptions.get(addressInfo.address); + const subscribers = addressTxUpdateSubscriptions.subscriptions.get(address); if (subscribers) { - Array.from(addressInfo.txs.keys()).forEach(tx => { + const dbTxsQuery = await db.getAddressTxsWithAssetTransfers({ + stxAddress: address, + blockHeight: blockHeight, + }); + if (dbTxsQuery.total == 0) { + return; + } + const addressTxs = dbTxsQuery.results; + addressTxs.forEach(tx => { const updateNotification: RpcAddressTxNotificationParams = { - address: addressInfo.address, - tx_id: tx.tx_id, - tx_status: getTxStatusString(tx.status), - tx_type: getTxTypeString(tx.type_id), + address: address, + tx_id: tx.tx.tx_id, + tx_status: getTxStatusString(tx.tx.status), + tx_type: getTxTypeString(tx.tx.type_id), }; const rpcNotificationPayload = jsonRpcNotification( 'address_tx_update', @@ -265,24 +289,24 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket }); } } catch (error) { - logError(`error sending websocket address tx updates to ${addressInfo.address}`, error); + logError(`error sending websocket address tx updates to ${address}`, error); } } // Queue to process balance update notifications const addrBalanceProcessorQueue = new PQueue({ concurrency: 1 }); - function processAddressBalanceUpdate(addressInfo: AddressTxUpdateInfo) { - const subscribers = addressBalanceUpdateSubscriptions.subscriptions.get(addressInfo.address); + function processAddressBalanceUpdate(address: string) { + const subscribers = addressBalanceUpdateSubscriptions.subscriptions.get(address); if (subscribers) { void addrBalanceProcessorQueue.add(async () => { try { const balance = await db.getStxBalance({ - stxAddress: addressInfo.address, + stxAddress: address, includeUnanchored: true, }); const balanceNotification: RpcAddressBalanceNotificationParams = { - address: addressInfo.address, + address: address, balance: balance.balance.toString(), }; const 
rpcNotificationPayload = jsonRpcNotification( @@ -291,19 +315,19 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket ).serialize(); subscribers.forEach(client => client.send(rpcNotificationPayload)); } catch (error) { - logError(`error sending websocket stx balance update to ${addressInfo.address}`, error); + logError(`error sending websocket stx balance update to ${address}`, error); } }); } } - db.addListener('txUpdate', txInfo => { - void processTxUpdate(txInfo); + db.addListener('txUpdate', async txId => { + await processTxUpdate(txId); }); - db.addListener('addressUpdate', addressInfo => { - void processAddressUpdate(addressInfo); - void processAddressBalanceUpdate(addressInfo); + db.addListener('addressUpdate', async (address, blockHeight) => { + await processAddressUpdate(address, blockHeight); + processAddressBalanceUpdate(address); }); wsServer.on('connection', (clientSocket, req) => { diff --git a/src/datastore/common.ts b/src/datastore/common.ts index 5927c221..5cd995d4 100644 --- a/src/datastore/common.ts +++ b/src/datastore/common.ts @@ -340,11 +340,6 @@ export interface DbTxWithAssetTransfers { }[]; } -export interface AddressTxUpdateInfo { - address: string; - txs: Map>; -} - export interface AddressNftEventIdentifier { sender: string; recipient: string; @@ -354,20 +349,25 @@ export interface AddressNftEventIdentifier { tx_id: Buffer; } +export interface TokenMetadataUpdateInfo { + queueId: number; + txId: string; + contractId: string; +} + export type DataStoreEventEmitter = StrictEventEmitter< EventEmitter, { - txUpdate: (info: DbTx | DbMempoolTx) => void; + txUpdate: (txId: string) => void; blockUpdate: ( - block: DbBlock, - txIds: string[], + blockHash: string, microblocksAccepted: string[], microblocksStreamed: string[] ) => void; - addressUpdate: (info: AddressTxUpdateInfo) => void; + addressUpdate: (address: string, blockHeight: number) => void; nameUpdate: (info: string) => void; tokensUpdate: (contractID: string) => void; - tokenMetadataUpdateQueued: (entry: DbTokenMetadataQueueEntry) => void; + tokenMetadataUpdateQueued: (entry: TokenMetadataUpdateInfo) => void; } >; @@ -717,8 +717,8 @@ export interface DataStore extends DataStoreEventEmitter { getAddressTxsWithAssetTransfers( args: { stxAddress: string; - limit: number; - offset: number; + limit?: number; + offset?: number; } & ({ blockHeight: number } | { includeUnanchored: boolean }) ): Promise<{ results: DbTxWithAssetTransfers[]; total: number }>; diff --git a/src/datastore/memory-store.ts b/src/datastore/memory-store.ts index 714360f9..b2f7e4a4 100644 --- a/src/datastore/memory-store.ts +++ b/src/datastore/memory-store.ts @@ -100,13 +100,9 @@ export class MemoryDataStore await this.updateSmartContract(entry.tx, smartContract); } } - const txIdList = data.txs - .map(({ tx }) => ({ txId: tx.tx_id, txIndex: tx.tx_index })) - .sort((a, b) => a.txIndex - b.txIndex) - .map(tx => tx.txId); - this.emit('blockUpdate', data.block, txIdList, [], []); + this.emit('blockUpdate', data.block.block_hash, [], []); data.txs.forEach(entry => { - this.emit('txUpdate', entry.tx); + this.emit('txUpdate', entry.tx.tx_id); }); } @@ -311,7 +307,7 @@ export class MemoryDataStore updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTx[] }): Promise { txs.forEach(tx => { this.txMempool.set(tx.tx_id, tx); - this.emit('txUpdate', tx); + this.emit('txUpdate', tx.tx_id); }); return Promise.resolve(); } @@ -322,7 +318,7 @@ export class MemoryDataStore if (tx) { tx.status = args.status; 
this.txMempool.set(txId, tx); - this.emit('txUpdate', tx); + this.emit('txUpdate', tx.tx_id); } }); return Promise.resolve(); @@ -526,8 +522,8 @@ export class MemoryDataStore getAddressTxsWithAssetTransfers(args: { stxAddress: string; - limit: number; - offset: number; + limit?: number; + offset?: number; blockHeight?: number; }): Promise<{ results: DbTxWithAssetTransfers[]; total: number }> { throw new Error('not yet implemented'); diff --git a/src/datastore/postgres-notifier.ts b/src/datastore/postgres-notifier.ts new file mode 100644 index 00000000..8094d38d --- /dev/null +++ b/src/datastore/postgres-notifier.ts @@ -0,0 +1,106 @@ +import { ClientConfig } from 'pg'; +import createPostgresSubscriber, { Subscriber } from 'pg-listen'; +import { logError } from '../helpers'; +import { DbTokenMetadataQueueEntry } from './common'; + +export type PgTxNotificationPayload = { + txId: string; +}; + +export type PgBlockNotificationPayload = { + blockHash: string; + microblocksAccepted: string[]; + microblocksStreamed: string[]; +}; + +export type PgAddressNotificationPayload = { + address: string; + blockHeight: number; +}; + +export type PgTokenMetadataNotificationPayload = { + entry: DbTokenMetadataQueueEntry; +}; + +export type PgNameNotificationPayload = { + nameInfo: string; +}; + +export type PgTokensNotificationPayload = { + contractID: string; +}; + +export type PgNotificationPayload = + | PgBlockNotificationPayload + | PgTxNotificationPayload + | PgAddressNotificationPayload + | PgTokenMetadataNotificationPayload + | PgNameNotificationPayload + | PgTokensNotificationPayload; + +export type PgNotification = { + type: string; + payload: PgNotificationPayload; +}; + +export type PgNotificationCallback = (notification: PgNotification) => void; + +/** + * Creates and connects to a channel between the API and the Postgres DB to receive table update notifications + * using LISTEN/NOTIFY messages. 
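The `PgNotifier` class below wraps the `pg-listen` subscriber added to `package.json` earlier in this diff. For orientation, a minimal sketch of the raw `pg-listen` flow it builds on; the connection string and channel name are placeholders:

```ts
import createPostgresSubscriber from 'pg-listen';

async function demo(): Promise<void> {
  // The connection string is a placeholder.
  const subscriber = createPostgresSubscriber({
    connectionString: 'postgres://postgres:postgres@127.0.0.1:5432/stacks_blockchain_api',
  });
  // Every process listening on the channel receives payloads sent via notify().
  subscriber.notifications.on('pg-notifier', payload => {
    console.log('received:', payload);
  });
  subscriber.events.on('error', error => console.error('fatal pg-listen error', error));
  await subscriber.connect();
  await subscriber.listenTo('pg-notifier');
  // Here the NOTIFY goes over the same connection; in this diff the writer and
  // the read-only instances hold separate connections to the same database.
  await subscriber.notify('pg-notifier', { hello: 'world' });
}
```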
+ * https://www.postgresql.org/docs/12/sql-notify.html + */ +export class PgNotifier { + readonly pgChannelName: string = 'pg-notifier'; + subscriber: Subscriber; + + constructor(clientConfig: ClientConfig) { + this.subscriber = createPostgresSubscriber(clientConfig); + } + + public async connect(eventCallback: PgNotificationCallback) { + this.subscriber.notifications.on(this.pgChannelName, message => + eventCallback(message.notification) + ); + this.subscriber.events.on('error', error => logError('Fatal PgNotifier error', error)); + await this.subscriber.connect(); + await this.subscriber.listenTo(this.pgChannelName); + } + + public sendBlock(payload: PgBlockNotificationPayload) { + this.notify({ type: 'blockUpdate', payload: payload }); + } + + public sendTx(payload: PgTxNotificationPayload) { + this.notify({ type: 'txUpdate', payload: payload }); + } + + public sendAddress(payload: PgAddressNotificationPayload) { + this.notify({ type: 'addressUpdate', payload: payload }); + } + + public sendName(payload: PgNameNotificationPayload) { + this.notify({ type: 'nameUpdate', payload: payload }); + } + + public sendTokenMetadata(payload: PgTokenMetadataNotificationPayload) { + this.notify({ type: 'tokenMetadataUpdateQueued', payload: payload }); + } + + public sendTokens(payload: PgTokensNotificationPayload) { + this.notify({ type: 'tokensUpdate', payload: payload }); + } + + public async close() { + await this.subscriber.unlisten(this.pgChannelName); + await this.subscriber.close(); + } + + private notify(notification: PgNotification) { + void this.subscriber + .notify(this.pgChannelName, { notification: notification }) + .catch(error => + logError(`Error sending PgNotifier notification of type: ${notification.type}`, error) + ); + } +} diff --git a/src/datastore/postgres-store.ts b/src/datastore/postgres-store.ts index 15d0fc1a..4e953fc1 100644 --- a/src/datastore/postgres-store.ts +++ b/src/datastore/postgres-store.ts @@ -35,6 +35,7 @@ import { distinctBy, unwrapOptional, pipelineAsync, + isProdEnv, has0xPrefix, } from '../helpers'; import { @@ -94,6 +95,15 @@ import { getTxTypeId } from '../api/controllers/db-controller'; import { isProcessableTokenMetadata } from '../event-stream/tokens-contract-handler'; import { ClarityAbi } from '@stacks/transactions'; +import { + PgAddressNotificationPayload, + PgBlockNotificationPayload, + PgNameNotificationPayload, + PgNotifier, + PgTokenMetadataNotificationPayload, + PgTokensNotificationPayload, + PgTxNotificationPayload, +} from './postgres-notifier'; const MIGRATIONS_TABLE = 'pgmigrations'; const MIGRATIONS_DIR = path.join(APP_DIR, 'migrations'); @@ -570,10 +580,52 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitter }) implements DataStore { readonly pool: Pool; - private constructor(pool: Pool) { + readonly notifier?: PgNotifier; + private constructor(pool: Pool, notifier: PgNotifier | undefined = undefined) { // eslint-disable-next-line constructor-super super(); this.pool = pool; + this.notifier = notifier; + } + + /** + * Connects to the `PgNotifier`. Its messages will be forwarded to the rest of the API components + * through the EventEmitter.
+ */ + async connectPgNotifier() { + await this.notifier?.connect(notification => { + switch (notification.type) { + case 'blockUpdate': + const block = notification.payload as PgBlockNotificationPayload; + this.emit( + 'blockUpdate', + block.blockHash, + block.microblocksAccepted, + block.microblocksStreamed + ); + break; + case 'txUpdate': + const tx = notification.payload as PgTxNotificationPayload; + this.emit('txUpdate', tx.txId); + break; + case 'addressUpdate': + const address = notification.payload as PgAddressNotificationPayload; + this.emit('addressUpdate', address.address, address.blockHeight); + break; + case 'tokensUpdate': + const tokens = notification.payload as PgTokensNotificationPayload; + this.emit('tokensUpdate', tokens.contractID); + break; + case 'nameUpdate': + const name = notification.payload as PgNameNotificationPayload; + this.emit('nameUpdate', name.nameInfo); + break; + case 'tokenMetadataUpdateQueued': + const metadata = notification.payload as PgTokenMetadataNotificationPayload; + this.emit('tokenMetadataUpdateQueued', metadata.entry); + break; + } + }); } /** @@ -1177,18 +1229,21 @@ export class PgDataStore // TODO(mb): look up microblocks streamed off this block that were accepted by the next anchor block const microblocksStreamed: string[] = []; - // TODO(mb): replace `data.txs` with a list of all updated DbTx values (including orphaned microblock-txs, updated microblock-txs, batched-txs) - const txIdList = data.txs - .map(({ tx }) => ({ txId: tx.tx_id, txIndex: tx.tx_index })) - .sort((a, b) => a.txIndex - b.txIndex) - .map(tx => tx.txId); - this.emit('blockUpdate', data.block, txIdList, microblocksAccepted, microblocksStreamed); - data.txs.forEach(entry => { - this.emit('txUpdate', entry.tx); - }); - this.emitAddressTxUpdates(data); - for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) { - this.emit('tokenMetadataUpdateQueued', tokenMetadataQueueEntry); + // Skip sending `PgNotifier` updates altogether if we're in the genesis block, since this block is the + // event replay of the v1 blockchain. + if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) { + this.notifier?.sendBlock({ + blockHash: data.block.block_hash, + microblocksAccepted: microblocksAccepted, + microblocksStreamed: microblocksStreamed, + }); + data.txs.forEach(entry => { + this.notifier?.sendTx({ txId: entry.tx.tx_id }); + }); + this.emitAddressTxUpdates(data); + for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) { + this.notifier?.sendTokenMetadata({ entry: tokenMetadataQueueEntry }); + } } } @@ -1702,7 +1757,7 @@ export class PgDataStore [zonefile, validZonefileHash] ); }); - this.emit('nameUpdate', tx_id); + this.notifier?.sendName({ nameInfo: tx_id }); } private validateZonefileHash(zonefileHash: string) { @@ -1732,17 +1787,12 @@ export class PgDataStore emitAddressTxUpdates(data: DataStoreBlockUpdateData) { // Record all addresses that had an associated tx.
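With only identifiers on the wire, each consumer re-queries Postgres before emitting anything to clients. A sketch of the resulting listener contract, assuming a connected `db: PgDataStore`; this mirrors what `socket-io.ts` and `ws-rpc.ts` do earlier in the diff:

```ts
function wireListeners(db: PgDataStore): void {
  db.on('blockUpdate', async blockHash => {
    // Events carry the block hash only; fetch the full row on demand.
    const blockQuery = await db.getBlock({ hash: blockHash });
    if (!blockQuery.found) {
      return;
    }
    console.log(`block ${blockHash} at height ${blockQuery.result.block_height}`);
  });
  db.on('txUpdate', async txId => {
    const txQuery = await db.getTx({ txId: txId, includeUnanchored: true });
    if (txQuery.found) {
      console.log(`tx ${txId} updated to status ${txQuery.result.status}`);
    }
  });
}
```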
- // Key = address, value = set of TxIds - const addressTxUpdates = new Map>>(); + const addressTxUpdates = new Map(); data.txs.forEach(entry => { const tx = entry.tx; - const addAddressTx = (addr: string | undefined, stxEvent?: DbStxEvent) => { + const addAddressTx = (addr: string | undefined) => { if (addr) { - const addrTxs = getOrAdd(addressTxUpdates, addr, () => new Map>()); - const txEvents = getOrAdd(addrTxs, tx, () => new Set()); - if (stxEvent !== undefined) { - txEvents.add(stxEvent); - } + getOrAdd(addressTxUpdates, addr, () => tx.block_height); } }; addAddressTx(tx.sender_address); @@ -1750,8 +1800,8 @@ export class PgDataStore addAddressTx(event.locked_address); }); entry.stxEvents.forEach(event => { - addAddressTx(event.sender, event); - addAddressTx(event.recipient, event); + addAddressTx(event.sender); + addAddressTx(event.recipient); }); entry.ftEvents.forEach(event => { addAddressTx(event.sender); @@ -1776,10 +1826,10 @@ export class PgDataStore break; } }); - addressTxUpdates.forEach((txs, address) => { - this.emit('addressUpdate', { - address, - txs, + addressTxUpdates.forEach((blockHeight, address) => { + this.notifier?.sendAddress({ + address: address, + blockHeight: blockHeight, }); }); } @@ -2286,7 +2336,7 @@ export class PgDataStore logger.verbose(`Entities marked as non-canonical: ${markedNonCanonical}`); } - static async connect(skipMigrations = false): Promise { + static async connect(skipMigrations = false, withNotifier = true): Promise { const clientConfig = getPgClientConfig(); const initTimer = stopwatch(); @@ -2341,7 +2391,13 @@ export class PgDataStore let poolClient: PoolClient | undefined; try { poolClient = await pool.connect(); - return new PgDataStore(pool); + if (!withNotifier) { + return new PgDataStore(pool); + } + const notifier = new PgNotifier(clientConfig); + const store = new PgDataStore(pool, notifier); + await store.connectPgNotifier(); + return store; } catch (error) { logError( `Error connecting to Postgres using ${JSON.stringify(clientConfig)}: ${error}`, @@ -3087,7 +3143,7 @@ export class PgDataStore } }); for (const tx of updatedTxs) { - this.emit('txUpdate', tx); + this.notifier?.sendTx({ txId: tx.tx_id }); } } @@ -3107,7 +3163,7 @@ export class PgDataStore updatedTxs = updateResults.rows.map(r => this.parseMempoolTxQueryResult(r)); }); for (const tx of updatedTxs) { - this.emit('txUpdate', tx); + this.notifier?.sendTx({ txId: tx.tx_id }); } } @@ -4807,14 +4863,16 @@ export class PgDataStore async getAddressTxsWithAssetTransfers( args: { stxAddress: string; - limit: number; - offset: number; + limit?: number; + offset?: number; } & ({ blockHeight: number } | { includeUnanchored: boolean }) ): Promise<{ results: DbTxWithAssetTransfers[]; total: number }> { return this.queryTx(async client => { let atSingleBlock: boolean; - const queryParams: (string | number)[] = [args.stxAddress, args.limit, args.offset]; + const queryParams: (string | number)[] = [args.stxAddress]; if ('blockHeight' in args) { + // Single block mode ignores `limit` and `offset` arguments so we can retrieve all + // address events for that address in that block. atSingleBlock = true; queryParams.push(args.blockHeight); } else { @@ -4822,6 +4880,8 @@ export class PgDataStore includeUnanchored: args.includeUnanchored, }); atSingleBlock = false; + queryParams.push(args.limit ?? 20); + queryParams.push(args.offset ?? 
0); queryParams.push(blockHeight); } // Use a JOIN to include stx_events associated with the address's txs @@ -4861,10 +4921,9 @@ export class PgDataStore ) SELECT ${TX_COLUMNS}, (COUNT(*) OVER())::integer as count FROM principal_txs - ${atSingleBlock ? 'WHERE block_height = $4' : 'WHERE block_height <= $4'} + ${atSingleBlock ? 'WHERE block_height = $2' : 'WHERE block_height <= $4'} ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC - LIMIT $2 - OFFSET $3 + ${!atSingleBlock ? 'LIMIT $2 OFFSET $3' : ''} ), events AS ( SELECT tx_id, sender, recipient, event_index, amount, @@ -6069,7 +6128,7 @@ export class PgDataStore ` SELECT locked_amount, unlock_height, locked_address FROM stx_lock_events - WHERE microblock_canonical = true AND canonical = true + WHERE microblock_canonical = true AND canonical = true AND unlock_height <= $1 AND unlock_height > $2 `, [current_burn_height, previous_burn_height] @@ -6081,7 +6140,7 @@ export class PgDataStore ` SELECT tx_id FROM txs - WHERE microblock_canonical = true AND canonical = true + WHERE microblock_canonical = true AND canonical = true AND block_height = $1 AND type_id = $2 LIMIT 1 `, @@ -6228,7 +6287,7 @@ export class PgDataStore ); return result.rowCount; }); - this.emit('tokensUpdate', contract_id); + this.notifier?.sendTokens({ contractID: contract_id }); return rowCount; } @@ -6274,7 +6333,7 @@ export class PgDataStore ); return result.rowCount; }); - this.emit('tokensUpdate', contract_id); + this.notifier?.sendTokens({ contractID: contract_id }); return rowCount; } @@ -6361,6 +6420,7 @@ export class PgDataStore } async close(): Promise { + await this.notifier?.close(); await this.pool.end(); } } diff --git a/src/event-stream/tokens-contract-handler.ts b/src/event-stream/tokens-contract-handler.ts index b34d3b3d..b556cf4d 100644 --- a/src/event-stream/tokens-contract-handler.ts +++ b/src/event-stream/tokens-contract-handler.ts @@ -4,6 +4,7 @@ import { DbFungibleTokenMetadata, DbNonFungibleTokenMetadata, DbTokenMetadataQueueEntry, + TokenMetadataUpdateInfo, } from '../datastore/common'; import { callReadOnlyFunction, @@ -721,9 +722,9 @@ export class TokensProcessorQueue { }> = new Evt(); /** The entries currently queued for processing in memory, keyed by the queue entry db id. 
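Returning to the `getAddressTxsWithAssetTransfers` change above: `limit` and `offset` are now optional (defaulting to 20 and 0) and are ignored entirely in single-block mode. A sketch of the two calling modes; the address is a placeholder:

```ts
async function fetchAddressTransfers(db: PgDataStore): Promise<void> {
  const stxAddress = 'ST3AM1A56AK2C1XAFJ4115ZSV26EB49BVQ10MGCS0'; // placeholder

  // Single-block mode: limit/offset are ignored so address update
  // notifications can cover every tx for the address in that block.
  const atBlock = await db.getAddressTxsWithAssetTransfers({
    stxAddress: stxAddress,
    blockHeight: 100,
  });

  // Paginated mode: limit/offset apply, with defaults when omitted.
  const paged = await db.getAddressTxsWithAssetTransfers({
    stxAddress: stxAddress,
    includeUnanchored: false,
    limit: 50,
    offset: 0,
  });
  console.log(atBlock.total, paged.total);
}
```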
*/ - readonly queuedEntries: Map = new Map(); + readonly queuedEntries: Map = new Map(); - readonly onTokenMetadataUpdateQueued: (entry: DbTokenMetadataQueueEntry) => void; + readonly onTokenMetadataUpdateQueued: (entry: TokenMetadataUpdateInfo) => void; constructor(db: DataStore, chainId: ChainID) { this.db = db; @@ -751,7 +752,7 @@ export class TokensProcessorQueue { queuedEntries ); for (const entry of entries) { - this.queueHandler(entry); + await this.queueHandler(entry); } await this.queue.onEmpty(); // await this.queue.onIdle(); @@ -770,26 +771,31 @@ export class TokensProcessorQueue { queuedEntries ); for (const entry of entries) { - this.queueHandler(entry); + await this.queueHandler(entry); } } } - queueHandler(queueEntry: DbTokenMetadataQueueEntry) { + async queueHandler(queueEntry: TokenMetadataUpdateInfo) { if ( this.queuedEntries.has(queueEntry.queueId) || this.queuedEntries.size >= this.queue.concurrency ) { return; } + const contractQuery = await this.db.getSmartContract(queueEntry.contractId); + if (!contractQuery.found) { + return; + } logger.info( `[token-metadata] queueing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}` ); this.queuedEntries.set(queueEntry.queueId, queueEntry); + const contractAbi: ClarityAbi = JSON.parse(contractQuery.result.abi); const tokenContractHandler = new TokensContractHandler({ contractId: queueEntry.contractId, - smartContractAbi: queueEntry.contractAbi, + smartContractAbi: contractAbi, datastore: this.db, chainId: this.chainId, txId: queueEntry.txId, diff --git a/src/helpers.ts b/src/helpers.ts index da175219..a68b4f5b 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -24,6 +24,7 @@ export const isProdEnv = process.env.NODE_ENV === 'prod' || !process.env.NODE_ENV || (!isTestEnv && !isDevEnv); +export const isReadOnlyMode = parseArgBoolean(process.env['STACKS_READ_ONLY_MODE']); export const APP_DIR = __dirname; export const REPO_DIR = path.dirname(__dirname); diff --git a/src/index.ts b/src/index.ts index 3a57417a..ae1d73a9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,6 +6,7 @@ import { isProdEnv, numberToHex, httpPostRequest, + isReadOnlyMode, } from './helpers'; import * as sourceMapSupport from 'source-map-support'; import { DataStore } from './datastore/common'; @@ -91,7 +92,8 @@ async function init(): Promise { } case 'pg': case undefined: { - db = await PgDataStore.connect(); + const skipMigrations = isReadOnlyMode; + db = await PgDataStore.connect(skipMigrations); break; } default: { @@ -101,56 +103,58 @@ async function init(): Promise { } } - if (db instanceof PgDataStore) { - if (isProdEnv) { - await importV1TokenOfferingData(db); - } else { - logger.warn( - `Notice: skipping token offering data import because of non-production NODE_ENV` - ); + if (!isReadOnlyMode) { + if (db instanceof PgDataStore) { + if (isProdEnv) { + await importV1TokenOfferingData(db); + } else { + logger.warn( + `Notice: skipping token offering data import because of non-production NODE_ENV` + ); + } + if (isProdEnv && !process.env.BNS_IMPORT_DIR) { + logger.warn(`Notice: full BNS functionality requires 'BNS_IMPORT_DIR' to be set.`); + } else if (process.env.BNS_IMPORT_DIR) { + await importV1BnsData(db, process.env.BNS_IMPORT_DIR); + } } - if (isProdEnv && !process.env.BNS_IMPORT_DIR) { - logger.warn(`Notice: full BNS functionality requires 'BNS_IMPORT_DIR' to be set.`); - } else if (process.env.BNS_IMPORT_DIR) { - await importV1BnsData(db, process.env.BNS_IMPORT_DIR); - } - } - const configuredChainID = 
getConfiguredChainID(); + const configuredChainID = getConfiguredChainID(); - const eventServer = await startEventServer({ - datastore: db, - chainId: configuredChainID, - }); - registerShutdownConfig({ - name: 'Event Server', - handler: () => eventServer.closeAsync(), - forceKillable: false, - }); - - const networkChainId = await getCoreChainID(); - if (networkChainId !== configuredChainID) { - const chainIdConfig = numberToHex(configuredChainID); - const chainIdNode = numberToHex(networkChainId); - const error = new Error( - `The configured STACKS_CHAIN_ID does not match, configured: ${chainIdConfig}, stacks-node: ${chainIdNode}` - ); - logError(error.message, error); - throw error; - } - monitorCoreRpcConnection().catch(error => { - logger.error(`Error monitoring RPC connection: ${error}`, error); - }); - - if (isFtMetadataEnabled() || isNftMetadataEnabled()) { - const tokenMetadataProcessor = new TokensProcessorQueue(db, configuredChainID); - registerShutdownConfig({ - name: 'Token Metadata Processor', - handler: () => tokenMetadataProcessor.close(), - forceKillable: true, + const eventServer = await startEventServer({ + datastore: db, + chainId: configuredChainID, }); - // check if db has any non-processed token queues and await them all here - await tokenMetadataProcessor.drainDbQueue(); + registerShutdownConfig({ + name: 'Event Server', + handler: () => eventServer.closeAsync(), + forceKillable: false, + }); + + const networkChainId = await getCoreChainID(); + if (networkChainId !== configuredChainID) { + const chainIdConfig = numberToHex(configuredChainID); + const chainIdNode = numberToHex(networkChainId); + const error = new Error( + `The configured STACKS_CHAIN_ID does not match, configured: ${chainIdConfig}, stacks-node: ${chainIdNode}` + ); + logError(error.message, error); + throw error; + } + monitorCoreRpcConnection().catch(error => { + logger.error(`Error monitoring RPC connection: ${error}`, error); + }); + + if (isFtMetadataEnabled() || isNftMetadataEnabled()) { + const tokenMetadataProcessor = new TokensProcessorQueue(db, configuredChainID); + registerShutdownConfig({ + name: 'Token Metadata Processor', + handler: () => tokenMetadataProcessor.close(), + forceKillable: true, + }); + // check if db has any non-processed token queues and await them all here + await tokenMetadataProcessor.drainDbQueue(); + } } } @@ -270,7 +274,7 @@ async function handleProgramArgs() { // or the `--force` option can be used. 
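For clarity, the two `PgDataStore.connect` knobs this diff introduces, as exercised by `init()` above and the event-import path below:

```ts
async function connectModes(): Promise<void> {
  // Read-only API instance: skip migrations (the writer owns the schema) but
  // keep the default notifier so this instance can LISTEN for updates.
  const readerDb = await PgDataStore.connect(/* skipMigrations */ true);

  // Event-replay instance: migrations were just cycled and notifications are
  // intentionally suppressed, so the notifier is disabled.
  const replayDb = await PgDataStore.connect(true, /* withNotifier */ false);

  await readerDb.close();
  await replayDb.close();
}
```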
await cycleMigrations({ dangerousAllowDataLoss: true }); - const db = await PgDataStore.connect(true); + const db = await PgDataStore.connect(true, false); const eventServer = await startEventServer({ datastore: db, chainId: getConfiguredChainID(), diff --git a/src/tests-bns/bns-integration-tests.ts b/src/tests-bns/bns-integration-tests.ts index d6fa004c..07a368cd 100644 --- a/src/tests-bns/bns-integration-tests.ts +++ b/src/tests-bns/bns-integration-tests.ts @@ -62,15 +62,20 @@ describe('BNS integration tests', () => { function standByForTx(expectedTxId: string): Promise { const broadcastTx = new Promise(resolve => { - const listener: (info: DbTx | DbMempoolTx) => void = info => { + const listener: (txId: string) => void = async txId => { + const dbTxQuery = await api.datastore.getTx({ txId: txId, includeUnanchored: true }); + if (!dbTxQuery.found) { + return; + } + const dbTx = dbTxQuery.result as DbTx; if ( - info.tx_id === expectedTxId && - (info.status === DbTxStatus.Success || - info.status === DbTxStatus.AbortByResponse || - info.status === DbTxStatus.AbortByPostCondition) + dbTx.tx_id === expectedTxId && + (dbTx.status === DbTxStatus.Success || + dbTx.status === DbTxStatus.AbortByResponse || + dbTx.status === DbTxStatus.AbortByPostCondition) ) { api.datastore.removeListener('txUpdate', listener); - resolve(info as DbTx); + resolve(dbTx); } }; api.datastore.addListener('txUpdate', listener); diff --git a/src/tests-rosetta-cli/validate-rosetta-construction.ts b/src/tests-rosetta-cli/validate-rosetta-construction.ts index 3ef7b8d3..40c147d8 100644 --- a/src/tests-rosetta-cli/validate-rosetta-construction.ts +++ b/src/tests-rosetta-cli/validate-rosetta-construction.ts @@ -277,7 +277,7 @@ function uniqueId() { } async function waitForBlock(api: ApiServer) { - await new Promise(resolve => api.datastore.once('blockUpdate', block => resolve(block))); + await new Promise(resolve => api.datastore.once('blockUpdate', blockHash => resolve(blockHash))); } function sleep(ms: number) { diff --git a/src/tests-rosetta-cli/validate-rosetta.ts b/src/tests-rosetta-cli/validate-rosetta.ts index 44475257..65d5e2d8 100644 --- a/src/tests-rosetta-cli/validate-rosetta.ts +++ b/src/tests-rosetta-cli/validate-rosetta.ts @@ -330,7 +330,7 @@ function uniqueId() { } async function waitForBlock(api: ApiServer) { - await new Promise(resolve => api.datastore.once('blockUpdate', block => resolve(block))); + await new Promise(resolve => api.datastore.once('blockUpdate', blockHash => resolve(blockHash))); } function sleep(ms: number) { diff --git a/src/tests-rosetta/api.ts b/src/tests-rosetta/api.ts index 2938248c..6c0bbc62 100644 --- a/src/tests-rosetta/api.ts +++ b/src/tests-rosetta/api.ts @@ -82,15 +82,20 @@ describe('Rosetta API', () => { function standByForTx(expectedTxId: string): Promise { const broadcastTx = new Promise(resolve => { - const listener: (info: DbTx | DbMempoolTx) => void = info => { + const listener: (txId: string) => void = async txId => { + const dbTxQuery = await api.datastore.getTx({ txId: txId, includeUnanchored: true }); + if (!dbTxQuery.found) { + return; + } + const dbTx = dbTxQuery.result as DbTx; if ( - info.tx_id === expectedTxId && - (info.status === DbTxStatus.Success || - info.status === DbTxStatus.AbortByResponse || - info.status === DbTxStatus.AbortByPostCondition) + dbTx.tx_id === expectedTxId && + (dbTx.status === DbTxStatus.Success || + dbTx.status === DbTxStatus.AbortByResponse || + dbTx.status === DbTxStatus.AbortByPostCondition) ) { api.datastore.removeListener('txUpdate', 
listener); - resolve(info as DbTx); + resolve(dbTx); } }; api.datastore.addListener('txUpdate', listener); @@ -196,10 +201,10 @@ describe('Rosetta API', () => { test('network/status', async () => { // skip first a block (so we are at least N+1 blocks) - await new Promise(resolve => + await new Promise(resolve => api.datastore.once('blockUpdate', block => resolve(block)) ); - const block = await new Promise(resolve => + const blockHash = await new Promise(resolve => api.datastore.once('blockUpdate', block => resolve(block)) ); const genesisBlock = await api.datastore.getBlock({ height: 1 }); @@ -209,11 +214,14 @@ describe('Rosetta API', () => { .send({ network_identifier: { blockchain: 'stacks', network: 'testnet' } }); expect(query1.status).toBe(200); expect(query1.type).toBe('application/json'); + const blockQuery = await api.datastore.getBlock({ hash: blockHash }); + assert(blockQuery.found); + const block = blockQuery.result; const expectResponse = { current_block_identifier: { index: block.block_height, - hash: block.block_hash, + hash: blockHash, }, current_block_timestamp: block.burn_block_time * 1000, genesis_block_identifier: { @@ -356,10 +364,15 @@ describe('Rosetta API', () => { test('block/transaction', async () => { let expectedTxId: string = ''; const broadcastTx = new Promise(resolve => { - const listener: (info: DbTx | DbMempoolTx) => void = info => { - if (info.tx_id === expectedTxId && info.status === DbTxStatus.Success) { + const listener: (txId: string) => void = async txId => { + const dbTxQuery = await api.datastore.getTx({ txId: txId, includeUnanchored: false }); + if (!dbTxQuery.found) { + return; + } + const dbTx = dbTxQuery.result as DbTx; + if (dbTx.tx_id === expectedTxId && dbTx.status === DbTxStatus.Success) { api.datastore.removeListener('txUpdate', listener); - resolve(info as DbTx); + resolve(dbTx); } }; api.datastore.addListener('txUpdate', listener); diff --git a/src/tests-tokens/tokens-metadata-tests.ts b/src/tests-tokens/tokens-metadata-tests.ts index c72c223f..6b9ca6a8 100644 --- a/src/tests-tokens/tokens-metadata-tests.ts +++ b/src/tests-tokens/tokens-metadata-tests.ts @@ -37,16 +37,21 @@ describe('api tests', () => { let tokensProcessorQueue: TokensProcessorQueue; function standByForTx(expectedTxId: string): Promise { - const broadcastTx = new Promise(resolve => { - const listener: (info: DbTx | DbMempoolTx) => void = info => { + const broadcastTx = new Promise((resolve, reject) => { + const listener: (txId: string) => void = async txId => { + const dbTxQuery = await api.datastore.getTx({ txId: txId, includeUnanchored: true }); + if (!dbTxQuery.found) { + return; + } + const dbTx = dbTxQuery.result; if ( - info.tx_id === expectedTxId && - (info.status === DbTxStatus.Success || - info.status === DbTxStatus.AbortByResponse || - info.status === DbTxStatus.AbortByPostCondition) + dbTx.tx_id === expectedTxId && + (dbTx.status === DbTxStatus.Success || + dbTx.status === DbTxStatus.AbortByResponse || + dbTx.status === DbTxStatus.AbortByPostCondition) ) { api.datastore.removeListener('txUpdate', listener); - resolve(info as DbTx); + resolve(dbTx); } }; api.datastore.addListener('txUpdate', listener); diff --git a/src/tests/websocket-tests.ts b/src/tests/websocket-tests.ts index ca900f1f..2eeef442 100644 --- a/src/tests/websocket-tests.ts +++ b/src/tests/websocket-tests.ts @@ -189,7 +189,7 @@ describe('websocket notifications', () => { // update DB with TX after WS server is sent txid to monitor // tx.status = DbTxStatus.Success; // await db.update(dbUpdate); - 
db.emit('txUpdate', { ...tx, status: DbTxStatus.Pending }); + db.emit('txUpdate', tx.tx_id); // check for tx update notification const txStatus2 = await txUpdates[1]; @@ -202,7 +202,7 @@ describe('websocket notifications', () => { }); // ensure tx updates no longer received - db.emit('txUpdate', { ...tx, status: DbTxStatus.Pending }); + db.emit('txUpdate', tx.tx_id); await new Promise(resolve => setImmediate(resolve)); expect(txUpdates[2].isFinished).toBe(false); } finally { @@ -240,7 +240,7 @@ describe('websocket notifications', () => { raw_tx: Buffer.from('raw-tx-test'), index_block_hash: '0x5432', block_hash: '0x9876', - block_height: 68456, + block_height: 1, burn_block_time: 2837565, parent_burn_block_time: 1626122935, type_id: DbTxTypeId.TokenTransfer, @@ -529,7 +529,7 @@ describe('websocket notifications', () => { raw_tx: Buffer.from('raw-tx-test'), index_block_hash: '0x5432', block_hash: '0x9876', - block_height: 68456, + block_height: 1, burn_block_time: 2837565, parent_burn_block_time: 1626122935, type_id: DbTxTypeId.TokenTransfer,
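End to end, the writer instance NOTIFYs Postgres on every update and each read-only instance re-emits the message to its own subscribers. A hedged sketch of what a socket.io client might then observe; the URL and the `subscribe` handshake are assumptions inferred from the room join/leave logging in `socket-io.ts`, and only the `block` and `mempool` payload shapes come from this diff:

```ts
import { io } from 'socket.io-client';

// Illustrative client: the address, port, and 'subscribe' event name are
// assumptions; consult the API's client documentation for the exact handshake.
const socket = io('http://localhost:3999');
socket.emit('subscribe', 'block');
socket.emit('subscribe', 'mempool');
socket.on('block', block => console.log('new block:', block.hash));
socket.on('mempool', tx => console.log('new mempool tx:', tx.tx_id));
```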