fix: handle pg exceptions on web socket transmitter (#1353)

This commit is contained in:
Rafael Cárdenas
2022-10-12 14:52:15 -05:00
committed by GitHub
parent f9b7ae45a8
commit 2e6448d7af
2 changed files with 176 additions and 88 deletions

View File

@@ -12,6 +12,7 @@ import { WebSocketChannel } from './web-socket-channel';
import { SocketIOChannel } from './channels/socket-io-channel'; import { SocketIOChannel } from './channels/socket-io-channel';
import { WsRpcChannel } from './channels/ws-rpc-channel'; import { WsRpcChannel } from './channels/ws-rpc-channel';
import { parseNftEvent } from '../../../datastore/helpers'; import { parseNftEvent } from '../../../datastore/helpers';
import { logger } from '../../../helpers';
/** /**
* This object matches real time update `WebSocketTopics` subscriptions with internal * This object matches real time update `WebSocketTopics` subscriptions with internal
@@ -67,128 +68,156 @@ export class WebSocketTransmitter {
private async blockUpdate(blockHash: string) { private async blockUpdate(blockHash: string) {
if (this.channels.find(c => c.hasListeners('block'))) { if (this.channels.find(c => c.hasListeners('block'))) {
const blockQuery = await getBlockFromDataStore({ try {
blockIdentifer: { hash: blockHash }, const blockQuery = await getBlockFromDataStore({
db: this.db, blockIdentifer: { hash: blockHash },
}); db: this.db,
if (blockQuery.found) { });
this.channels.forEach(c => c.send('block', blockQuery.result)); if (blockQuery.found) {
this.channels.forEach(c => c.send('block', blockQuery.result));
}
} catch (error) {
logger.error(error);
} }
} }
} }
private async microblockUpdate(microblockHash: string) { private async microblockUpdate(microblockHash: string) {
if (this.channels.find(c => c.hasListeners('microblock'))) { if (this.channels.find(c => c.hasListeners('microblock'))) {
const microblockQuery = await getMicroblockFromDataStore({ try {
db: this.db, const microblockQuery = await getMicroblockFromDataStore({
microblockHash: microblockHash, db: this.db,
}); microblockHash: microblockHash,
if (microblockQuery.found) { });
this.channels.forEach(c => c.send('microblock', microblockQuery.result)); if (microblockQuery.found) {
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
}
} catch (error) {
logger.error(error);
} }
} }
} }
private async txUpdate(txId: string) { private async txUpdate(txId: string) {
if (this.channels.find(c => c.hasListeners('mempool'))) { if (this.channels.find(c => c.hasListeners('mempool'))) {
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, { try {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
}
}
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
// Look at the `txs` table first so we always prefer the confirmed transaction.
const txQuery = await getTxFromDataStore(this.db, {
txId: txId,
includeUnanchored: true,
});
if (txQuery.found) {
this.channels.forEach(c => c.send('transaction', txQuery.result));
} else {
// Tx is not yet confirmed, look at `mempool_txs`.
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, { const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId], txIds: [txId],
includeUnanchored: true, includeUnanchored: true,
}); });
if (mempoolTxs.length > 0) { if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('transaction', mempoolTxs[0])); this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
} }
} catch (error) {
logger.error(error);
}
}
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
try {
// Look at the `txs` table first so we always prefer the confirmed transaction.
const txQuery = await getTxFromDataStore(this.db, {
txId: txId,
includeUnanchored: true,
});
if (txQuery.found) {
this.channels.forEach(c => c.send('transaction', txQuery.result));
} else {
// Tx is not yet confirmed, look at `mempool_txs`.
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
}
}
} catch (error) {
logger.error(error);
} }
} }
} }
private async nftEventUpdate(txId: string, eventIndex: number) { private async nftEventUpdate(txId: string, eventIndex: number) {
const nftEvent = await this.db.getNftEvent({ txId, eventIndex }); try {
if (!nftEvent.found) { const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
return; if (!nftEvent.found) {
} return;
const assetIdentifier = nftEvent.result.asset_identifier; }
const value = nftEvent.result.value; const assetIdentifier = nftEvent.result.asset_identifier;
const event = parseNftEvent(nftEvent.result); const value = nftEvent.result.value;
const event = parseNftEvent(nftEvent.result);
if (this.channels.find(c => c.hasListeners('nftEvent'))) { if (this.channels.find(c => c.hasListeners('nftEvent'))) {
this.channels.forEach(c => c.send('nftEvent', event)); this.channels.forEach(c => c.send('nftEvent', event));
} }
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) { if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event)); this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
} }
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) { if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event)); this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
}
} catch (error) {
logger.error(error);
} }
} }
private async addressUpdate(address: string, blockHeight: number) { private async addressUpdate(address: string, blockHeight: number) {
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) { if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({ try {
stxAddress: address, const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
blockHeight: blockHeight, stxAddress: address,
atSingleBlock: true, blockHeight: blockHeight,
}); atSingleBlock: true,
if (dbTxsQuery.total == 0) { });
return; if (dbTxsQuery.total == 0) {
} return;
const addressTxs = dbTxsQuery.results; }
for (const addressTx of addressTxs) { const addressTxs = dbTxsQuery.results;
const parsedTx = parseDbTx(addressTx.tx); for (const addressTx of addressTxs) {
const result: AddressTransactionWithTransfers = { const parsedTx = parseDbTx(addressTx.tx);
tx: parsedTx, const result: AddressTransactionWithTransfers = {
stx_sent: addressTx.stx_sent.toString(), tx: parsedTx,
stx_received: addressTx.stx_received.toString(), stx_sent: addressTx.stx_sent.toString(),
stx_transfers: addressTx.stx_transfers.map(value => { stx_received: addressTx.stx_received.toString(),
return { stx_transfers: addressTx.stx_transfers.map(value => {
amount: value.amount.toString(), return {
sender: value.sender, amount: value.amount.toString(),
recipient: value.recipient, sender: value.sender,
}; recipient: value.recipient,
}), };
}; }),
this.channels.forEach(c => c.send('principalTransaction', address, result)); };
this.channels.forEach(c => c.send('principalTransaction', address, result));
}
} catch (error) {
logger.error(error);
} }
} }
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) { if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight); try {
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight); const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
const balance: AddressStxBalanceResponse = { const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
balance: stxBalanceResult.balance.toString(), const balance: AddressStxBalanceResponse = {
total_sent: stxBalanceResult.totalSent.toString(), balance: stxBalanceResult.balance.toString(),
total_received: stxBalanceResult.totalReceived.toString(), total_sent: stxBalanceResult.totalSent.toString(),
total_fees_sent: stxBalanceResult.totalFeesSent.toString(), total_received: stxBalanceResult.totalReceived.toString(),
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(), total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
lock_tx_id: stxBalanceResult.lockTxId, total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
locked: stxBalanceResult.locked.toString(), lock_tx_id: stxBalanceResult.lockTxId,
lock_height: stxBalanceResult.lockHeight, locked: stxBalanceResult.locked.toString(),
burnchain_lock_height: stxBalanceResult.burnchainLockHeight, lock_height: stxBalanceResult.lockHeight,
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight, burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
}; burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
if (tokenOfferingLocked.found) { };
balance.token_offering_locked = tokenOfferingLocked.result; if (tokenOfferingLocked.found) {
balance.token_offering_locked = tokenOfferingLocked.result;
}
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
} catch (error) {
logger.error(error);
} }
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
} }
} }
} }

View File

@@ -0,0 +1,59 @@
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { WebSocketTransmitter } from '../api/routes/ws/web-socket-transmitter';
import { Server } from 'http';
import {
ListenerType,
WebSocketChannel,
WebSocketPayload,
WebSocketTopics,
} from '../api/routes/ws/web-socket-channel';
class TestChannel extends WebSocketChannel {
connect(): void {
//
}
close(callback?: ((err?: Error | undefined) => void) | undefined): void {
//
}
send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): void {
//
}
hasListeners<P extends keyof WebSocketTopics>(
topic: P,
...args: ListenerType<WebSocketTopics[P]>
): boolean {
return true;
}
}
describe('ws transmitter', () => {
let db: PgWriteStore;
let transmitter: WebSocketTransmitter;
beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
await cycleMigrations();
db = await PgWriteStore.connect({ usageName: 'tests', skipMigrations: true });
});
test('handles pg exceptions gracefully', async () => {
const fakeServer = new Server();
transmitter = new WebSocketTransmitter(db, fakeServer);
transmitter['channels'].push(new TestChannel(fakeServer));
await db.close();
await expect(transmitter['blockUpdate']('0xff')).resolves.not.toThrow();
await expect(transmitter['microblockUpdate']('0xff')).resolves.not.toThrow();
await expect(transmitter['txUpdate']('0xff')).resolves.not.toThrow();
await expect(transmitter['nftEventUpdate']('0xff', 0)).resolves.not.toThrow();
await expect(transmitter['addressUpdate']('0xff', 1)).resolves.not.toThrow();
});
afterEach(async () => {
await db?.close();
await runMigrations(undefined, 'down');
});
});