diff --git a/client/src/socket-io/index.ts b/client/src/socket-io/index.ts index dac9381c..950c04c4 100644 --- a/client/src/socket-io/index.ts +++ b/client/src/socket-io/index.ts @@ -78,6 +78,14 @@ export class StacksApiSocketClient { this.handleSubscription('block', false); } + subscribeMicroblocks() { + return this.handleSubscription('microblock', true); + } + + unsubscribeMicroblocks() { + this.handleSubscription('microblock', false); + } + subscribeMempool() { return this.handleSubscription('mempool', true); } @@ -107,6 +115,7 @@ export class StacksApiSocketClient { this.socket.on('disconnect', reason => console.warn('disconnected', reason)); this.socket.on('connect_error', error => console.error('connect_error', error)); this.socket.on('block', block => console.log('block', block)); + this.socket.on('microblock', microblock => console.log('microblock', microblock)); this.socket.on('mempool', tx => console.log('mempool', tx)); this.socket.on('address-transaction', (address, data) => console.log('address-transaction', address, data) diff --git a/docs/socket-io/index.d.ts b/docs/socket-io/index.d.ts index b7bcce62..2b8494d8 100644 --- a/docs/socket-io/index.d.ts +++ b/docs/socket-io/index.d.ts @@ -1,8 +1,8 @@ -import type { AddressStxBalanceResponse, AddressTransactionWithTransfers, Block, MempoolTransaction } from '..'; +import type { AddressStxBalanceResponse, AddressTransactionWithTransfers, Block, Microblock, MempoolTransaction } from '..'; export type AddressTransactionTopic = `address-transaction:${string}`; export type AddressStxBalanceTopic = `address-stx-balance:${string}`; -export type Topic = 'block' | 'mempool' | AddressTransactionTopic | AddressStxBalanceTopic; +export type Topic = 'block' | 'microblock' | 'mempool' | AddressTransactionTopic | AddressStxBalanceTopic; export interface ClientToServerMessages { subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void; @@ -11,6 +11,7 @@ export interface ClientToServerMessages { export interface ServerToClientMessages { block: (block: Block) => void; + microblock: (microblock: Microblock) => void; mempool: (transaction: MempoolTransaction) => void; // @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797 diff --git a/src/api/routes/socket-io.ts b/src/api/routes/socket-io.ts index 51ab6e11..96dbf447 100644 --- a/src/api/routes/socket-io.ts +++ b/src/api/routes/socket-io.ts @@ -11,7 +11,12 @@ import { Topic, ServerToClientMessages, } from '@stacks/stacks-blockchain-api-types'; -import { parseDbBlock, parseDbMempoolTx, parseDbTx } from '../controllers/db-controller'; +import { + getMicroblockFromDataStore, + parseDbBlock, + parseDbMempoolTx, + parseDbTx, +} from '../controllers/db-controller'; import { isProdEnv, logError, logger } from '../../helpers'; interface SocketIOMetrics { @@ -145,6 +150,22 @@ export function createSocketIORouter(db: DataStore, server: http.Server) { } }); + db.on('microblockUpdate', async microblockHash => { + const microblockTopic: Topic = 'microblock'; + if (adapter.rooms.has(microblockTopic)) { + const microblockQuery = await getMicroblockFromDataStore({ + db: db, + microblockHash: microblockHash, + }); + if (!microblockQuery.found) { + return; + } + const microblock = microblockQuery.result; + prometheus?.sendEvent('microblock'); + io.to(microblockTopic).emit('microblock', microblock); + } + }); + db.on('txUpdate', async txId => { // Only parse and emit data if there are currently subscriptions to the mempool topic const mempoolTopic: Topic = 'mempool'; diff --git a/src/datastore/common.ts b/src/datastore/common.ts index 5cd995d4..8021dc87 100644 --- a/src/datastore/common.ts +++ b/src/datastore/common.ts @@ -364,6 +364,7 @@ export type DataStoreEventEmitter = StrictEventEmitter< microblocksAccepted: string[], microblocksStreamed: string[] ) => void; + microblockUpdate: (microblockHash: string) => void; addressUpdate: (address: string, blockHeight: number) => void; nameUpdate: (info: string) => void; tokensUpdate: (contractID: string) => void; diff --git a/src/datastore/postgres-notifier.ts b/src/datastore/postgres-notifier.ts index 8094d38d..bc03158c 100644 --- a/src/datastore/postgres-notifier.ts +++ b/src/datastore/postgres-notifier.ts @@ -13,6 +13,10 @@ export type PgBlockNotificationPayload = { microblocksStreamed: string[]; }; +export type PgMicroblockNotificationPayload = { + microblockHash: string; +}; + export type PgAddressNotificationPayload = { address: string; blockHeight: number; @@ -32,6 +36,7 @@ export type PgTokensNotificationPayload = { export type PgNotificationPayload = | PgBlockNotificationPayload + | PgMicroblockNotificationPayload | PgTxNotificationPayload | PgAddressNotificationPayload | PgTokenMetadataNotificationPayload @@ -71,6 +76,10 @@ export class PgNotifier { this.notify({ type: 'blockUpdate', payload: payload }); } + public sendMicroblock(payload: PgMicroblockNotificationPayload) { + this.notify({ type: 'microblockUpdate', payload: payload }); + } + public sendTx(payload: PgTxNotificationPayload) { this.notify({ type: 'txUpdate', payload: payload }); } diff --git a/src/datastore/postgres-store.ts b/src/datastore/postgres-store.ts index 4e953fc1..38581d1c 100644 --- a/src/datastore/postgres-store.ts +++ b/src/datastore/postgres-store.ts @@ -98,6 +98,7 @@ import { ClarityAbi } from '@stacks/transactions'; import { PgAddressNotificationPayload, PgBlockNotificationPayload, + PgMicroblockNotificationPayload, PgNameNotificationPayload, PgNotifier, PgTokenMetadataNotificationPayload, @@ -604,6 +605,9 @@ export class PgDataStore block.microblocksStreamed ); break; + case 'microblockUpdate': + const microblock = notification.payload as PgMicroblockNotificationPayload; + this.emit('microblockUpdate', microblock.microblockHash); case 'txUpdate': const tx = notification.payload as PgTxNotificationPayload; this.emit('txUpdate', tx.txId); @@ -984,6 +988,9 @@ export class PgDataStore } await this.insertMicroblockData(client, dbMicroblocks, txs); + dbMicroblocks.forEach(microblock => + this.notifier?.sendMicroblock({ microblockHash: microblock.microblock_hash }) + ); // Find any microblocks that have been orphaned by this latest microblock chain tip. // This function also checks that each microblock parent hash points to an existing microblock in the db. diff --git a/src/tests/socket-io-tests.ts b/src/tests/socket-io-tests.ts new file mode 100644 index 00000000..36ec5f28 --- /dev/null +++ b/src/tests/socket-io-tests.ts @@ -0,0 +1,188 @@ +import { io } from 'socket.io-client'; +import { ChainID } from '@stacks/common'; +import { PoolClient } from 'pg'; +import { ApiServer, startApiServer } from '../api/init'; +import { cycleMigrations, runMigrations, PgDataStore } from '../datastore/postgres-store'; +import { DbBlock, DbMicroblockPartial, DbTx, DbTxTypeId } from '../datastore/common'; +import { I32_MAX, waiter, Waiter } from '../helpers'; +import { Microblock } from '../../docs/generated'; + +describe('socket-io', () => { + let apiServer: ApiServer; + let db: PgDataStore; + let dbClient: PoolClient; + + beforeEach(async () => { + process.env.PG_DATABASE = 'postgres'; + await cycleMigrations(); + db = await PgDataStore.connect(); + dbClient = await db.pool.connect(); + apiServer = await startApiServer({ + datastore: db, + chainId: ChainID.Testnet, + httpLogLevel: 'silly', + }); + }); + + test('socket-io > microblock updates', async () => { + const addr1 = 'ST28D4Q6RCQSJ6F7TEYWQDS4N1RXYEP9YBWMYSB97'; + const addr2 = 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6'; + const block: DbBlock = { + block_hash: '0x1234', + index_block_hash: '0xdeadbeef', + parent_index_block_hash: '0x00', + parent_block_hash: '0xff0011', + parent_microblock_hash: '', + block_height: 1, + burn_block_time: 94869286, + burn_block_hash: '0x1234', + burn_block_height: 123, + miner_txid: '0x4321', + canonical: true, + parent_microblock_sequence: 0, + execution_cost_read_count: 0, + execution_cost_read_length: 0, + execution_cost_runtime: 0, + execution_cost_write_count: 0, + execution_cost_write_length: 0, + }; + const tx1: DbTx = { + tx_id: '0x01', + tx_index: 0, + anchor_mode: 3, + nonce: 0, + raw_tx: Buffer.alloc(0), + index_block_hash: block.index_block_hash, + block_hash: block.block_hash, + block_height: block.block_height, + burn_block_time: block.burn_block_time, + parent_burn_block_time: 1626122935, + type_id: DbTxTypeId.Coinbase, + status: 1, + raw_result: '0x0100000000000000000000000000000001', // u1 + canonical: true, + post_conditions: Buffer.from([0x01, 0xf5]), + fee_rate: 1234n, + sponsored: false, + sponsor_address: undefined, + sender_address: addr1, + origin_hash_mode: 1, + coinbase_payload: Buffer.from('hi'), + event_count: 1, + parent_index_block_hash: '', + parent_block_hash: '', + microblock_canonical: true, + microblock_sequence: I32_MAX, + microblock_hash: '', + execution_cost_read_count: 0, + execution_cost_read_length: 0, + execution_cost_runtime: 0, + execution_cost_write_count: 0, + execution_cost_write_length: 0, + }; + const mb1: DbMicroblockPartial = { + microblock_hash: '0xff01', + microblock_sequence: 0, + microblock_parent_hash: block.block_hash, + parent_index_block_hash: block.index_block_hash, + parent_burn_block_height: 123, + parent_burn_block_hash: '0xaa', + parent_burn_block_time: 1626122935, + }; + const mbTx1: DbTx = { + tx_id: '0x02', + tx_index: 0, + anchor_mode: 3, + nonce: 0, + raw_tx: Buffer.alloc(0), + type_id: DbTxTypeId.TokenTransfer, + status: 1, + raw_result: '0x0100000000000000000000000000000001', // u1 + canonical: true, + post_conditions: Buffer.from([0x01, 0xf5]), + fee_rate: 1234n, + sponsored: false, + sender_address: addr1, + sponsor_address: undefined, + origin_hash_mode: 1, + token_transfer_amount: 50n, + token_transfer_memo: Buffer.from('hi'), + token_transfer_recipient_address: addr2, + event_count: 1, + parent_index_block_hash: block.index_block_hash, + parent_block_hash: block.block_hash, + microblock_canonical: true, + microblock_sequence: mb1.microblock_sequence, + microblock_hash: mb1.microblock_hash, + parent_burn_block_time: mb1.parent_burn_block_time, + execution_cost_read_count: 0, + execution_cost_read_length: 0, + execution_cost_runtime: 0, + execution_cost_write_count: 0, + execution_cost_write_length: 0, + index_block_hash: '', + block_hash: '', + burn_block_time: -1, + block_height: -1, + }; + + const address = apiServer.address; + const socket = io(`http://${address}`, { query: { subscriptions: 'microblock' } }); + const updateWaiter: Waiter = waiter(); + + socket.on('microblock', microblock => { + updateWaiter.finish(microblock); + }); + await db.update({ + block: block, + microblocks: [], + minerRewards: [], + txs: [ + { + tx: tx1, + stxLockEvents: [], + stxEvents: [], + ftEvents: [], + nftEvents: [], + contractLogEvents: [], + smartContracts: [], + names: [], + namespaces: [], + }, + ], + }); + await db.updateMicroblocks({ + microblocks: [mb1], + txs: [ + { + tx: mbTx1, + stxLockEvents: [], + stxEvents: [], + ftEvents: [], + nftEvents: [], + contractLogEvents: [], + smartContracts: [], + names: [], + namespaces: [], + }, + ], + }); + + const result = await updateWaiter; + try { + expect(result.microblock_hash).toEqual('0xff01'); + expect(result.microblock_parent_hash).toEqual(block.block_hash); + expect(result.txs[0]).toEqual(mbTx1.tx_id); + } finally { + socket.emit('unsubscribe', 'microblock'); + socket.close(); + } + }); + + afterEach(async () => { + await apiServer.terminate(); + dbClient.release(); + await db?.close(); + await runMigrations(undefined, 'down'); + }); +});