mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
fix: handle pg exceptions on web socket transmitter (#1353)
This commit is contained in:
@@ -12,6 +12,7 @@ import { WebSocketChannel } from './web-socket-channel';
|
||||
import { SocketIOChannel } from './channels/socket-io-channel';
|
||||
import { WsRpcChannel } from './channels/ws-rpc-channel';
|
||||
import { parseNftEvent } from '../../../datastore/helpers';
|
||||
import { logger } from '../../../helpers';
|
||||
|
||||
/**
|
||||
* This object matches real time update `WebSocketTopics` subscriptions with internal
|
||||
@@ -67,128 +68,156 @@ export class WebSocketTransmitter {
|
||||
|
||||
private async blockUpdate(blockHash: string) {
|
||||
if (this.channels.find(c => c.hasListeners('block'))) {
|
||||
const blockQuery = await getBlockFromDataStore({
|
||||
blockIdentifer: { hash: blockHash },
|
||||
db: this.db,
|
||||
});
|
||||
if (blockQuery.found) {
|
||||
this.channels.forEach(c => c.send('block', blockQuery.result));
|
||||
try {
|
||||
const blockQuery = await getBlockFromDataStore({
|
||||
blockIdentifer: { hash: blockHash },
|
||||
db: this.db,
|
||||
});
|
||||
if (blockQuery.found) {
|
||||
this.channels.forEach(c => c.send('block', blockQuery.result));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async microblockUpdate(microblockHash: string) {
|
||||
if (this.channels.find(c => c.hasListeners('microblock'))) {
|
||||
const microblockQuery = await getMicroblockFromDataStore({
|
||||
db: this.db,
|
||||
microblockHash: microblockHash,
|
||||
});
|
||||
if (microblockQuery.found) {
|
||||
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
|
||||
try {
|
||||
const microblockQuery = await getMicroblockFromDataStore({
|
||||
db: this.db,
|
||||
microblockHash: microblockHash,
|
||||
});
|
||||
if (microblockQuery.found) {
|
||||
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async txUpdate(txId: string) {
|
||||
if (this.channels.find(c => c.hasListeners('mempool'))) {
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
|
||||
// Look at the `txs` table first so we always prefer the confirmed transaction.
|
||||
const txQuery = await getTxFromDataStore(this.db, {
|
||||
txId: txId,
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (txQuery.found) {
|
||||
this.channels.forEach(c => c.send('transaction', txQuery.result));
|
||||
} else {
|
||||
// Tx is not yet confirmed, look at `mempool_txs`.
|
||||
try {
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
|
||||
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
|
||||
try {
|
||||
// Look at the `txs` table first so we always prefer the confirmed transaction.
|
||||
const txQuery = await getTxFromDataStore(this.db, {
|
||||
txId: txId,
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (txQuery.found) {
|
||||
this.channels.forEach(c => c.send('transaction', txQuery.result));
|
||||
} else {
|
||||
// Tx is not yet confirmed, look at `mempool_txs`.
|
||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||
txIds: [txId],
|
||||
includeUnanchored: true,
|
||||
});
|
||||
if (mempoolTxs.length > 0) {
|
||||
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async nftEventUpdate(txId: string, eventIndex: number) {
|
||||
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
|
||||
if (!nftEvent.found) {
|
||||
return;
|
||||
}
|
||||
const assetIdentifier = nftEvent.result.asset_identifier;
|
||||
const value = nftEvent.result.value;
|
||||
const event = parseNftEvent(nftEvent.result);
|
||||
try {
|
||||
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
|
||||
if (!nftEvent.found) {
|
||||
return;
|
||||
}
|
||||
const assetIdentifier = nftEvent.result.asset_identifier;
|
||||
const value = nftEvent.result.value;
|
||||
const event = parseNftEvent(nftEvent.result);
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
|
||||
this.channels.forEach(c => c.send('nftEvent', event));
|
||||
}
|
||||
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
|
||||
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
|
||||
}
|
||||
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
|
||||
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
|
||||
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
|
||||
this.channels.forEach(c => c.send('nftEvent', event));
|
||||
}
|
||||
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
|
||||
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
|
||||
}
|
||||
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
|
||||
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
|
||||
private async addressUpdate(address: string, blockHeight: number) {
|
||||
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
|
||||
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
|
||||
stxAddress: address,
|
||||
blockHeight: blockHeight,
|
||||
atSingleBlock: true,
|
||||
});
|
||||
if (dbTxsQuery.total == 0) {
|
||||
return;
|
||||
}
|
||||
const addressTxs = dbTxsQuery.results;
|
||||
for (const addressTx of addressTxs) {
|
||||
const parsedTx = parseDbTx(addressTx.tx);
|
||||
const result: AddressTransactionWithTransfers = {
|
||||
tx: parsedTx,
|
||||
stx_sent: addressTx.stx_sent.toString(),
|
||||
stx_received: addressTx.stx_received.toString(),
|
||||
stx_transfers: addressTx.stx_transfers.map(value => {
|
||||
return {
|
||||
amount: value.amount.toString(),
|
||||
sender: value.sender,
|
||||
recipient: value.recipient,
|
||||
};
|
||||
}),
|
||||
};
|
||||
this.channels.forEach(c => c.send('principalTransaction', address, result));
|
||||
try {
|
||||
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
|
||||
stxAddress: address,
|
||||
blockHeight: blockHeight,
|
||||
atSingleBlock: true,
|
||||
});
|
||||
if (dbTxsQuery.total == 0) {
|
||||
return;
|
||||
}
|
||||
const addressTxs = dbTxsQuery.results;
|
||||
for (const addressTx of addressTxs) {
|
||||
const parsedTx = parseDbTx(addressTx.tx);
|
||||
const result: AddressTransactionWithTransfers = {
|
||||
tx: parsedTx,
|
||||
stx_sent: addressTx.stx_sent.toString(),
|
||||
stx_received: addressTx.stx_received.toString(),
|
||||
stx_transfers: addressTx.stx_transfers.map(value => {
|
||||
return {
|
||||
amount: value.amount.toString(),
|
||||
sender: value.sender,
|
||||
recipient: value.recipient,
|
||||
};
|
||||
}),
|
||||
};
|
||||
this.channels.forEach(c => c.send('principalTransaction', address, result));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
|
||||
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
|
||||
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
|
||||
const balance: AddressStxBalanceResponse = {
|
||||
balance: stxBalanceResult.balance.toString(),
|
||||
total_sent: stxBalanceResult.totalSent.toString(),
|
||||
total_received: stxBalanceResult.totalReceived.toString(),
|
||||
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
|
||||
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
|
||||
lock_tx_id: stxBalanceResult.lockTxId,
|
||||
locked: stxBalanceResult.locked.toString(),
|
||||
lock_height: stxBalanceResult.lockHeight,
|
||||
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
|
||||
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
|
||||
};
|
||||
if (tokenOfferingLocked.found) {
|
||||
balance.token_offering_locked = tokenOfferingLocked.result;
|
||||
try {
|
||||
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
|
||||
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
|
||||
const balance: AddressStxBalanceResponse = {
|
||||
balance: stxBalanceResult.balance.toString(),
|
||||
total_sent: stxBalanceResult.totalSent.toString(),
|
||||
total_received: stxBalanceResult.totalReceived.toString(),
|
||||
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
|
||||
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
|
||||
lock_tx_id: stxBalanceResult.lockTxId,
|
||||
locked: stxBalanceResult.locked.toString(),
|
||||
lock_height: stxBalanceResult.lockHeight,
|
||||
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
|
||||
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
|
||||
};
|
||||
if (tokenOfferingLocked.found) {
|
||||
balance.token_offering_locked = tokenOfferingLocked.result;
|
||||
}
|
||||
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
}
|
||||
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
59
src/tests/ws-transmitter-tests.ts
Normal file
59
src/tests/ws-transmitter-tests.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { PgWriteStore } from '../datastore/pg-write-store';
|
||||
import { cycleMigrations, runMigrations } from '../datastore/migrations';
|
||||
import { WebSocketTransmitter } from '../api/routes/ws/web-socket-transmitter';
|
||||
import { Server } from 'http';
|
||||
import {
|
||||
ListenerType,
|
||||
WebSocketChannel,
|
||||
WebSocketPayload,
|
||||
WebSocketTopics,
|
||||
} from '../api/routes/ws/web-socket-channel';
|
||||
|
||||
class TestChannel extends WebSocketChannel {
|
||||
connect(): void {
|
||||
//
|
||||
}
|
||||
close(callback?: ((err?: Error | undefined) => void) | undefined): void {
|
||||
//
|
||||
}
|
||||
send<P extends keyof WebSocketPayload>(
|
||||
payload: P,
|
||||
...args: ListenerType<WebSocketPayload[P]>
|
||||
): void {
|
||||
//
|
||||
}
|
||||
hasListeners<P extends keyof WebSocketTopics>(
|
||||
topic: P,
|
||||
...args: ListenerType<WebSocketTopics[P]>
|
||||
): boolean {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
describe('ws transmitter', () => {
|
||||
let db: PgWriteStore;
|
||||
let transmitter: WebSocketTransmitter;
|
||||
|
||||
beforeEach(async () => {
|
||||
process.env.PG_DATABASE = 'postgres';
|
||||
await cycleMigrations();
|
||||
db = await PgWriteStore.connect({ usageName: 'tests', skipMigrations: true });
|
||||
});
|
||||
|
||||
test('handles pg exceptions gracefully', async () => {
|
||||
const fakeServer = new Server();
|
||||
transmitter = new WebSocketTransmitter(db, fakeServer);
|
||||
transmitter['channels'].push(new TestChannel(fakeServer));
|
||||
await db.close();
|
||||
await expect(transmitter['blockUpdate']('0xff')).resolves.not.toThrow();
|
||||
await expect(transmitter['microblockUpdate']('0xff')).resolves.not.toThrow();
|
||||
await expect(transmitter['txUpdate']('0xff')).resolves.not.toThrow();
|
||||
await expect(transmitter['nftEventUpdate']('0xff', 0)).resolves.not.toThrow();
|
||||
await expect(transmitter['addressUpdate']('0xff', 1)).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await db?.close();
|
||||
await runMigrations(undefined, 'down');
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user