fix: handle pg exceptions on web socket transmitter (#1353)

This commit is contained in:
Rafael Cárdenas
2022-10-12 14:52:15 -05:00
committed by GitHub
parent f9b7ae45a8
commit 2e6448d7af
2 changed files with 176 additions and 88 deletions

View File

@@ -12,6 +12,7 @@ import { WebSocketChannel } from './web-socket-channel';
import { SocketIOChannel } from './channels/socket-io-channel';
import { WsRpcChannel } from './channels/ws-rpc-channel';
import { parseNftEvent } from '../../../datastore/helpers';
import { logger } from '../../../helpers';
/**
* This object matches real time update `WebSocketTopics` subscriptions with internal
@@ -67,128 +68,156 @@ export class WebSocketTransmitter {
private async blockUpdate(blockHash: string) {
if (this.channels.find(c => c.hasListeners('block'))) {
const blockQuery = await getBlockFromDataStore({
blockIdentifer: { hash: blockHash },
db: this.db,
});
if (blockQuery.found) {
this.channels.forEach(c => c.send('block', blockQuery.result));
try {
const blockQuery = await getBlockFromDataStore({
blockIdentifer: { hash: blockHash },
db: this.db,
});
if (blockQuery.found) {
this.channels.forEach(c => c.send('block', blockQuery.result));
}
} catch (error) {
logger.error(error);
}
}
}
private async microblockUpdate(microblockHash: string) {
if (this.channels.find(c => c.hasListeners('microblock'))) {
const microblockQuery = await getMicroblockFromDataStore({
db: this.db,
microblockHash: microblockHash,
});
if (microblockQuery.found) {
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
try {
const microblockQuery = await getMicroblockFromDataStore({
db: this.db,
microblockHash: microblockHash,
});
if (microblockQuery.found) {
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
}
} catch (error) {
logger.error(error);
}
}
}
private async txUpdate(txId: string) {
if (this.channels.find(c => c.hasListeners('mempool'))) {
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
}
}
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
// Look at the `txs` table first so we always prefer the confirmed transaction.
const txQuery = await getTxFromDataStore(this.db, {
txId: txId,
includeUnanchored: true,
});
if (txQuery.found) {
this.channels.forEach(c => c.send('transaction', txQuery.result));
} else {
// Tx is not yet confirmed, look at `mempool_txs`.
try {
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
}
} catch (error) {
logger.error(error);
}
}
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
try {
// Look at the `txs` table first so we always prefer the confirmed transaction.
const txQuery = await getTxFromDataStore(this.db, {
txId: txId,
includeUnanchored: true,
});
if (txQuery.found) {
this.channels.forEach(c => c.send('transaction', txQuery.result));
} else {
// Tx is not yet confirmed, look at `mempool_txs`.
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
txIds: [txId],
includeUnanchored: true,
});
if (mempoolTxs.length > 0) {
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
}
}
} catch (error) {
logger.error(error);
}
}
}
private async nftEventUpdate(txId: string, eventIndex: number) {
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
if (!nftEvent.found) {
return;
}
const assetIdentifier = nftEvent.result.asset_identifier;
const value = nftEvent.result.value;
const event = parseNftEvent(nftEvent.result);
try {
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
if (!nftEvent.found) {
return;
}
const assetIdentifier = nftEvent.result.asset_identifier;
const value = nftEvent.result.value;
const event = parseNftEvent(nftEvent.result);
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
this.channels.forEach(c => c.send('nftEvent', event));
}
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
}
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
this.channels.forEach(c => c.send('nftEvent', event));
}
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
}
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
}
} catch (error) {
logger.error(error);
}
}
private async addressUpdate(address: string, blockHeight: number) {
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
stxAddress: address,
blockHeight: blockHeight,
atSingleBlock: true,
});
if (dbTxsQuery.total == 0) {
return;
}
const addressTxs = dbTxsQuery.results;
for (const addressTx of addressTxs) {
const parsedTx = parseDbTx(addressTx.tx);
const result: AddressTransactionWithTransfers = {
tx: parsedTx,
stx_sent: addressTx.stx_sent.toString(),
stx_received: addressTx.stx_received.toString(),
stx_transfers: addressTx.stx_transfers.map(value => {
return {
amount: value.amount.toString(),
sender: value.sender,
recipient: value.recipient,
};
}),
};
this.channels.forEach(c => c.send('principalTransaction', address, result));
try {
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
stxAddress: address,
blockHeight: blockHeight,
atSingleBlock: true,
});
if (dbTxsQuery.total == 0) {
return;
}
const addressTxs = dbTxsQuery.results;
for (const addressTx of addressTxs) {
const parsedTx = parseDbTx(addressTx.tx);
const result: AddressTransactionWithTransfers = {
tx: parsedTx,
stx_sent: addressTx.stx_sent.toString(),
stx_received: addressTx.stx_received.toString(),
stx_transfers: addressTx.stx_transfers.map(value => {
return {
amount: value.amount.toString(),
sender: value.sender,
recipient: value.recipient,
};
}),
};
this.channels.forEach(c => c.send('principalTransaction', address, result));
}
} catch (error) {
logger.error(error);
}
}
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
const balance: AddressStxBalanceResponse = {
balance: stxBalanceResult.balance.toString(),
total_sent: stxBalanceResult.totalSent.toString(),
total_received: stxBalanceResult.totalReceived.toString(),
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
lock_tx_id: stxBalanceResult.lockTxId,
locked: stxBalanceResult.locked.toString(),
lock_height: stxBalanceResult.lockHeight,
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
};
if (tokenOfferingLocked.found) {
balance.token_offering_locked = tokenOfferingLocked.result;
try {
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
const balance: AddressStxBalanceResponse = {
balance: stxBalanceResult.balance.toString(),
total_sent: stxBalanceResult.totalSent.toString(),
total_received: stxBalanceResult.totalReceived.toString(),
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
lock_tx_id: stxBalanceResult.lockTxId,
locked: stxBalanceResult.locked.toString(),
lock_height: stxBalanceResult.lockHeight,
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
};
if (tokenOfferingLocked.found) {
balance.token_offering_locked = tokenOfferingLocked.result;
}
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
} catch (error) {
logger.error(error);
}
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
}
}
}

View File

@@ -0,0 +1,59 @@
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { WebSocketTransmitter } from '../api/routes/ws/web-socket-transmitter';
import { Server } from 'http';
import {
ListenerType,
WebSocketChannel,
WebSocketPayload,
WebSocketTopics,
} from '../api/routes/ws/web-socket-channel';
class TestChannel extends WebSocketChannel {
connect(): void {
//
}
close(callback?: ((err?: Error | undefined) => void) | undefined): void {
//
}
send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
): void {
//
}
hasListeners<P extends keyof WebSocketTopics>(
topic: P,
...args: ListenerType<WebSocketTopics[P]>
): boolean {
return true;
}
}
describe('ws transmitter', () => {
let db: PgWriteStore;
let transmitter: WebSocketTransmitter;
beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
await cycleMigrations();
db = await PgWriteStore.connect({ usageName: 'tests', skipMigrations: true });
});
test('handles pg exceptions gracefully', async () => {
const fakeServer = new Server();
transmitter = new WebSocketTransmitter(db, fakeServer);
transmitter['channels'].push(new TestChannel(fakeServer));
await db.close();
await expect(transmitter['blockUpdate']('0xff')).resolves.not.toThrow();
await expect(transmitter['microblockUpdate']('0xff')).resolves.not.toThrow();
await expect(transmitter['txUpdate']('0xff')).resolves.not.toThrow();
await expect(transmitter['nftEventUpdate']('0xff', 0)).resolves.not.toThrow();
await expect(transmitter['addressUpdate']('0xff', 1)).resolves.not.toThrow();
});
afterEach(async () => {
await db?.close();
await runMigrations(undefined, 'down');
});
});