mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 22:43:34 +08:00
feat: add microblock update support to socket-io
* feat: add microblockUpdate to socket topics * chore: start adding tests * chore: finish socket-io test
This commit is contained in:
@@ -78,6 +78,14 @@ export class StacksApiSocketClient {
|
||||
this.handleSubscription('block', false);
|
||||
}
|
||||
|
||||
subscribeMicroblocks() {
|
||||
return this.handleSubscription('microblock', true);
|
||||
}
|
||||
|
||||
unsubscribeMicroblocks() {
|
||||
this.handleSubscription('microblock', false);
|
||||
}
|
||||
|
||||
subscribeMempool() {
|
||||
return this.handleSubscription('mempool', true);
|
||||
}
|
||||
@@ -107,6 +115,7 @@ export class StacksApiSocketClient {
|
||||
this.socket.on('disconnect', reason => console.warn('disconnected', reason));
|
||||
this.socket.on('connect_error', error => console.error('connect_error', error));
|
||||
this.socket.on('block', block => console.log('block', block));
|
||||
this.socket.on('microblock', microblock => console.log('microblock', microblock));
|
||||
this.socket.on('mempool', tx => console.log('mempool', tx));
|
||||
this.socket.on('address-transaction', (address, data) =>
|
||||
console.log('address-transaction', address, data)
|
||||
|
||||
5
docs/socket-io/index.d.ts
vendored
5
docs/socket-io/index.d.ts
vendored
@@ -1,8 +1,8 @@
|
||||
import type { AddressStxBalanceResponse, AddressTransactionWithTransfers, Block, MempoolTransaction } from '..';
|
||||
import type { AddressStxBalanceResponse, AddressTransactionWithTransfers, Block, Microblock, MempoolTransaction } from '..';
|
||||
|
||||
export type AddressTransactionTopic = `address-transaction:${string}`;
|
||||
export type AddressStxBalanceTopic = `address-stx-balance:${string}`;
|
||||
export type Topic = 'block' | 'mempool' | AddressTransactionTopic | AddressStxBalanceTopic;
|
||||
export type Topic = 'block' | 'microblock' | 'mempool' | AddressTransactionTopic | AddressStxBalanceTopic;
|
||||
|
||||
export interface ClientToServerMessages {
|
||||
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
|
||||
@@ -11,6 +11,7 @@ export interface ClientToServerMessages {
|
||||
|
||||
export interface ServerToClientMessages {
|
||||
block: (block: Block) => void;
|
||||
microblock: (microblock: Microblock) => void;
|
||||
mempool: (transaction: MempoolTransaction) => void;
|
||||
|
||||
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
|
||||
|
||||
@@ -11,7 +11,12 @@ import {
|
||||
Topic,
|
||||
ServerToClientMessages,
|
||||
} from '@stacks/stacks-blockchain-api-types';
|
||||
import { parseDbBlock, parseDbMempoolTx, parseDbTx } from '../controllers/db-controller';
|
||||
import {
|
||||
getMicroblockFromDataStore,
|
||||
parseDbBlock,
|
||||
parseDbMempoolTx,
|
||||
parseDbTx,
|
||||
} from '../controllers/db-controller';
|
||||
import { isProdEnv, logError, logger } from '../../helpers';
|
||||
|
||||
interface SocketIOMetrics {
|
||||
@@ -145,6 +150,22 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
|
||||
}
|
||||
});
|
||||
|
||||
db.on('microblockUpdate', async microblockHash => {
|
||||
const microblockTopic: Topic = 'microblock';
|
||||
if (adapter.rooms.has(microblockTopic)) {
|
||||
const microblockQuery = await getMicroblockFromDataStore({
|
||||
db: db,
|
||||
microblockHash: microblockHash,
|
||||
});
|
||||
if (!microblockQuery.found) {
|
||||
return;
|
||||
}
|
||||
const microblock = microblockQuery.result;
|
||||
prometheus?.sendEvent('microblock');
|
||||
io.to(microblockTopic).emit('microblock', microblock);
|
||||
}
|
||||
});
|
||||
|
||||
db.on('txUpdate', async txId => {
|
||||
// Only parse and emit data if there are currently subscriptions to the mempool topic
|
||||
const mempoolTopic: Topic = 'mempool';
|
||||
|
||||
@@ -364,6 +364,7 @@ export type DataStoreEventEmitter = StrictEventEmitter<
|
||||
microblocksAccepted: string[],
|
||||
microblocksStreamed: string[]
|
||||
) => void;
|
||||
microblockUpdate: (microblockHash: string) => void;
|
||||
addressUpdate: (address: string, blockHeight: number) => void;
|
||||
nameUpdate: (info: string) => void;
|
||||
tokensUpdate: (contractID: string) => void;
|
||||
|
||||
@@ -13,6 +13,10 @@ export type PgBlockNotificationPayload = {
|
||||
microblocksStreamed: string[];
|
||||
};
|
||||
|
||||
export type PgMicroblockNotificationPayload = {
|
||||
microblockHash: string;
|
||||
};
|
||||
|
||||
export type PgAddressNotificationPayload = {
|
||||
address: string;
|
||||
blockHeight: number;
|
||||
@@ -32,6 +36,7 @@ export type PgTokensNotificationPayload = {
|
||||
|
||||
export type PgNotificationPayload =
|
||||
| PgBlockNotificationPayload
|
||||
| PgMicroblockNotificationPayload
|
||||
| PgTxNotificationPayload
|
||||
| PgAddressNotificationPayload
|
||||
| PgTokenMetadataNotificationPayload
|
||||
@@ -71,6 +76,10 @@ export class PgNotifier {
|
||||
this.notify({ type: 'blockUpdate', payload: payload });
|
||||
}
|
||||
|
||||
public sendMicroblock(payload: PgMicroblockNotificationPayload) {
|
||||
this.notify({ type: 'microblockUpdate', payload: payload });
|
||||
}
|
||||
|
||||
public sendTx(payload: PgTxNotificationPayload) {
|
||||
this.notify({ type: 'txUpdate', payload: payload });
|
||||
}
|
||||
|
||||
@@ -98,6 +98,7 @@ import { ClarityAbi } from '@stacks/transactions';
|
||||
import {
|
||||
PgAddressNotificationPayload,
|
||||
PgBlockNotificationPayload,
|
||||
PgMicroblockNotificationPayload,
|
||||
PgNameNotificationPayload,
|
||||
PgNotifier,
|
||||
PgTokenMetadataNotificationPayload,
|
||||
@@ -604,6 +605,9 @@ export class PgDataStore
|
||||
block.microblocksStreamed
|
||||
);
|
||||
break;
|
||||
case 'microblockUpdate':
|
||||
const microblock = notification.payload as PgMicroblockNotificationPayload;
|
||||
this.emit('microblockUpdate', microblock.microblockHash);
|
||||
case 'txUpdate':
|
||||
const tx = notification.payload as PgTxNotificationPayload;
|
||||
this.emit('txUpdate', tx.txId);
|
||||
@@ -984,6 +988,9 @@ export class PgDataStore
|
||||
}
|
||||
|
||||
await this.insertMicroblockData(client, dbMicroblocks, txs);
|
||||
dbMicroblocks.forEach(microblock =>
|
||||
this.notifier?.sendMicroblock({ microblockHash: microblock.microblock_hash })
|
||||
);
|
||||
|
||||
// Find any microblocks that have been orphaned by this latest microblock chain tip.
|
||||
// This function also checks that each microblock parent hash points to an existing microblock in the db.
|
||||
|
||||
188
src/tests/socket-io-tests.ts
Normal file
188
src/tests/socket-io-tests.ts
Normal file
@@ -0,0 +1,188 @@
|
||||
import { io } from 'socket.io-client';
|
||||
import { ChainID } from '@stacks/common';
|
||||
import { PoolClient } from 'pg';
|
||||
import { ApiServer, startApiServer } from '../api/init';
|
||||
import { cycleMigrations, runMigrations, PgDataStore } from '../datastore/postgres-store';
|
||||
import { DbBlock, DbMicroblockPartial, DbTx, DbTxTypeId } from '../datastore/common';
|
||||
import { I32_MAX, waiter, Waiter } from '../helpers';
|
||||
import { Microblock } from '../../docs/generated';
|
||||
|
||||
describe('socket-io', () => {
|
||||
let apiServer: ApiServer;
|
||||
let db: PgDataStore;
|
||||
let dbClient: PoolClient;
|
||||
|
||||
beforeEach(async () => {
|
||||
process.env.PG_DATABASE = 'postgres';
|
||||
await cycleMigrations();
|
||||
db = await PgDataStore.connect();
|
||||
dbClient = await db.pool.connect();
|
||||
apiServer = await startApiServer({
|
||||
datastore: db,
|
||||
chainId: ChainID.Testnet,
|
||||
httpLogLevel: 'silly',
|
||||
});
|
||||
});
|
||||
|
||||
test('socket-io > microblock updates', async () => {
|
||||
const addr1 = 'ST28D4Q6RCQSJ6F7TEYWQDS4N1RXYEP9YBWMYSB97';
|
||||
const addr2 = 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6';
|
||||
const block: DbBlock = {
|
||||
block_hash: '0x1234',
|
||||
index_block_hash: '0xdeadbeef',
|
||||
parent_index_block_hash: '0x00',
|
||||
parent_block_hash: '0xff0011',
|
||||
parent_microblock_hash: '',
|
||||
block_height: 1,
|
||||
burn_block_time: 94869286,
|
||||
burn_block_hash: '0x1234',
|
||||
burn_block_height: 123,
|
||||
miner_txid: '0x4321',
|
||||
canonical: true,
|
||||
parent_microblock_sequence: 0,
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
};
|
||||
const tx1: DbTx = {
|
||||
tx_id: '0x01',
|
||||
tx_index: 0,
|
||||
anchor_mode: 3,
|
||||
nonce: 0,
|
||||
raw_tx: Buffer.alloc(0),
|
||||
index_block_hash: block.index_block_hash,
|
||||
block_hash: block.block_hash,
|
||||
block_height: block.block_height,
|
||||
burn_block_time: block.burn_block_time,
|
||||
parent_burn_block_time: 1626122935,
|
||||
type_id: DbTxTypeId.Coinbase,
|
||||
status: 1,
|
||||
raw_result: '0x0100000000000000000000000000000001', // u1
|
||||
canonical: true,
|
||||
post_conditions: Buffer.from([0x01, 0xf5]),
|
||||
fee_rate: 1234n,
|
||||
sponsored: false,
|
||||
sponsor_address: undefined,
|
||||
sender_address: addr1,
|
||||
origin_hash_mode: 1,
|
||||
coinbase_payload: Buffer.from('hi'),
|
||||
event_count: 1,
|
||||
parent_index_block_hash: '',
|
||||
parent_block_hash: '',
|
||||
microblock_canonical: true,
|
||||
microblock_sequence: I32_MAX,
|
||||
microblock_hash: '',
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
};
|
||||
const mb1: DbMicroblockPartial = {
|
||||
microblock_hash: '0xff01',
|
||||
microblock_sequence: 0,
|
||||
microblock_parent_hash: block.block_hash,
|
||||
parent_index_block_hash: block.index_block_hash,
|
||||
parent_burn_block_height: 123,
|
||||
parent_burn_block_hash: '0xaa',
|
||||
parent_burn_block_time: 1626122935,
|
||||
};
|
||||
const mbTx1: DbTx = {
|
||||
tx_id: '0x02',
|
||||
tx_index: 0,
|
||||
anchor_mode: 3,
|
||||
nonce: 0,
|
||||
raw_tx: Buffer.alloc(0),
|
||||
type_id: DbTxTypeId.TokenTransfer,
|
||||
status: 1,
|
||||
raw_result: '0x0100000000000000000000000000000001', // u1
|
||||
canonical: true,
|
||||
post_conditions: Buffer.from([0x01, 0xf5]),
|
||||
fee_rate: 1234n,
|
||||
sponsored: false,
|
||||
sender_address: addr1,
|
||||
sponsor_address: undefined,
|
||||
origin_hash_mode: 1,
|
||||
token_transfer_amount: 50n,
|
||||
token_transfer_memo: Buffer.from('hi'),
|
||||
token_transfer_recipient_address: addr2,
|
||||
event_count: 1,
|
||||
parent_index_block_hash: block.index_block_hash,
|
||||
parent_block_hash: block.block_hash,
|
||||
microblock_canonical: true,
|
||||
microblock_sequence: mb1.microblock_sequence,
|
||||
microblock_hash: mb1.microblock_hash,
|
||||
parent_burn_block_time: mb1.parent_burn_block_time,
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
index_block_hash: '',
|
||||
block_hash: '',
|
||||
burn_block_time: -1,
|
||||
block_height: -1,
|
||||
};
|
||||
|
||||
const address = apiServer.address;
|
||||
const socket = io(`http://${address}`, { query: { subscriptions: 'microblock' } });
|
||||
const updateWaiter: Waiter<Microblock> = waiter();
|
||||
|
||||
socket.on('microblock', microblock => {
|
||||
updateWaiter.finish(microblock);
|
||||
});
|
||||
await db.update({
|
||||
block: block,
|
||||
microblocks: [],
|
||||
minerRewards: [],
|
||||
txs: [
|
||||
{
|
||||
tx: tx1,
|
||||
stxLockEvents: [],
|
||||
stxEvents: [],
|
||||
ftEvents: [],
|
||||
nftEvents: [],
|
||||
contractLogEvents: [],
|
||||
smartContracts: [],
|
||||
names: [],
|
||||
namespaces: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
await db.updateMicroblocks({
|
||||
microblocks: [mb1],
|
||||
txs: [
|
||||
{
|
||||
tx: mbTx1,
|
||||
stxLockEvents: [],
|
||||
stxEvents: [],
|
||||
ftEvents: [],
|
||||
nftEvents: [],
|
||||
contractLogEvents: [],
|
||||
smartContracts: [],
|
||||
names: [],
|
||||
namespaces: [],
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const result = await updateWaiter;
|
||||
try {
|
||||
expect(result.microblock_hash).toEqual('0xff01');
|
||||
expect(result.microblock_parent_hash).toEqual(block.block_hash);
|
||||
expect(result.txs[0]).toEqual(mbTx1.tx_id);
|
||||
} finally {
|
||||
socket.emit('unsubscribe', 'microblock');
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await apiServer.terminate();
|
||||
dbClient.release();
|
||||
await db?.close();
|
||||
await runMigrations(undefined, 'down');
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user