feat: add microblock update support to socket-io

* feat: add microblockUpdate to socket topics

* chore: start adding tests

* chore: finish socket-io test
Author: Rafael Cárdenas
Committed: 2021-10-05 09:15:50 -05:00 (via GitHub)
Parent: 13c33e5475
Commit: 204d7979a9
7 changed files with 239 additions and 3 deletions


@@ -78,6 +78,14 @@ export class StacksApiSocketClient {
this.handleSubscription('block', false);
}
subscribeMicroblocks() {
return this.handleSubscription('microblock', true);
}
unsubscribeMicroblocks() {
this.handleSubscription('microblock', false);
}
subscribeMempool() {
return this.handleSubscription('mempool', true);
}
@@ -107,6 +115,7 @@ export class StacksApiSocketClient {
this.socket.on('disconnect', reason => console.warn('disconnected', reason));
this.socket.on('connect_error', error => console.error('connect_error', error));
this.socket.on('block', block => console.log('block', block));
this.socket.on('microblock', microblock => console.log('microblock', microblock));
this.socket.on('mempool', tx => console.log('mempool', tx));
this.socket.on('address-transaction', (address, data) =>
console.log('address-transaction', address, data)
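
For context, a minimal usage sketch of the new client methods. The package name, the way the client instance is obtained, and the public `socket` property are assumptions for illustration, not part of this diff:

import type { StacksApiSocketClient } from '@stacks/blockchain-api-client'; // package name assumed

// Hypothetical, already-connected client instance (construction omitted on purpose).
declare const client: StacksApiSocketClient;

// Join the 'microblock' room and log each streamed microblock.
client.subscribeMicroblocks();
client.socket.on('microblock', microblock => {
  console.log('new microblock', microblock.microblock_hash);
});

// Stop receiving microblock events when no longer needed.
client.unsubscribeMicroblocks();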


@@ -1,8 +1,8 @@
import type { AddressStxBalanceResponse, AddressTransactionWithTransfers, Block, MempoolTransaction } from '..';
import type { AddressStxBalanceResponse, AddressTransactionWithTransfers, Block, Microblock, MempoolTransaction } from '..';
export type AddressTransactionTopic = `address-transaction:${string}`;
export type AddressStxBalanceTopic = `address-stx-balance:${string}`;
export type Topic = 'block' | 'mempool' | AddressTransactionTopic | AddressStxBalanceTopic;
export type Topic = 'block' | 'microblock' | 'mempool' | AddressTransactionTopic | AddressStxBalanceTopic;
export interface ClientToServerMessages {
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
@@ -11,6 +11,7 @@ export interface ClientToServerMessages {
export interface ServerToClientMessages {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempool: (transaction: MempoolTransaction) => void;
// @ts-ignore scheduled for support in TS v4.3 https://github.com/microsoft/TypeScript/pull/26797
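
A hedged sketch of using the widened Topic union and the typed message interfaces over a raw socket.io-client connection; the server URL is an assumption, and the Socket generic ordering follows socket.io's listen-events-first convention:

import { io, Socket } from 'socket.io-client';
import type {
  ClientToServerMessages,
  ServerToClientMessages,
} from '@stacks/stacks-blockchain-api-types';

// URL assumed; point it at any Stacks Blockchain API instance with socket.io enabled.
const socket: Socket<ServerToClientMessages, ClientToServerMessages> =
  io('http://localhost:3999');

socket.emit('subscribe', 'microblock', error => {
  if (error) console.error('subscribe failed:', error);
});

socket.on('microblock', microblock => {
  // `microblock` is typed as Microblock via ServerToClientMessages.
  console.log(microblock.microblock_hash, microblock.txs.length);
});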


@@ -11,7 +11,12 @@ import {
Topic,
ServerToClientMessages,
} from '@stacks/stacks-blockchain-api-types';
import { parseDbBlock, parseDbMempoolTx, parseDbTx } from '../controllers/db-controller';
import {
getMicroblockFromDataStore,
parseDbBlock,
parseDbMempoolTx,
parseDbTx,
} from '../controllers/db-controller';
import { isProdEnv, logError, logger } from '../../helpers';
interface SocketIOMetrics {
@@ -145,6 +150,22 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
}
});
db.on('microblockUpdate', async microblockHash => {
const microblockTopic: Topic = 'microblock';
if (adapter.rooms.has(microblockTopic)) {
const microblockQuery = await getMicroblockFromDataStore({
db: db,
microblockHash: microblockHash,
});
if (!microblockQuery.found) {
return;
}
const microblock = microblockQuery.result;
prometheus?.sendEvent('microblock');
io.to(microblockTopic).emit('microblock', microblock);
}
});
db.on('txUpdate', async txId => {
// Only parse and emit data if there are currently subscriptions to the mempool topic
const mempoolTopic: Topic = 'mempool';
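
The guard above deliberately skips the datastore query when nobody is in the 'microblock' room; a client lands in that room either via the 'subscribe' message or, as in the test below, by passing the subscriptions query parameter at connect time. A small sketch (connection URL assumed):

import { io } from 'socket.io-client';

// Joining the 'microblock' room at connect time means adapter.rooms.has('microblock')
// is true, so the handler above fetches and emits each accepted microblock.
const socket = io('http://localhost:3999', { query: { subscriptions: 'microblock' } });
socket.on('microblock', microblock => console.log('streamed microblock', microblock));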


@@ -364,6 +364,7 @@ export type DataStoreEventEmitter = StrictEventEmitter<
microblocksAccepted: string[],
microblocksStreamed: string[]
) => void;
microblockUpdate: (microblockHash: string) => void;
addressUpdate: (address: string, blockHeight: number) => void;
nameUpdate: (info: string) => void;
tokensUpdate: (contractID: string) => void;
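
A minimal sketch of consuming the new emitter event straight from a datastore instance; the import path is illustrative and assumes Postgres connection settings are already in the environment:

import { PgDataStore } from './datastore/postgres-store'; // path illustrative

async function watchMicroblocks(): Promise<void> {
  const db = await PgDataStore.connect();
  // Fires once per accepted microblock with its hash, as declared above.
  db.on('microblockUpdate', (microblockHash: string) => {
    console.log('microblock accepted:', microblockHash);
  });
}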


@@ -13,6 +13,10 @@ export type PgBlockNotificationPayload = {
microblocksStreamed: string[];
};
export type PgMicroblockNotificationPayload = {
microblockHash: string;
};
export type PgAddressNotificationPayload = {
address: string;
blockHeight: number;
@@ -32,6 +36,7 @@ export type PgTokensNotificationPayload = {
export type PgNotificationPayload =
| PgBlockNotificationPayload
| PgMicroblockNotificationPayload
| PgTxNotificationPayload
| PgAddressNotificationPayload
| PgTokenMetadataNotificationPayload
@@ -71,6 +76,10 @@ export class PgNotifier {
this.notify({ type: 'blockUpdate', payload: payload });
}
public sendMicroblock(payload: PgMicroblockNotificationPayload) {
this.notify({ type: 'microblockUpdate', payload: payload });
}
public sendTx(payload: PgTxNotificationPayload) {
this.notify({ type: 'txUpdate', payload: payload });
}
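
For illustration, a producer-side sketch of the new helper; the notifier instance and the example hash are placeholders (the postgres-store change further down shows the real call site):

declare const notifier: PgNotifier; // hypothetical, already-connected instance

const payload: PgMicroblockNotificationPayload = { microblockHash: '0xff01' };
// Wraps the payload as { type: 'microblockUpdate', payload } and forwards it
// through notify(), mirroring sendBlock and sendTx above.
notifier.sendMicroblock(payload);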


@@ -98,6 +98,7 @@ import { ClarityAbi } from '@stacks/transactions';
import {
PgAddressNotificationPayload,
PgBlockNotificationPayload,
PgMicroblockNotificationPayload,
PgNameNotificationPayload,
PgNotifier,
PgTokenMetadataNotificationPayload,
@@ -604,6 +605,9 @@ export class PgDataStore
block.microblocksStreamed
);
break;
case 'microblockUpdate':
const microblock = notification.payload as PgMicroblockNotificationPayload;
this.emit('microblockUpdate', microblock.microblockHash);
break; // prevent fall-through into the 'txUpdate' case below
case 'txUpdate':
const tx = notification.payload as PgTxNotificationPayload;
this.emit('txUpdate', tx.txId);
@@ -984,6 +988,9 @@ export class PgDataStore
}
await this.insertMicroblockData(client, dbMicroblocks, txs);
dbMicroblocks.forEach(microblock =>
this.notifier?.sendMicroblock({ microblockHash: microblock.microblock_hash })
);
// Find any microblocks that have been orphaned by this latest microblock chain tip.
// This function also checks that each microblock parent hash points to an existing microblock in the db.


@@ -0,0 +1,188 @@
import { io } from 'socket.io-client';
import { ChainID } from '@stacks/common';
import { PoolClient } from 'pg';
import { ApiServer, startApiServer } from '../api/init';
import { cycleMigrations, runMigrations, PgDataStore } from '../datastore/postgres-store';
import { DbBlock, DbMicroblockPartial, DbTx, DbTxTypeId } from '../datastore/common';
import { I32_MAX, waiter, Waiter } from '../helpers';
import { Microblock } from '../../docs/generated';
describe('socket-io', () => {
let apiServer: ApiServer;
let db: PgDataStore;
let dbClient: PoolClient;
beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
await cycleMigrations();
db = await PgDataStore.connect();
dbClient = await db.pool.connect();
apiServer = await startApiServer({
datastore: db,
chainId: ChainID.Testnet,
httpLogLevel: 'silly',
});
});
test('socket-io > microblock updates', async () => {
const addr1 = 'ST28D4Q6RCQSJ6F7TEYWQDS4N1RXYEP9YBWMYSB97';
const addr2 = 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6';
const block: DbBlock = {
block_hash: '0x1234',
index_block_hash: '0xdeadbeef',
parent_index_block_hash: '0x00',
parent_block_hash: '0xff0011',
parent_microblock_hash: '',
block_height: 1,
burn_block_time: 94869286,
burn_block_hash: '0x1234',
burn_block_height: 123,
miner_txid: '0x4321',
canonical: true,
parent_microblock_sequence: 0,
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
const tx1: DbTx = {
tx_id: '0x01',
tx_index: 0,
anchor_mode: 3,
nonce: 0,
raw_tx: Buffer.alloc(0),
index_block_hash: block.index_block_hash,
block_hash: block.block_hash,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
parent_burn_block_time: 1626122935,
type_id: DbTxTypeId.Coinbase,
status: 1,
raw_result: '0x0100000000000000000000000000000001', // u1
canonical: true,
post_conditions: Buffer.from([0x01, 0xf5]),
fee_rate: 1234n,
sponsored: false,
sponsor_address: undefined,
sender_address: addr1,
origin_hash_mode: 1,
coinbase_payload: Buffer.from('hi'),
event_count: 1,
parent_index_block_hash: '',
parent_block_hash: '',
microblock_canonical: true,
microblock_sequence: I32_MAX,
microblock_hash: '',
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
const mb1: DbMicroblockPartial = {
microblock_hash: '0xff01',
microblock_sequence: 0,
microblock_parent_hash: block.block_hash,
parent_index_block_hash: block.index_block_hash,
parent_burn_block_height: 123,
parent_burn_block_hash: '0xaa',
parent_burn_block_time: 1626122935,
};
const mbTx1: DbTx = {
tx_id: '0x02',
tx_index: 0,
anchor_mode: 3,
nonce: 0,
raw_tx: Buffer.alloc(0),
type_id: DbTxTypeId.TokenTransfer,
status: 1,
raw_result: '0x0100000000000000000000000000000001', // u1
canonical: true,
post_conditions: Buffer.from([0x01, 0xf5]),
fee_rate: 1234n,
sponsored: false,
sender_address: addr1,
sponsor_address: undefined,
origin_hash_mode: 1,
token_transfer_amount: 50n,
token_transfer_memo: Buffer.from('hi'),
token_transfer_recipient_address: addr2,
event_count: 1,
parent_index_block_hash: block.index_block_hash,
parent_block_hash: block.block_hash,
microblock_canonical: true,
microblock_sequence: mb1.microblock_sequence,
microblock_hash: mb1.microblock_hash,
parent_burn_block_time: mb1.parent_burn_block_time,
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
index_block_hash: '',
block_hash: '',
burn_block_time: -1,
block_height: -1,
};
const address = apiServer.address;
const socket = io(`http://${address}`, { query: { subscriptions: 'microblock' } });
const updateWaiter: Waiter<Microblock> = waiter();
socket.on('microblock', microblock => {
updateWaiter.finish(microblock);
});
await db.update({
block: block,
microblocks: [],
minerRewards: [],
txs: [
{
tx: tx1,
stxLockEvents: [],
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
names: [],
namespaces: [],
},
],
});
await db.updateMicroblocks({
microblocks: [mb1],
txs: [
{
tx: mbTx1,
stxLockEvents: [],
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
names: [],
namespaces: [],
},
],
});
const result = await updateWaiter;
try {
expect(result.microblock_hash).toEqual('0xff01');
expect(result.microblock_parent_hash).toEqual(block.block_hash);
expect(result.txs[0]).toEqual(mbTx1.tx_id);
} finally {
socket.emit('unsubscribe', 'microblock');
socket.close();
}
});
afterEach(async () => {
await apiServer.terminate();
dbClient.release();
await db?.close();
await runMigrations(undefined, 'down');
});
});