diff --git a/src/event-replay/parquet-based/dataset/store.ts b/src/event-replay/parquet-based/dataset/store.ts
index 73e59c9a..20c79913 100644
--- a/src/event-replay/parquet-based/dataset/store.ts
+++ b/src/event-replay/parquet-based/dataset/store.ts
@@ -124,4 +124,16 @@ export class DatasetStore {
       );
     });
   };
+
+  // Stream the raw canonical new_block events whose ids are in the given list.
+  rawEventsByIds = (ids: number[]): Promise<QueryResult> => {
+    return new Promise(resolve => {
+      const con = this.db.connect();
+      const res = con.stream(
+        `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids.join(',')}) ORDER BY id`
+      );
+
+      resolve(res);
+    });
+  };
 }
diff --git a/src/event-replay/parquet-based/importers/raw-importer.ts b/src/event-replay/parquet-based/importers/raw-importer.ts
index e931538d..17de87e3 100644
--- a/src/event-replay/parquet-based/importers/raw-importer.ts
+++ b/src/event-replay/parquet-based/importers/raw-importer.ts
@@ -86,3 +86,25 @@ export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore)
     })
   );
 };
+
+export const processRawEventsInParallel = async (
+  db: PgWriteStore,
+  dataset: DatasetStore,
+  ids: number[]
+) => {
+  logger.info({ component: 'event-replay' }, 'RAW events parallel processing started');
+
+  const payload = await dataset.rawEventsByIds(ids);
+  const insert = insertInBatch(db);
+
+  // Pipe the duckdb stream into the batch inserter, then flush any
+  // partially-filled batches once the stream ends.
+  await pipeline(
+    Readable.from(payload),
+    insert.on('finish', async () => {
+      for (const batchInserter of batchInserters) {
+        await batchInserter.flush();
+      }
+    })
+  );
+};
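Note (not part of the patch): `pipeline(Readable.from(payload), ...)` above relies on the result of duckdb's `con.stream()` being async-iterable. A minimal standalone sketch of consuming the same query outside the importer; the `:memory:` database and the `dumpRawEvents` name are illustrative assumptions, not part of the codebase:

import { Database } from 'duckdb';

// Sketch only: iterate the same rows the importer feeds into Readable.from().
const dumpRawEvents = async (ids: number[]) => {
  const db = new Database(':memory:');
  const con = db.connect();
  const rows = con.stream(
    `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids.join(',')}) ORDER BY id`
  );
  for await (const row of rows) {
    // Each row carries the raw event's method and payload columns.
    console.log(row.method, row.payload);
  }
};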
diff --git a/src/event-replay/parquet-based/replay-controller.ts b/src/event-replay/parquet-based/replay-controller.ts
index 9d3d7a70..edd7f825 100644
--- a/src/event-replay/parquet-based/replay-controller.ts
+++ b/src/event-replay/parquet-based/replay-controller.ts
@@ -106,6 +106,44 @@ export class ReplayController {
     }
   };
 
+  ingestRawNewBlockEvents = async () => {
+    cluster.setupPrimary({
+      exec: __dirname + '/workers/raw-worker',
+    });
+
+    // Fork one worker per id file and keep count of the workers still running.
+    const idFiles = await genIdsFiles(this.dataset);
+    let workersReady = 0;
+    for (const idFile of idFiles) {
+      cluster.fork().send(idFile);
+      workersReady++;
+    }
+
+    await new Promise(resolve => {
+      for (const id in cluster.workers) {
+        const worker: _cluster.Worker | undefined = cluster.workers[id];
+        worker?.on('message', (msg, _handle) => {
+          switch (msg.msgType) {
+            case 'FINISH':
+              logger.info({ component: 'event-replay' }, `${msg.msg}`);
+              workersReady--;
+              worker.disconnect();
+              break;
+            default:
+              break;
+          }
+        });
+
+        worker?.on('disconnect', () => {
+          // Resolve once the last worker has disconnected.
+          if (workersReady === 0) {
+            resolve(true);
+          }
+        });
+      }
+    });
+  };
+
   /**
    *
    */
@@ -188,12 +226,8 @@ export class ReplayController {
    *
    */
   do = async () => {
-    await Promise.all([
-      this.ingestNewBurnBlockEvents(),
-      this.ingestAttachmentNewEvents(),
-      this.ingestRawEvents(),
-    ]);
-
+    await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
+    await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
     await this.ingestNewBlockEvents();
   };
 }
diff --git a/src/event-replay/parquet-based/workers/raw-worker.ts b/src/event-replay/parquet-based/workers/raw-worker.ts
new file mode 100644
index 00000000..04fb370b
--- /dev/null
+++ b/src/event-replay/parquet-based/workers/raw-worker.ts
@@ -0,0 +1,51 @@
+import * as fs from 'fs';
+import * as tty from 'tty';
+
+import { PgWriteStore } from '../../../datastore/pg-write-store';
+import { DatasetStore } from '../dataset/store';
+import { logger } from '../../../logger';
+import { createTimeTracker } from '../helpers';
+import { processRawEventsInParallel } from '../importers/raw-importer';
+
+const ingestNewBlock = async (idFile?: string) => {
+  const db = await PgWriteStore.connect({
+    usageName: `${idFile}`,
+    skipMigrations: true,
+    withNotifier: false,
+    isEventReplay: true,
+  });
+  const dataset = DatasetStore.connect();
+
+  const timeTracker = createTimeTracker();
+
+  const dir = './events/new_block';
+
+  try {
+    const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
+    // Drop empty lines so the SQL IN clause never gets a dangling comma.
+    const ids = idsFileContent
+      .split(/\r?\n/)
+      .filter(line => line.length > 0)
+      .map(Number);
+
+    await timeTracker.track('RAW_EVENTS_PARALLEL', async () => {
+      await processRawEventsInParallel(db, dataset, ids);
+    });
+
+    // notify parent
+    process.send?.({
+      msgType: 'FINISH',
+      msg: 'Worker has finished',
+    });
+  } finally {
+    if (tty.isatty(1)) {
+      console.table(timeTracker.getDurations(3));
+    } else {
+      logger.info(`Tracked function times`, timeTracker.getDurations(3));
+    }
+  }
+};
+
+process.on('message', async (msg: string) => {
+  await ingestNewBlock(msg);
+});
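Note on `genIdsFiles`, which the controller awaits but which is not part of this diff: the worker side implies it writes newline-separated id lists under ./events/new_block and returns their file names. A hypothetical sketch of that contract; `chunkIdsToFiles`, the `ids_N.txt` naming, and the explicit worker-count parameter are all assumptions, not the actual helper:

import * as fs from 'fs';

// Hypothetical partitioner: one id file per worker, matching what
// raw-worker.ts reads back with fs.readFileSync and split(/\r?\n/).
const chunkIdsToFiles = (ids: number[], workers: number): string[] => {
  const chunkSize = Math.ceil(ids.length / workers);
  const fileNames: string[] = [];
  for (let i = 0; i < workers; i++) {
    const chunk = ids.slice(i * chunkSize, (i + 1) * chunkSize);
    if (chunk.length === 0) break;
    const fileName = `ids_${i}.txt`; // assumed naming scheme
    fs.writeFileSync(`./events/new_block/${fileName}`, chunk.join('\n'));
    fileNames.push(fileName); // only the name is sent to the forked worker
  }
  return fileNames;
};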