mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-04-28 21:05:36 +08:00
fix: handle pg exceptions on web socket transmitter (#1353)
This commit is contained in:
@@ -12,6 +12,7 @@ import { WebSocketChannel } from './web-socket-channel';
|
|||||||
import { SocketIOChannel } from './channels/socket-io-channel';
|
import { SocketIOChannel } from './channels/socket-io-channel';
|
||||||
import { WsRpcChannel } from './channels/ws-rpc-channel';
|
import { WsRpcChannel } from './channels/ws-rpc-channel';
|
||||||
import { parseNftEvent } from '../../../datastore/helpers';
|
import { parseNftEvent } from '../../../datastore/helpers';
|
||||||
|
import { logger } from '../../../helpers';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This object matches real time update `WebSocketTopics` subscriptions with internal
|
* This object matches real time update `WebSocketTopics` subscriptions with internal
|
||||||
@@ -67,128 +68,156 @@ export class WebSocketTransmitter {
|
|||||||
|
|
||||||
private async blockUpdate(blockHash: string) {
|
private async blockUpdate(blockHash: string) {
|
||||||
if (this.channels.find(c => c.hasListeners('block'))) {
|
if (this.channels.find(c => c.hasListeners('block'))) {
|
||||||
const blockQuery = await getBlockFromDataStore({
|
try {
|
||||||
blockIdentifer: { hash: blockHash },
|
const blockQuery = await getBlockFromDataStore({
|
||||||
db: this.db,
|
blockIdentifer: { hash: blockHash },
|
||||||
});
|
db: this.db,
|
||||||
if (blockQuery.found) {
|
});
|
||||||
this.channels.forEach(c => c.send('block', blockQuery.result));
|
if (blockQuery.found) {
|
||||||
|
this.channels.forEach(c => c.send('block', blockQuery.result));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async microblockUpdate(microblockHash: string) {
|
private async microblockUpdate(microblockHash: string) {
|
||||||
if (this.channels.find(c => c.hasListeners('microblock'))) {
|
if (this.channels.find(c => c.hasListeners('microblock'))) {
|
||||||
const microblockQuery = await getMicroblockFromDataStore({
|
try {
|
||||||
db: this.db,
|
const microblockQuery = await getMicroblockFromDataStore({
|
||||||
microblockHash: microblockHash,
|
db: this.db,
|
||||||
});
|
microblockHash: microblockHash,
|
||||||
if (microblockQuery.found) {
|
});
|
||||||
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
|
if (microblockQuery.found) {
|
||||||
|
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async txUpdate(txId: string) {
|
private async txUpdate(txId: string) {
|
||||||
if (this.channels.find(c => c.hasListeners('mempool'))) {
|
if (this.channels.find(c => c.hasListeners('mempool'))) {
|
||||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
try {
|
||||||
txIds: [txId],
|
|
||||||
includeUnanchored: true,
|
|
||||||
});
|
|
||||||
if (mempoolTxs.length > 0) {
|
|
||||||
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
|
|
||||||
// Look at the `txs` table first so we always prefer the confirmed transaction.
|
|
||||||
const txQuery = await getTxFromDataStore(this.db, {
|
|
||||||
txId: txId,
|
|
||||||
includeUnanchored: true,
|
|
||||||
});
|
|
||||||
if (txQuery.found) {
|
|
||||||
this.channels.forEach(c => c.send('transaction', txQuery.result));
|
|
||||||
} else {
|
|
||||||
// Tx is not yet confirmed, look at `mempool_txs`.
|
|
||||||
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||||
txIds: [txId],
|
txIds: [txId],
|
||||||
includeUnanchored: true,
|
includeUnanchored: true,
|
||||||
});
|
});
|
||||||
if (mempoolTxs.length > 0) {
|
if (mempoolTxs.length > 0) {
|
||||||
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
|
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
|
||||||
}
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
|
||||||
|
try {
|
||||||
|
// Look at the `txs` table first so we always prefer the confirmed transaction.
|
||||||
|
const txQuery = await getTxFromDataStore(this.db, {
|
||||||
|
txId: txId,
|
||||||
|
includeUnanchored: true,
|
||||||
|
});
|
||||||
|
if (txQuery.found) {
|
||||||
|
this.channels.forEach(c => c.send('transaction', txQuery.result));
|
||||||
|
} else {
|
||||||
|
// Tx is not yet confirmed, look at `mempool_txs`.
|
||||||
|
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
|
||||||
|
txIds: [txId],
|
||||||
|
includeUnanchored: true,
|
||||||
|
});
|
||||||
|
if (mempoolTxs.length > 0) {
|
||||||
|
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async nftEventUpdate(txId: string, eventIndex: number) {
|
private async nftEventUpdate(txId: string, eventIndex: number) {
|
||||||
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
|
try {
|
||||||
if (!nftEvent.found) {
|
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
|
||||||
return;
|
if (!nftEvent.found) {
|
||||||
}
|
return;
|
||||||
const assetIdentifier = nftEvent.result.asset_identifier;
|
}
|
||||||
const value = nftEvent.result.value;
|
const assetIdentifier = nftEvent.result.asset_identifier;
|
||||||
const event = parseNftEvent(nftEvent.result);
|
const value = nftEvent.result.value;
|
||||||
|
const event = parseNftEvent(nftEvent.result);
|
||||||
|
|
||||||
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
|
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
|
||||||
this.channels.forEach(c => c.send('nftEvent', event));
|
this.channels.forEach(c => c.send('nftEvent', event));
|
||||||
}
|
}
|
||||||
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
|
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
|
||||||
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
|
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
|
||||||
}
|
}
|
||||||
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
|
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
|
||||||
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
|
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async addressUpdate(address: string, blockHeight: number) {
|
private async addressUpdate(address: string, blockHeight: number) {
|
||||||
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
|
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
|
||||||
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
|
try {
|
||||||
stxAddress: address,
|
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
|
||||||
blockHeight: blockHeight,
|
stxAddress: address,
|
||||||
atSingleBlock: true,
|
blockHeight: blockHeight,
|
||||||
});
|
atSingleBlock: true,
|
||||||
if (dbTxsQuery.total == 0) {
|
});
|
||||||
return;
|
if (dbTxsQuery.total == 0) {
|
||||||
}
|
return;
|
||||||
const addressTxs = dbTxsQuery.results;
|
}
|
||||||
for (const addressTx of addressTxs) {
|
const addressTxs = dbTxsQuery.results;
|
||||||
const parsedTx = parseDbTx(addressTx.tx);
|
for (const addressTx of addressTxs) {
|
||||||
const result: AddressTransactionWithTransfers = {
|
const parsedTx = parseDbTx(addressTx.tx);
|
||||||
tx: parsedTx,
|
const result: AddressTransactionWithTransfers = {
|
||||||
stx_sent: addressTx.stx_sent.toString(),
|
tx: parsedTx,
|
||||||
stx_received: addressTx.stx_received.toString(),
|
stx_sent: addressTx.stx_sent.toString(),
|
||||||
stx_transfers: addressTx.stx_transfers.map(value => {
|
stx_received: addressTx.stx_received.toString(),
|
||||||
return {
|
stx_transfers: addressTx.stx_transfers.map(value => {
|
||||||
amount: value.amount.toString(),
|
return {
|
||||||
sender: value.sender,
|
amount: value.amount.toString(),
|
||||||
recipient: value.recipient,
|
sender: value.sender,
|
||||||
};
|
recipient: value.recipient,
|
||||||
}),
|
};
|
||||||
};
|
}),
|
||||||
this.channels.forEach(c => c.send('principalTransaction', address, result));
|
};
|
||||||
|
this.channels.forEach(c => c.send('principalTransaction', address, result));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
|
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
|
||||||
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
|
try {
|
||||||
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
|
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
|
||||||
const balance: AddressStxBalanceResponse = {
|
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
|
||||||
balance: stxBalanceResult.balance.toString(),
|
const balance: AddressStxBalanceResponse = {
|
||||||
total_sent: stxBalanceResult.totalSent.toString(),
|
balance: stxBalanceResult.balance.toString(),
|
||||||
total_received: stxBalanceResult.totalReceived.toString(),
|
total_sent: stxBalanceResult.totalSent.toString(),
|
||||||
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
|
total_received: stxBalanceResult.totalReceived.toString(),
|
||||||
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
|
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
|
||||||
lock_tx_id: stxBalanceResult.lockTxId,
|
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
|
||||||
locked: stxBalanceResult.locked.toString(),
|
lock_tx_id: stxBalanceResult.lockTxId,
|
||||||
lock_height: stxBalanceResult.lockHeight,
|
locked: stxBalanceResult.locked.toString(),
|
||||||
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
|
lock_height: stxBalanceResult.lockHeight,
|
||||||
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
|
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
|
||||||
};
|
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
|
||||||
if (tokenOfferingLocked.found) {
|
};
|
||||||
balance.token_offering_locked = tokenOfferingLocked.result;
|
if (tokenOfferingLocked.found) {
|
||||||
|
balance.token_offering_locked = tokenOfferingLocked.result;
|
||||||
|
}
|
||||||
|
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
}
|
}
|
||||||
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
59
src/tests/ws-transmitter-tests.ts
Normal file
59
src/tests/ws-transmitter-tests.ts
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import { PgWriteStore } from '../datastore/pg-write-store';
|
||||||
|
import { cycleMigrations, runMigrations } from '../datastore/migrations';
|
||||||
|
import { WebSocketTransmitter } from '../api/routes/ws/web-socket-transmitter';
|
||||||
|
import { Server } from 'http';
|
||||||
|
import {
|
||||||
|
ListenerType,
|
||||||
|
WebSocketChannel,
|
||||||
|
WebSocketPayload,
|
||||||
|
WebSocketTopics,
|
||||||
|
} from '../api/routes/ws/web-socket-channel';
|
||||||
|
|
||||||
|
class TestChannel extends WebSocketChannel {
|
||||||
|
connect(): void {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
close(callback?: ((err?: Error | undefined) => void) | undefined): void {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
send<P extends keyof WebSocketPayload>(
|
||||||
|
payload: P,
|
||||||
|
...args: ListenerType<WebSocketPayload[P]>
|
||||||
|
): void {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
hasListeners<P extends keyof WebSocketTopics>(
|
||||||
|
topic: P,
|
||||||
|
...args: ListenerType<WebSocketTopics[P]>
|
||||||
|
): boolean {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('ws transmitter', () => {
|
||||||
|
let db: PgWriteStore;
|
||||||
|
let transmitter: WebSocketTransmitter;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
process.env.PG_DATABASE = 'postgres';
|
||||||
|
await cycleMigrations();
|
||||||
|
db = await PgWriteStore.connect({ usageName: 'tests', skipMigrations: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
test('handles pg exceptions gracefully', async () => {
|
||||||
|
const fakeServer = new Server();
|
||||||
|
transmitter = new WebSocketTransmitter(db, fakeServer);
|
||||||
|
transmitter['channels'].push(new TestChannel(fakeServer));
|
||||||
|
await db.close();
|
||||||
|
await expect(transmitter['blockUpdate']('0xff')).resolves.not.toThrow();
|
||||||
|
await expect(transmitter['microblockUpdate']('0xff')).resolves.not.toThrow();
|
||||||
|
await expect(transmitter['txUpdate']('0xff')).resolves.not.toThrow();
|
||||||
|
await expect(transmitter['nftEventUpdate']('0xff', 0)).resolves.not.toThrow();
|
||||||
|
await expect(transmitter['addressUpdate']('0xff', 1)).resolves.not.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await db?.close();
|
||||||
|
await runMigrations(undefined, 'down');
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user