From 2e6448d7afc7bb35d5bcd3da88105f0552a13764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Wed, 12 Oct 2022 14:52:15 -0500 Subject: [PATCH] fix: handle pg exceptions on web socket transmitter (#1353) --- src/api/routes/ws/web-socket-transmitter.ts | 205 +++++++++++--------- src/tests/ws-transmitter-tests.ts | 59 ++++++ 2 files changed, 176 insertions(+), 88 deletions(-) create mode 100644 src/tests/ws-transmitter-tests.ts diff --git a/src/api/routes/ws/web-socket-transmitter.ts b/src/api/routes/ws/web-socket-transmitter.ts index 612c6604..0dbc62c2 100644 --- a/src/api/routes/ws/web-socket-transmitter.ts +++ b/src/api/routes/ws/web-socket-transmitter.ts @@ -12,6 +12,7 @@ import { WebSocketChannel } from './web-socket-channel'; import { SocketIOChannel } from './channels/socket-io-channel'; import { WsRpcChannel } from './channels/ws-rpc-channel'; import { parseNftEvent } from '../../../datastore/helpers'; +import { logger } from '../../../helpers'; /** * This object matches real time update `WebSocketTopics` subscriptions with internal @@ -67,128 +68,156 @@ export class WebSocketTransmitter { private async blockUpdate(blockHash: string) { if (this.channels.find(c => c.hasListeners('block'))) { - const blockQuery = await getBlockFromDataStore({ - blockIdentifer: { hash: blockHash }, - db: this.db, - }); - if (blockQuery.found) { - this.channels.forEach(c => c.send('block', blockQuery.result)); + try { + const blockQuery = await getBlockFromDataStore({ + blockIdentifer: { hash: blockHash }, + db: this.db, + }); + if (blockQuery.found) { + this.channels.forEach(c => c.send('block', blockQuery.result)); + } + } catch (error) { + logger.error(error); } } } private async microblockUpdate(microblockHash: string) { if (this.channels.find(c => c.hasListeners('microblock'))) { - const microblockQuery = await getMicroblockFromDataStore({ - db: this.db, - microblockHash: microblockHash, - }); - if (microblockQuery.found) { - this.channels.forEach(c => c.send('microblock', microblockQuery.result)); + try { + const microblockQuery = await getMicroblockFromDataStore({ + db: this.db, + microblockHash: microblockHash, + }); + if (microblockQuery.found) { + this.channels.forEach(c => c.send('microblock', microblockQuery.result)); + } + } catch (error) { + logger.error(error); } } } private async txUpdate(txId: string) { if (this.channels.find(c => c.hasListeners('mempool'))) { - const mempoolTxs = await getMempoolTxsFromDataStore(this.db, { - txIds: [txId], - includeUnanchored: true, - }); - if (mempoolTxs.length > 0) { - this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0])); - } - } - - if (this.channels.find(c => c.hasListeners('transaction', txId))) { - // Look at the `txs` table first so we always prefer the confirmed transaction. - const txQuery = await getTxFromDataStore(this.db, { - txId: txId, - includeUnanchored: true, - }); - if (txQuery.found) { - this.channels.forEach(c => c.send('transaction', txQuery.result)); - } else { - // Tx is not yet confirmed, look at `mempool_txs`. + try { const mempoolTxs = await getMempoolTxsFromDataStore(this.db, { txIds: [txId], includeUnanchored: true, }); if (mempoolTxs.length > 0) { - this.channels.forEach(c => c.send('transaction', mempoolTxs[0])); + this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0])); } + } catch (error) { + logger.error(error); + } + } + + if (this.channels.find(c => c.hasListeners('transaction', txId))) { + try { + // Look at the `txs` table first so we always prefer the confirmed transaction. + const txQuery = await getTxFromDataStore(this.db, { + txId: txId, + includeUnanchored: true, + }); + if (txQuery.found) { + this.channels.forEach(c => c.send('transaction', txQuery.result)); + } else { + // Tx is not yet confirmed, look at `mempool_txs`. + const mempoolTxs = await getMempoolTxsFromDataStore(this.db, { + txIds: [txId], + includeUnanchored: true, + }); + if (mempoolTxs.length > 0) { + this.channels.forEach(c => c.send('transaction', mempoolTxs[0])); + } + } + } catch (error) { + logger.error(error); } } } private async nftEventUpdate(txId: string, eventIndex: number) { - const nftEvent = await this.db.getNftEvent({ txId, eventIndex }); - if (!nftEvent.found) { - return; - } - const assetIdentifier = nftEvent.result.asset_identifier; - const value = nftEvent.result.value; - const event = parseNftEvent(nftEvent.result); + try { + const nftEvent = await this.db.getNftEvent({ txId, eventIndex }); + if (!nftEvent.found) { + return; + } + const assetIdentifier = nftEvent.result.asset_identifier; + const value = nftEvent.result.value; + const event = parseNftEvent(nftEvent.result); - if (this.channels.find(c => c.hasListeners('nftEvent'))) { - this.channels.forEach(c => c.send('nftEvent', event)); - } - if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) { - this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event)); - } - if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) { - this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event)); + if (this.channels.find(c => c.hasListeners('nftEvent'))) { + this.channels.forEach(c => c.send('nftEvent', event)); + } + if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) { + this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event)); + } + if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) { + this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event)); + } + } catch (error) { + logger.error(error); } } private async addressUpdate(address: string, blockHeight: number) { if (this.channels.find(c => c.hasListeners('principalTransactions', address))) { - const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({ - stxAddress: address, - blockHeight: blockHeight, - atSingleBlock: true, - }); - if (dbTxsQuery.total == 0) { - return; - } - const addressTxs = dbTxsQuery.results; - for (const addressTx of addressTxs) { - const parsedTx = parseDbTx(addressTx.tx); - const result: AddressTransactionWithTransfers = { - tx: parsedTx, - stx_sent: addressTx.stx_sent.toString(), - stx_received: addressTx.stx_received.toString(), - stx_transfers: addressTx.stx_transfers.map(value => { - return { - amount: value.amount.toString(), - sender: value.sender, - recipient: value.recipient, - }; - }), - }; - this.channels.forEach(c => c.send('principalTransaction', address, result)); + try { + const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({ + stxAddress: address, + blockHeight: blockHeight, + atSingleBlock: true, + }); + if (dbTxsQuery.total == 0) { + return; + } + const addressTxs = dbTxsQuery.results; + for (const addressTx of addressTxs) { + const parsedTx = parseDbTx(addressTx.tx); + const result: AddressTransactionWithTransfers = { + tx: parsedTx, + stx_sent: addressTx.stx_sent.toString(), + stx_received: addressTx.stx_received.toString(), + stx_transfers: addressTx.stx_transfers.map(value => { + return { + amount: value.amount.toString(), + sender: value.sender, + recipient: value.recipient, + }; + }), + }; + this.channels.forEach(c => c.send('principalTransaction', address, result)); + } + } catch (error) { + logger.error(error); } } if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) { - const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight); - const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight); - const balance: AddressStxBalanceResponse = { - balance: stxBalanceResult.balance.toString(), - total_sent: stxBalanceResult.totalSent.toString(), - total_received: stxBalanceResult.totalReceived.toString(), - total_fees_sent: stxBalanceResult.totalFeesSent.toString(), - total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(), - lock_tx_id: stxBalanceResult.lockTxId, - locked: stxBalanceResult.locked.toString(), - lock_height: stxBalanceResult.lockHeight, - burnchain_lock_height: stxBalanceResult.burnchainLockHeight, - burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight, - }; - if (tokenOfferingLocked.found) { - balance.token_offering_locked = tokenOfferingLocked.result; + try { + const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight); + const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight); + const balance: AddressStxBalanceResponse = { + balance: stxBalanceResult.balance.toString(), + total_sent: stxBalanceResult.totalSent.toString(), + total_received: stxBalanceResult.totalReceived.toString(), + total_fees_sent: stxBalanceResult.totalFeesSent.toString(), + total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(), + lock_tx_id: stxBalanceResult.lockTxId, + locked: stxBalanceResult.locked.toString(), + lock_height: stxBalanceResult.lockHeight, + burnchain_lock_height: stxBalanceResult.burnchainLockHeight, + burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight, + }; + if (tokenOfferingLocked.found) { + balance.token_offering_locked = tokenOfferingLocked.result; + } + this.channels.forEach(c => c.send('principalStxBalance', address, balance)); + } catch (error) { + logger.error(error); } - this.channels.forEach(c => c.send('principalStxBalance', address, balance)); } } } diff --git a/src/tests/ws-transmitter-tests.ts b/src/tests/ws-transmitter-tests.ts new file mode 100644 index 00000000..4a371e84 --- /dev/null +++ b/src/tests/ws-transmitter-tests.ts @@ -0,0 +1,59 @@ +import { PgWriteStore } from '../datastore/pg-write-store'; +import { cycleMigrations, runMigrations } from '../datastore/migrations'; +import { WebSocketTransmitter } from '../api/routes/ws/web-socket-transmitter'; +import { Server } from 'http'; +import { + ListenerType, + WebSocketChannel, + WebSocketPayload, + WebSocketTopics, +} from '../api/routes/ws/web-socket-channel'; + +class TestChannel extends WebSocketChannel { + connect(): void { + // + } + close(callback?: ((err?: Error | undefined) => void) | undefined): void { + // + } + send

( + payload: P, + ...args: ListenerType + ): void { + // + } + hasListeners

( + topic: P, + ...args: ListenerType + ): boolean { + return true; + } +} + +describe('ws transmitter', () => { + let db: PgWriteStore; + let transmitter: WebSocketTransmitter; + + beforeEach(async () => { + process.env.PG_DATABASE = 'postgres'; + await cycleMigrations(); + db = await PgWriteStore.connect({ usageName: 'tests', skipMigrations: true }); + }); + + test('handles pg exceptions gracefully', async () => { + const fakeServer = new Server(); + transmitter = new WebSocketTransmitter(db, fakeServer); + transmitter['channels'].push(new TestChannel(fakeServer)); + await db.close(); + await expect(transmitter['blockUpdate']('0xff')).resolves.not.toThrow(); + await expect(transmitter['microblockUpdate']('0xff')).resolves.not.toThrow(); + await expect(transmitter['txUpdate']('0xff')).resolves.not.toThrow(); + await expect(transmitter['nftEventUpdate']('0xff', 0)).resolves.not.toThrow(); + await expect(transmitter['addressUpdate']('0xff', 1)).resolves.not.toThrow(); + }); + + afterEach(async () => { + await db?.close(); + await runMigrations(undefined, 'down'); + }); +});