mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 16:53:19 +08:00
feat: event-replay new_block events handling
This commit is contained in:
@@ -552,6 +552,109 @@ export class PgWriteStore extends PgStore {
|
||||
return result.count;
|
||||
}
|
||||
|
||||
async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
|
||||
const values: BlockInsertValues[] = blocks.map(block => ({
|
||||
block_hash: block.block_hash,
|
||||
index_block_hash: block.index_block_hash,
|
||||
parent_index_block_hash: block.parent_index_block_hash,
|
||||
parent_block_hash: block.parent_block_hash,
|
||||
parent_microblock_hash: block.parent_microblock_hash,
|
||||
parent_microblock_sequence: block.parent_microblock_sequence,
|
||||
block_height: block.block_height,
|
||||
burn_block_time: block.burn_block_time,
|
||||
burn_block_hash: block.burn_block_hash,
|
||||
burn_block_height: block.burn_block_height,
|
||||
miner_txid: block.miner_txid,
|
||||
canonical: block.canonical,
|
||||
execution_cost_read_count: block.execution_cost_read_count,
|
||||
execution_cost_read_length: block.execution_cost_read_length,
|
||||
execution_cost_runtime: block.execution_cost_runtime,
|
||||
execution_cost_write_count: block.execution_cost_write_count,
|
||||
execution_cost_write_length: block.execution_cost_write_length,
|
||||
}));
|
||||
await sql`
|
||||
INSERT INTO blocks ${sql(values)}
|
||||
`;
|
||||
}
|
||||
|
||||
async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
|
||||
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
|
||||
canonical: mb.canonical,
|
||||
microblock_canonical: mb.microblock_canonical,
|
||||
microblock_hash: mb.microblock_hash,
|
||||
microblock_sequence: mb.microblock_sequence,
|
||||
microblock_parent_hash: mb.microblock_parent_hash,
|
||||
parent_index_block_hash: mb.parent_index_block_hash,
|
||||
block_height: mb.block_height,
|
||||
parent_block_height: mb.parent_block_height,
|
||||
parent_block_hash: mb.parent_block_hash,
|
||||
index_block_hash: mb.index_block_hash,
|
||||
block_hash: mb.block_hash,
|
||||
parent_burn_block_height: mb.parent_burn_block_height,
|
||||
parent_burn_block_hash: mb.parent_burn_block_hash,
|
||||
parent_burn_block_time: mb.parent_burn_block_time,
|
||||
}));
|
||||
const mbResult = await sql`
|
||||
INSERT INTO microblocks ${sql(values)}
|
||||
`;
|
||||
if (mbResult.count !== microblocks.length) {
|
||||
throw new Error(
|
||||
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
|
||||
const values: TxInsertValues[] = txs.map(tx => ({
|
||||
tx_id: tx.tx_id,
|
||||
raw_tx: tx.raw_result,
|
||||
tx_index: tx.tx_index,
|
||||
index_block_hash: tx.index_block_hash,
|
||||
parent_index_block_hash: tx.parent_index_block_hash,
|
||||
block_hash: tx.block_hash,
|
||||
parent_block_hash: tx.parent_block_hash,
|
||||
block_height: tx.block_height,
|
||||
burn_block_time: tx.burn_block_time,
|
||||
parent_burn_block_time: tx.parent_burn_block_time,
|
||||
type_id: tx.type_id,
|
||||
anchor_mode: tx.anchor_mode,
|
||||
status: tx.status,
|
||||
canonical: tx.canonical,
|
||||
post_conditions: tx.post_conditions,
|
||||
nonce: tx.nonce,
|
||||
fee_rate: tx.fee_rate,
|
||||
sponsored: tx.sponsored,
|
||||
sponsor_nonce: tx.sponsor_nonce ?? null,
|
||||
sponsor_address: tx.sponsor_address ?? null,
|
||||
sender_address: tx.sender_address,
|
||||
origin_hash_mode: tx.origin_hash_mode,
|
||||
microblock_canonical: tx.microblock_canonical,
|
||||
microblock_sequence: tx.microblock_sequence,
|
||||
microblock_hash: tx.microblock_hash,
|
||||
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
|
||||
token_transfer_amount: tx.token_transfer_amount ?? null,
|
||||
token_transfer_memo: tx.token_transfer_memo ?? null,
|
||||
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
|
||||
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
|
||||
smart_contract_source_code: tx.smart_contract_source_code ?? null,
|
||||
contract_call_contract_id: tx.contract_call_contract_id ?? null,
|
||||
contract_call_function_name: tx.contract_call_function_name ?? null,
|
||||
contract_call_function_args: tx.contract_call_function_args ?? null,
|
||||
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
|
||||
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
|
||||
coinbase_payload: tx.coinbase_payload ?? null,
|
||||
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
|
||||
raw_result: tx.raw_result,
|
||||
event_count: tx.event_count,
|
||||
execution_cost_read_count: tx.execution_cost_read_count,
|
||||
execution_cost_read_length: tx.execution_cost_read_length,
|
||||
execution_cost_runtime: tx.execution_cost_runtime,
|
||||
execution_cost_write_count: tx.execution_cost_write_count,
|
||||
execution_cost_write_length: tx.execution_cost_write_length,
|
||||
}));
|
||||
await sql`INSERT INTO txs ${sql(values)}`;
|
||||
}
|
||||
|
||||
async updateBurnchainRewardSlotHolders({
|
||||
burnchainBlockHash,
|
||||
burnchainBlockHeight,
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
import * as tty from 'tty';
|
||||
|
||||
import { PgWriteStore } from '../../datastore/pg-write-store';
|
||||
@@ -7,6 +6,7 @@ import { logger } from '../../logger';
|
||||
import { createTimeTracker } from './helpers';
|
||||
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
|
||||
import { insertNewBlockEvents } from './importers/new_block_importer';
|
||||
import { DatasetStore } from './dataset/store';
|
||||
|
||||
const MIGRATIONS_TABLE = 'pgmigrations';
|
||||
|
||||
@@ -20,6 +20,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
|
||||
isEventReplay: true,
|
||||
});
|
||||
|
||||
const dataset = await DatasetStore.connect();
|
||||
|
||||
if (wipeDB) {
|
||||
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
|
||||
}
|
||||
@@ -56,8 +58,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
|
||||
|
||||
try {
|
||||
await Promise.all([
|
||||
insertNewBurnBlockEvents(db, timeTracker),
|
||||
// insertNewBlockEvents(db, timeTracker)
|
||||
insertNewBurnBlockEvents(db, dataset, timeTracker),
|
||||
insertNewBlockEvents(db, dataset, timeTracker)
|
||||
]);
|
||||
} catch (err) {
|
||||
throw err;
|
||||
|
||||
144
src/event-replay/parquet-based/importers/new_block_importer.ts
Normal file
144
src/event-replay/parquet-based/importers/new_block_importer.ts
Normal file
@@ -0,0 +1,144 @@
|
||||
import { Readable, Writable, Transform } from 'stream';
|
||||
import { pipeline } from 'stream/promises';
|
||||
import { PgWriteStore } from '../../../datastore/pg-write-store';
|
||||
import { parseNewBlockMessage } from '../../../event-stream/event-server';
|
||||
import { DbBlock, DbMicroblock, DbTx } from '../../../datastore/common';
|
||||
import { logger } from '../../../logger';
|
||||
import { TimeTracker } from '../helpers';
|
||||
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
|
||||
import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
|
||||
import { DatasetStore } from '../dataset/store';
|
||||
|
||||
const batchInserters: BatchInserter[] = [];
|
||||
|
||||
const chainID = getApiConfiguredChainID();
|
||||
|
||||
interface BatchInserter<T = any> {
|
||||
push(entries: T[]): Promise<void>;
|
||||
flush(): Promise<void>;
|
||||
}
|
||||
|
||||
function createBatchInserter<T>({
|
||||
batchSize,
|
||||
insertFn,
|
||||
}: {
|
||||
batchSize: number;
|
||||
insertFn: (entries: T[]) => Promise<void>;
|
||||
}): BatchInserter<T> {
|
||||
let entryBuffer: T[] = [];
|
||||
return {
|
||||
async push(entries: T[]) {
|
||||
entries.length === 1
|
||||
? entryBuffer.push(entries[0])
|
||||
: entries.forEach(e => entryBuffer.push(e));
|
||||
if (entryBuffer.length === batchSize) {
|
||||
await insertFn(entryBuffer);
|
||||
entryBuffer.length = 0;
|
||||
} else if (entryBuffer.length > batchSize) {
|
||||
for (const batch of batchIterate(entryBuffer, batchSize)) {
|
||||
await insertFn(batch);
|
||||
}
|
||||
entryBuffer.length = 0;
|
||||
}
|
||||
},
|
||||
async flush() {
|
||||
logger.info('Flushing remaining data...');
|
||||
if (entryBuffer.length > 0) {
|
||||
await insertFn(entryBuffer);
|
||||
entryBuffer = [];
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const populateBatchInserters = async (db: PgWriteStore) => {
|
||||
const dbBlockBatchInserter = createBatchInserter<DbBlock>({
|
||||
batchSize: 100,
|
||||
insertFn: (entries) => {
|
||||
logger.info('Inserting blocks...');
|
||||
return db.insertBlockBatch(db.sql, entries);
|
||||
},
|
||||
});
|
||||
batchInserters.push(dbBlockBatchInserter);
|
||||
|
||||
const dbMicroblockBatchInserter = createBatchInserter<DbMicroblock>({
|
||||
batchSize: 200,
|
||||
insertFn: (entries) => {
|
||||
logger.info('Inserting microblocks...');
|
||||
return db.insertMicroblock(db.sql, entries);
|
||||
},
|
||||
});
|
||||
batchInserters.push(dbMicroblockBatchInserter);
|
||||
|
||||
const dbTxBatchInserter = createBatchInserter<DbTx>({
|
||||
batchSize: 1000,
|
||||
insertFn: (entries) => {
|
||||
logger.info('Inserting txs...');
|
||||
return db.insertTxBatch(db.sql, entries);
|
||||
},
|
||||
});
|
||||
batchInserters.push(dbTxBatchInserter);
|
||||
|
||||
return new Writable({
|
||||
objectMode: true,
|
||||
write: async (data: CoreNodeBlockMessage, _encoding, next) => {
|
||||
|
||||
let dbData;
|
||||
try {
|
||||
dbData = parseNewBlockMessage(chainID, data);
|
||||
} catch (err) {
|
||||
logger.error('Error when parsing new_block event');
|
||||
console.error(err);
|
||||
|
||||
throw err;
|
||||
}
|
||||
|
||||
const insertTxs = async (dbData: any) => {
|
||||
for (const entry of dbData.txs) {
|
||||
await dbTxBatchInserter.push([entry.tx]);
|
||||
}
|
||||
};
|
||||
|
||||
await Promise.all([
|
||||
// Insert blocks
|
||||
dbBlockBatchInserter.push([dbData.block]),
|
||||
// Insert microblocks
|
||||
dbMicroblockBatchInserter.push(dbData.microblocks),
|
||||
// Insert Txs
|
||||
insertTxs(dbData)
|
||||
]);
|
||||
|
||||
next();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const transformDataToJSON = async () => {
|
||||
return new Transform({
|
||||
objectMode: true,
|
||||
transform: async (data, _encoding, callback) => {
|
||||
callback(null, JSON.parse(data.payload));
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
export const insertNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
|
||||
logger.info(`Inserting NEW_BLOCK events to db...`);
|
||||
|
||||
await timeTracker.track('insertNewBlockEvents', async () => {
|
||||
const payload = await dataset.newBlockEventsOrderedPayloadStream();
|
||||
const toJSON = await transformDataToJSON();
|
||||
const insertBatchData = await populateBatchInserters(db);
|
||||
|
||||
await pipeline(
|
||||
Readable.from(payload),
|
||||
toJSON,
|
||||
insertBatchData
|
||||
.on('finish', async () => {
|
||||
for (const batchInserter of batchInserters) {
|
||||
await batchInserter.flush();
|
||||
}
|
||||
})
|
||||
)
|
||||
});
|
||||
};
|
||||
@@ -970,3 +970,108 @@ export async function startEventServer(opts: {
|
||||
});
|
||||
return eventStreamServer;
|
||||
}
|
||||
|
||||
export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage) {
|
||||
const parsedTxs: CoreNodeParsedTxMessage[] = [];
|
||||
const blockData: CoreNodeMsgBlockData = {
|
||||
...msg,
|
||||
};
|
||||
msg.transactions.forEach(item => {
|
||||
const parsedTx = parseMessageTransaction(chainId, item, blockData, msg.events);
|
||||
if (parsedTx) {
|
||||
parsedTxs.push(parsedTx);
|
||||
}
|
||||
});
|
||||
|
||||
// calculate total execution cost of the block
|
||||
const totalCost = msg.transactions.reduce(
|
||||
(prev, cur) => {
|
||||
return {
|
||||
execution_cost_read_count: prev.execution_cost_read_count + cur.execution_cost.read_count,
|
||||
execution_cost_read_length:
|
||||
prev.execution_cost_read_length + cur.execution_cost.read_length,
|
||||
execution_cost_runtime: prev.execution_cost_runtime + cur.execution_cost.runtime,
|
||||
execution_cost_write_count:
|
||||
prev.execution_cost_write_count + cur.execution_cost.write_count,
|
||||
execution_cost_write_length:
|
||||
prev.execution_cost_write_length + cur.execution_cost.write_length,
|
||||
};
|
||||
},
|
||||
{
|
||||
execution_cost_read_count: 0,
|
||||
execution_cost_read_length: 0,
|
||||
execution_cost_runtime: 0,
|
||||
execution_cost_write_count: 0,
|
||||
execution_cost_write_length: 0,
|
||||
}
|
||||
);
|
||||
|
||||
const dbBlock: DbBlock = {
|
||||
canonical: true,
|
||||
block_hash: msg.block_hash,
|
||||
index_block_hash: msg.index_block_hash,
|
||||
parent_index_block_hash: msg.parent_index_block_hash,
|
||||
parent_block_hash: msg.parent_block_hash,
|
||||
parent_microblock_hash: msg.parent_microblock,
|
||||
parent_microblock_sequence: msg.parent_microblock_sequence,
|
||||
block_height: msg.block_height,
|
||||
burn_block_time: msg.burn_block_time,
|
||||
burn_block_hash: msg.burn_block_hash,
|
||||
burn_block_height: msg.burn_block_height,
|
||||
miner_txid: msg.miner_txid,
|
||||
execution_cost_read_count: totalCost.execution_cost_read_count,
|
||||
execution_cost_read_length: totalCost.execution_cost_read_length,
|
||||
execution_cost_runtime: totalCost.execution_cost_runtime,
|
||||
execution_cost_write_count: totalCost.execution_cost_write_count,
|
||||
execution_cost_write_length: totalCost.execution_cost_write_length,
|
||||
};
|
||||
|
||||
const dbMinerRewards: DbMinerReward[] = [];
|
||||
for (const minerReward of msg.matured_miner_rewards) {
|
||||
const dbMinerReward: DbMinerReward = {
|
||||
canonical: true,
|
||||
block_hash: minerReward.from_stacks_block_hash,
|
||||
index_block_hash: msg.index_block_hash,
|
||||
from_index_block_hash: minerReward.from_index_consensus_hash,
|
||||
mature_block_height: msg.block_height,
|
||||
recipient: minerReward.recipient,
|
||||
miner_address: minerReward.miner_address ?? minerReward.recipient,
|
||||
coinbase_amount: BigInt(minerReward.coinbase_amount),
|
||||
tx_fees_anchored: BigInt(minerReward.tx_fees_anchored),
|
||||
tx_fees_streamed_confirmed: BigInt(minerReward.tx_fees_streamed_confirmed),
|
||||
tx_fees_streamed_produced: BigInt(minerReward.tx_fees_streamed_produced),
|
||||
};
|
||||
dbMinerRewards.push(dbMinerReward);
|
||||
}
|
||||
|
||||
const dbMicroblocks = parseMicroblocksFromTxs({
|
||||
parentIndexBlockHash: msg.parent_index_block_hash,
|
||||
txs: msg.transactions,
|
||||
parentBurnBlock: {
|
||||
height: msg.parent_burn_block_height,
|
||||
hash: msg.parent_burn_block_hash,
|
||||
time: msg.parent_burn_block_timestamp,
|
||||
},
|
||||
}).map(mb => {
|
||||
const microblock: DbMicroblock = {
|
||||
...mb,
|
||||
canonical: true,
|
||||
microblock_canonical: true,
|
||||
block_height: msg.block_height,
|
||||
parent_block_height: msg.block_height - 1,
|
||||
parent_block_hash: msg.parent_block_hash,
|
||||
index_block_hash: msg.index_block_hash,
|
||||
block_hash: msg.block_hash,
|
||||
};
|
||||
return microblock;
|
||||
});
|
||||
|
||||
const dbData: DataStoreBlockUpdateData = {
|
||||
block: dbBlock,
|
||||
microblocks: dbMicroblocks,
|
||||
minerRewards: dbMinerRewards,
|
||||
txs: parseDataStoreTxEventData(parsedTxs, msg.events, msg, chainId),
|
||||
};
|
||||
|
||||
return dbData;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user