diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 49777719..fec68f4c 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -552,6 +552,109 @@ export class PgWriteStore extends PgStore { return result.count; } + async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) { + const values: BlockInsertValues[] = blocks.map(block => ({ + block_hash: block.block_hash, + index_block_hash: block.index_block_hash, + parent_index_block_hash: block.parent_index_block_hash, + parent_block_hash: block.parent_block_hash, + parent_microblock_hash: block.parent_microblock_hash, + parent_microblock_sequence: block.parent_microblock_sequence, + block_height: block.block_height, + burn_block_time: block.burn_block_time, + burn_block_hash: block.burn_block_hash, + burn_block_height: block.burn_block_height, + miner_txid: block.miner_txid, + canonical: block.canonical, + execution_cost_read_count: block.execution_cost_read_count, + execution_cost_read_length: block.execution_cost_read_length, + execution_cost_runtime: block.execution_cost_runtime, + execution_cost_write_count: block.execution_cost_write_count, + execution_cost_write_length: block.execution_cost_write_length, + })); + await sql` + INSERT INTO blocks ${sql(values)} + `; + } + + async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise { + const values: MicroblockInsertValues[] = microblocks.map(mb => ({ + canonical: mb.canonical, + microblock_canonical: mb.microblock_canonical, + microblock_hash: mb.microblock_hash, + microblock_sequence: mb.microblock_sequence, + microblock_parent_hash: mb.microblock_parent_hash, + parent_index_block_hash: mb.parent_index_block_hash, + block_height: mb.block_height, + parent_block_height: mb.parent_block_height, + parent_block_hash: mb.parent_block_hash, + index_block_hash: mb.index_block_hash, + block_hash: mb.block_hash, + parent_burn_block_height: mb.parent_burn_block_height, + parent_burn_block_hash: mb.parent_burn_block_hash, + parent_burn_block_time: mb.parent_burn_block_time, + })); + const mbResult = await sql` + INSERT INTO microblocks ${sql(values)} + `; + if (mbResult.count !== microblocks.length) { + throw new Error( + `Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}` + ); + } + } + + async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise { + const values: TxInsertValues[] = txs.map(tx => ({ + tx_id: tx.tx_id, + raw_tx: tx.raw_result, + tx_index: tx.tx_index, + index_block_hash: tx.index_block_hash, + parent_index_block_hash: tx.parent_index_block_hash, + block_hash: tx.block_hash, + parent_block_hash: tx.parent_block_hash, + block_height: tx.block_height, + burn_block_time: tx.burn_block_time, + parent_burn_block_time: tx.parent_burn_block_time, + type_id: tx.type_id, + anchor_mode: tx.anchor_mode, + status: tx.status, + canonical: tx.canonical, + post_conditions: tx.post_conditions, + nonce: tx.nonce, + fee_rate: tx.fee_rate, + sponsored: tx.sponsored, + sponsor_nonce: tx.sponsor_nonce ?? null, + sponsor_address: tx.sponsor_address ?? null, + sender_address: tx.sender_address, + origin_hash_mode: tx.origin_hash_mode, + microblock_canonical: tx.microblock_canonical, + microblock_sequence: tx.microblock_sequence, + microblock_hash: tx.microblock_hash, + token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null, + token_transfer_amount: tx.token_transfer_amount ?? null, + token_transfer_memo: tx.token_transfer_memo ?? null, + smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null, + smart_contract_contract_id: tx.smart_contract_contract_id ?? null, + smart_contract_source_code: tx.smart_contract_source_code ?? null, + contract_call_contract_id: tx.contract_call_contract_id ?? null, + contract_call_function_name: tx.contract_call_function_name ?? null, + contract_call_function_args: tx.contract_call_function_args ?? null, + poison_microblock_header_1: tx.poison_microblock_header_1 ?? null, + poison_microblock_header_2: tx.poison_microblock_header_2 ?? null, + coinbase_payload: tx.coinbase_payload ?? null, + coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null, + raw_result: tx.raw_result, + event_count: tx.event_count, + execution_cost_read_count: tx.execution_cost_read_count, + execution_cost_read_length: tx.execution_cost_read_length, + execution_cost_runtime: tx.execution_cost_runtime, + execution_cost_write_count: tx.execution_cost_write_count, + execution_cost_write_length: tx.execution_cost_write_length, + })); + await sql`INSERT INTO txs ${sql(values)}`; + } + async updateBurnchainRewardSlotHolders({ burnchainBlockHash, burnchainBlockHeight, diff --git a/src/event-replay/parquet-based/event-replay.ts b/src/event-replay/parquet-based/event-replay.ts index 1bfdac1f..78a26916 100644 --- a/src/event-replay/parquet-based/event-replay.ts +++ b/src/event-replay/parquet-based/event-replay.ts @@ -1,4 +1,3 @@ - import * as tty from 'tty'; import { PgWriteStore } from '../../datastore/pg-write-store'; @@ -7,6 +6,7 @@ import { logger } from '../../logger'; import { createTimeTracker } from './helpers'; import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer'; import { insertNewBlockEvents } from './importers/new_block_importer'; +import { DatasetStore } from './dataset/store'; const MIGRATIONS_TABLE = 'pgmigrations'; @@ -20,6 +20,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => isEventReplay: true, }); + const dataset = await DatasetStore.connect(); + if (wipeDB) { await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' }); } @@ -56,8 +58,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => try { await Promise.all([ - insertNewBurnBlockEvents(db, timeTracker), - // insertNewBlockEvents(db, timeTracker) + insertNewBurnBlockEvents(db, dataset, timeTracker), + insertNewBlockEvents(db, dataset, timeTracker) ]); } catch (err) { throw err; diff --git a/src/event-replay/parquet-based/importers/new_block_importer.ts b/src/event-replay/parquet-based/importers/new_block_importer.ts new file mode 100644 index 00000000..e2548d7b --- /dev/null +++ b/src/event-replay/parquet-based/importers/new_block_importer.ts @@ -0,0 +1,144 @@ +import { Readable, Writable, Transform } from 'stream'; +import { pipeline } from 'stream/promises'; +import { PgWriteStore } from '../../../datastore/pg-write-store'; +import { parseNewBlockMessage } from '../../../event-stream/event-server'; +import { DbBlock, DbMicroblock, DbTx } from '../../../datastore/common'; +import { logger } from '../../../logger'; +import { TimeTracker } from '../helpers'; +import { getApiConfiguredChainID, batchIterate } from '../../../helpers'; +import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message'; +import { DatasetStore } from '../dataset/store'; + +const batchInserters: BatchInserter[] = []; + +const chainID = getApiConfiguredChainID(); + +interface BatchInserter { + push(entries: T[]): Promise; + flush(): Promise; +} + +function createBatchInserter({ + batchSize, + insertFn, +}: { + batchSize: number; + insertFn: (entries: T[]) => Promise; +}): BatchInserter { + let entryBuffer: T[] = []; + return { + async push(entries: T[]) { + entries.length === 1 + ? entryBuffer.push(entries[0]) + : entries.forEach(e => entryBuffer.push(e)); + if (entryBuffer.length === batchSize) { + await insertFn(entryBuffer); + entryBuffer.length = 0; + } else if (entryBuffer.length > batchSize) { + for (const batch of batchIterate(entryBuffer, batchSize)) { + await insertFn(batch); + } + entryBuffer.length = 0; + } + }, + async flush() { + logger.info('Flushing remaining data...'); + if (entryBuffer.length > 0) { + await insertFn(entryBuffer); + entryBuffer = []; + } + }, + }; +} + +const populateBatchInserters = async (db: PgWriteStore) => { + const dbBlockBatchInserter = createBatchInserter({ + batchSize: 100, + insertFn: (entries) => { + logger.info('Inserting blocks...'); + return db.insertBlockBatch(db.sql, entries); + }, + }); + batchInserters.push(dbBlockBatchInserter); + + const dbMicroblockBatchInserter = createBatchInserter({ + batchSize: 200, + insertFn: (entries) => { + logger.info('Inserting microblocks...'); + return db.insertMicroblock(db.sql, entries); + }, + }); + batchInserters.push(dbMicroblockBatchInserter); + + const dbTxBatchInserter = createBatchInserter({ + batchSize: 1000, + insertFn: (entries) => { + logger.info('Inserting txs...'); + return db.insertTxBatch(db.sql, entries); + }, + }); + batchInserters.push(dbTxBatchInserter); + + return new Writable({ + objectMode: true, + write: async (data: CoreNodeBlockMessage, _encoding, next) => { + + let dbData; + try { + dbData = parseNewBlockMessage(chainID, data); + } catch (err) { + logger.error('Error when parsing new_block event'); + console.error(err); + + throw err; + } + + const insertTxs = async (dbData: any) => { + for (const entry of dbData.txs) { + await dbTxBatchInserter.push([entry.tx]); + } + }; + + await Promise.all([ + // Insert blocks + dbBlockBatchInserter.push([dbData.block]), + // Insert microblocks + dbMicroblockBatchInserter.push(dbData.microblocks), + // Insert Txs + insertTxs(dbData) + ]); + + next(); + } + }); +} + +const transformDataToJSON = async () => { + return new Transform({ + objectMode: true, + transform: async (data, _encoding, callback) => { + callback(null, JSON.parse(data.payload)); + } + }); +}; + +export const insertNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => { + logger.info(`Inserting NEW_BLOCK events to db...`); + + await timeTracker.track('insertNewBlockEvents', async () => { + const payload = await dataset.newBlockEventsOrderedPayloadStream(); + const toJSON = await transformDataToJSON(); + const insertBatchData = await populateBatchInserters(db); + + await pipeline( + Readable.from(payload), + toJSON, + insertBatchData + .on('finish', async () => { + for (const batchInserter of batchInserters) { + await batchInserter.flush(); + } + }) + ) + }); +}; diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 6eb837e6..46041f31 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -970,3 +970,108 @@ export async function startEventServer(opts: { }); return eventStreamServer; } + +export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage) { + const parsedTxs: CoreNodeParsedTxMessage[] = []; + const blockData: CoreNodeMsgBlockData = { + ...msg, + }; + msg.transactions.forEach(item => { + const parsedTx = parseMessageTransaction(chainId, item, blockData, msg.events); + if (parsedTx) { + parsedTxs.push(parsedTx); + } + }); + + // calculate total execution cost of the block + const totalCost = msg.transactions.reduce( + (prev, cur) => { + return { + execution_cost_read_count: prev.execution_cost_read_count + cur.execution_cost.read_count, + execution_cost_read_length: + prev.execution_cost_read_length + cur.execution_cost.read_length, + execution_cost_runtime: prev.execution_cost_runtime + cur.execution_cost.runtime, + execution_cost_write_count: + prev.execution_cost_write_count + cur.execution_cost.write_count, + execution_cost_write_length: + prev.execution_cost_write_length + cur.execution_cost.write_length, + }; + }, + { + execution_cost_read_count: 0, + execution_cost_read_length: 0, + execution_cost_runtime: 0, + execution_cost_write_count: 0, + execution_cost_write_length: 0, + } + ); + + const dbBlock: DbBlock = { + canonical: true, + block_hash: msg.block_hash, + index_block_hash: msg.index_block_hash, + parent_index_block_hash: msg.parent_index_block_hash, + parent_block_hash: msg.parent_block_hash, + parent_microblock_hash: msg.parent_microblock, + parent_microblock_sequence: msg.parent_microblock_sequence, + block_height: msg.block_height, + burn_block_time: msg.burn_block_time, + burn_block_hash: msg.burn_block_hash, + burn_block_height: msg.burn_block_height, + miner_txid: msg.miner_txid, + execution_cost_read_count: totalCost.execution_cost_read_count, + execution_cost_read_length: totalCost.execution_cost_read_length, + execution_cost_runtime: totalCost.execution_cost_runtime, + execution_cost_write_count: totalCost.execution_cost_write_count, + execution_cost_write_length: totalCost.execution_cost_write_length, + }; + + const dbMinerRewards: DbMinerReward[] = []; + for (const minerReward of msg.matured_miner_rewards) { + const dbMinerReward: DbMinerReward = { + canonical: true, + block_hash: minerReward.from_stacks_block_hash, + index_block_hash: msg.index_block_hash, + from_index_block_hash: minerReward.from_index_consensus_hash, + mature_block_height: msg.block_height, + recipient: minerReward.recipient, + miner_address: minerReward.miner_address ?? minerReward.recipient, + coinbase_amount: BigInt(minerReward.coinbase_amount), + tx_fees_anchored: BigInt(minerReward.tx_fees_anchored), + tx_fees_streamed_confirmed: BigInt(minerReward.tx_fees_streamed_confirmed), + tx_fees_streamed_produced: BigInt(minerReward.tx_fees_streamed_produced), + }; + dbMinerRewards.push(dbMinerReward); + } + + const dbMicroblocks = parseMicroblocksFromTxs({ + parentIndexBlockHash: msg.parent_index_block_hash, + txs: msg.transactions, + parentBurnBlock: { + height: msg.parent_burn_block_height, + hash: msg.parent_burn_block_hash, + time: msg.parent_burn_block_timestamp, + }, + }).map(mb => { + const microblock: DbMicroblock = { + ...mb, + canonical: true, + microblock_canonical: true, + block_height: msg.block_height, + parent_block_height: msg.block_height - 1, + parent_block_hash: msg.parent_block_hash, + index_block_hash: msg.index_block_hash, + block_hash: msg.block_hash, + }; + return microblock; + }); + + const dbData: DataStoreBlockUpdateData = { + block: dbBlock, + microblocks: dbMicroblocks, + minerRewards: dbMinerRewards, + txs: parseDataStoreTxEventData(parsedTxs, msg.events, msg, chainId), + }; + + return dbData; +}