From 3ede07f134ac121505ca00b5bab7dba93a3def17 Mon Sep 17 00:00:00 2001 From: Chris Guimaraes Date: Thu, 27 Jul 2023 20:49:28 +0100 Subject: [PATCH] feat: event-replay remainder events handling --- .../parquet-based/dataset/store.ts | 20 ++++++++ .../importers/attachment-new-importer.ts | 29 ++++++----- .../importers/remainder-importer.ts | 48 +++++++++++++++++++ .../parquet-based/replay-controller.ts | 39 ++++++++++++++- src/index.ts | 2 +- 5 files changed, 123 insertions(+), 15 deletions(-) create mode 100644 src/event-replay/parquet-based/importers/remainder-importer.ts diff --git a/src/event-replay/parquet-based/dataset/store.ts b/src/event-replay/parquet-based/dataset/store.ts index 20c79913..c5b894fe 100644 --- a/src/event-replay/parquet-based/dataset/store.ts +++ b/src/event-replay/parquet-based/dataset/store.ts @@ -135,4 +135,24 @@ export class DatasetStore { resolve(res); }); }; + + // + // REMAINDER EVENTS + // + + remainderEvents = (): Promise => { + return new Promise(resolve => { + const con = this.db.connect(); + con.all( + `SELECT method, payload FROM READ_PARQUET('events/remainder/*.parquet') ORDER BY id`, + (err: any, res: any) => { + if (err) { + throw err; + } + + resolve(res); + } + ); + }); + }; } diff --git a/src/event-replay/parquet-based/importers/attachment-new-importer.ts b/src/event-replay/parquet-based/importers/attachment-new-importer.ts index e65cd2d3..e94d971a 100644 --- a/src/event-replay/parquet-based/importers/attachment-new-importer.ts +++ b/src/event-replay/parquet-based/importers/attachment-new-importer.ts @@ -42,22 +42,25 @@ export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: Data blockHeights.push(el.attachment!.blockHeight); } } - // get events from block heights - const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights); - for (const event of blockEvents) { - for (const ds of ary) { - if (ds.blockData?.index_block_hash === event.index_block_hash) { - const txs = JSON.parse(event.payload).transactions; - for (const tx of txs) { - if (ds.attachment!.txId === tx.txid) { - ds.blockData!.microblock_hash = tx.microblock_hash || ''; - ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX; + if (blockHeights.length > 0) { + // get events from block heights + const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights); + + for (const event of blockEvents) { + for (const ds of ary) { + if (ds.blockData?.index_block_hash === event.index_block_hash) { + const txs = JSON.parse(event.payload).transactions; + for (const tx of txs) { + if (ds.attachment!.txId === tx.txid) { + ds.blockData!.microblock_hash = tx.microblock_hash || ''; + ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX; + } } - } - ds.blockData!.index_block_hash = event.index_block_hash; - ds.blockData!.parent_index_block_hash = event.parent_index_block_hash; + ds.blockData!.index_block_hash = event.index_block_hash; + ds.blockData!.parent_index_block_hash = event.parent_index_block_hash; + } } } } diff --git a/src/event-replay/parquet-based/importers/remainder-importer.ts b/src/event-replay/parquet-based/importers/remainder-importer.ts new file mode 100644 index 00000000..b13f9010 --- /dev/null +++ b/src/event-replay/parquet-based/importers/remainder-importer.ts @@ -0,0 +1,48 @@ +import { Readable, Writable } from 'stream'; +import { pipeline } from 'stream/promises'; +import { PgWriteStore } from '../../../datastore/pg-write-store'; +import { logger } from '../../../logger'; +import { DatasetStore } from '../dataset/store'; +import { EventStreamServer, startEventServer } from '../../../event-stream/event-server'; +import { getApiConfiguredChainID, httpPostRequest } from '../../../helpers'; + +const chainID = getApiConfiguredChainID(); + +const processRequests = (eventServer: EventStreamServer) => { + return new Writable({ + objectMode: true, + write: async (data, _encoding, next) => { + await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: data.method, + headers: { 'Content-Type': 'application/json' }, + body: data.payload, + throwOnNotOK: true, + }); + + next(); + }, + }); +}; + +export const processRemainderEvents = async (db: PgWriteStore, dataset: DatasetStore) => { + logger.info({ component: 'event-replay' }, 'REMAINDER events processing started'); + + const eventServer = await startEventServer({ + datastore: db, + chainId: chainID, + serverHost: '127.0.0.1', + serverPort: 0, + }); + + const eventStream = await dataset.remainderEvents(); + const process = processRequests(eventServer); + + await pipeline( + Readable.from(eventStream), + process.on('finish', async () => { + await eventServer.closeAsync(); + }) + ); +}; diff --git a/src/event-replay/parquet-based/replay-controller.ts b/src/event-replay/parquet-based/replay-controller.ts index edd7f825..5e22939f 100644 --- a/src/event-replay/parquet-based/replay-controller.ts +++ b/src/event-replay/parquet-based/replay-controller.ts @@ -6,6 +6,7 @@ import { createTimeTracker, genIdsFiles } from './helpers'; import { processNewBurnBlockEvents } from './importers/new-burn-block-importer'; import { processAttachmentNewEvents } from './importers/attachment-new-importer'; import { processRawEvents } from './importers/raw-importer'; +import { processRemainderEvents } from './importers/remainder-importer'; import { DatasetStore } from './dataset/store'; import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations'; import { IndexesState } from '../../datastore/common'; @@ -183,6 +184,28 @@ export class ReplayController { }); }; + /** + * + */ + private ingestRemainderEvents = async () => { + const timeTracker = createTimeTracker(); + + try { + await timeTracker.track('REMAINDER_EVENTS', async () => { + await processRemainderEvents(this.db, this.dataset); + }); + } catch (err) { + throw err; + } finally { + if (true || tty.isatty(1)) { + console.log('Tracked function times:'); + console.table(timeTracker.getDurations(3)); + } else { + logger.info(`Tracked function times`, timeTracker.getDurations(3)); + } + } + }; + /** * */ @@ -209,24 +232,38 @@ export class ReplayController { /** * */ - teardown = async () => { + finalize = async () => { // Re-enabling indexes logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables'); await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On); + // to be replayed with regular HTTP POSTs + await this.ingestRemainderEvents(); + // Refreshing materialized views logger.info({ component: 'event-replay' }, `Refreshing materialized views`); await this.db.finishEventReplay(); + // Close DB + logger.info({ component: 'event-replay' }, 'Closing DB connection'); await this.db.close(); + + // Exit with success + logger.info({ component: 'event-replay' }, 'Finishing event-replay with success'); + process.exit(0); }; /** * */ do = async () => { + // NEW_BURN_BLOCK and ATTACHMENTS/NEW events await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]); + + // RAW events to event_observer_requests table await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]); + + // NEW_BLOCK events await this.ingestNewBlockEvents(); }; } diff --git a/src/index.ts b/src/index.ts index 84b3839a..83516e23 100644 --- a/src/index.ts +++ b/src/index.ts @@ -304,7 +304,7 @@ async function handleProgramArgs() { const replay = await ReplayController.init(); await replay.prepare(); await replay.do(); - await replay.teardown(); + await replay.finalize(); } else if (parsedOpts._[0]) { throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`); } else {