From bb70ca99c07bf777557bca5e4b9924d104d8f7fd Mon Sep 17 00:00:00 2001
From: Chris Guimaraes
Date: Tue, 25 Jul 2023 16:01:42 +0100
Subject: [PATCH] feat: better handling of raw events insertion
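
Reworks how raw event requests are ingested during a parquet-based
event replay:

- DatasetStore.rawEventsStream(), which streamed events/raw*.parquet,
  is replaced by rawEvents(), which reads method/payload rows from the
  per-event parquet datasets (new_burn_block, attachments/new,
  new_microblocks, drop_mempool_tx, new_mempool_tx), ordered by id.
- Raw event requests are batch-inserted 200 at a time instead of 500.
- Index toggling moves from ReplayController into
  PgWriteStore.toggleAllTableIndexes(), which finds every table in the
  schema except pgmigrations and flips pg_index.indisready/indisvalid,
  driven by the new IndexesState enum.
- The standalone gen-ids-file.ts script becomes genIdsFiles() in
  helpers.ts, new-block-worker.ts moves under workers/, and new-block
  ingestion now runs only after the parallel importers have finished.

The resulting top-level flow is roughly the following sketch (the
wrapping method and the first Promise.all() entry are not fully visible
in this diff, so they are inferred from the imports; prepare() is a
placeholder name for the step that disables indexes):

    const replay = await ReplayController.init();
    await replay.prepare();               // placeholder: disables indexes via
                                          // db.toggleAllTableIndexes(sql, IndexesState.Off)
    await Promise.all([
      replay.ingestNewBurnBlockEvents(),  // inferred from imports
      replay.ingestAttachmentNewEvents(),
      replay.ingestRawEvents(),
    ]);
    await replay.ingestNewBlockEvents();  // now runs after the parallel ingests
    await replay.teardown();              // re-enables indexes, refreshes views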
---
 src/datastore/common.ts                        |   5 +
 src/datastore/pg-write-store.ts                |  24 ++-
 .../parquet-based/dataset/store.ts             |  23 ++-
 .../parquet-based/gen-ids-file.ts              |  41 -----
 src/event-replay/parquet-based/helpers.ts      |  75 ++++----
 .../parquet-based/importers/raw-importer.ts    |  12 +-
 .../parquet-based/replay-controller.ts         | 167 +++++-------------
 .../{ => workers}/new-block-worker.ts          |  10 +-
 8 files changed, 139 insertions(+), 218 deletions(-)
 delete mode 100644 src/event-replay/parquet-based/gen-ids-file.ts
 rename src/event-replay/parquet-based/{ => workers}/new-block-worker.ts (79%)

diff --git a/src/datastore/common.ts b/src/datastore/common.ts
index f093119d..6d60242b 100644
--- a/src/datastore/common.ts
+++ b/src/datastore/common.ts
@@ -1558,3 +1558,8 @@ export interface DbChainTip {
   indexBlockHash: string;
   burnBlockHeight: number;
 }
+
+export enum IndexesState {
+  Off = 0,
+  On = 1,
+}
diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts
index 8bb4cdba..6e3de113 100644
--- a/src/datastore/pg-write-store.ts
+++ b/src/datastore/pg-write-store.ts
@@ -64,6 +64,7 @@ import {
   DbChainTip,
   DbPox3Event,
   RawEventRequestInsertValues,
+  IndexesState,
 } from './common';
 import { ClarityAbi } from '@stacks/transactions';
 import {
@@ -96,6 +97,8 @@ import { parseResolver, parseZoneFileTxt } from '../event-stream/bns/bns-helpers
 import { Pox2EventName } from '../pox-helpers';
 import { logger } from '../logger';
 
+const MIGRATIONS_TABLE = 'pgmigrations';
+
 class MicroblockGapError extends Error {
   constructor(message: string) {
     super(message);
@@ -3273,13 +3276,26 @@ export class PgWriteStore extends PgStore {
   }
 
   /**
-   * (event-replay) Enable or disable indexes for the provided set of tables.
+   * (event-replay) Enable or disable indexes for all DB tables.
    */
-  async toggleTableIndexes(sql: PgSqlClient, tables: string[], enabled: boolean): Promise<void> {
-    const tableSchema = this.sql.options.connection.search_path ?? 'public';
+  async toggleAllTableIndexes(sql: PgSqlClient, state: IndexesState): Promise<void> {
+    const enable: boolean = Boolean(state);
+    const dbName = sql.options.database;
+    const tableSchema = sql.options.connection.search_path ?? 'public';
+    const tablesQuery = await sql<{ tablename: string }[]>`
+      SELECT tablename FROM pg_catalog.pg_tables
+      WHERE tablename != ${MIGRATIONS_TABLE}
+      AND schemaname = ${tableSchema}`;
+    if (tablesQuery.length === 0) {
+      const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
+      console.error(errorMsg);
+      throw new Error(errorMsg);
+    }
+    const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
+
     const result = await sql`
       UPDATE pg_index
-      SET ${sql({ indisready: enabled, indisvalid: enabled })}
+      SET ${sql({ indisready: enable, indisvalid: enable })}
       WHERE indrelid = ANY (
         SELECT oid FROM pg_class WHERE relname IN ${sql(tables)}
diff --git a/src/event-replay/parquet-based/dataset/store.ts b/src/event-replay/parquet-based/dataset/store.ts
index 26b0f663..73e59c9a 100644
--- a/src/event-replay/parquet-based/dataset/store.ts
+++ b/src/event-replay/parquet-based/dataset/store.ts
@@ -19,7 +19,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       con.all(
-        "SELECT ID FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
+        "SELECT id FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
         (err: any, result: any) => {
           if (err) {
             throw err;
           }
@@ -103,14 +103,25 @@ export class DatasetStore {
   // RAW EVENTS
   //
 
-  rawEventsStream = (): Promise<any> => {
+  rawEvents = (): Promise<any[]> => {
     return new Promise(resolve => {
       const con = this.db.connect();
-      const res = con.stream(
-        `SELECT event, payload FROM READ_PARQUET('events/raw*.parquet') ORDER BY id`
-      );
+      con.all(
+        `SELECT method, payload FROM READ_PARQUET([
+          'events/new_burn_block/canonical/*.parquet',
+          'events/attachments/new/*.parquet',
+          'events/new_microblocks/*.parquet',
+          'events/drop_mempool_tx/*.parquet',
+          'events/new_mempool_tx/*.parquet',
+        ]) ORDER BY id`,
+        (err: any, result: any) => {
+          if (err) {
+            throw err;
+          }
 
-      resolve(res);
+          resolve(result);
+        }
+      );
     });
   };
 }
diff --git a/src/event-replay/parquet-based/gen-ids-file.ts b/src/event-replay/parquet-based/gen-ids-file.ts
deleted file mode 100644
index 6d61c3da..00000000
--- a/src/event-replay/parquet-based/gen-ids-file.ts
+++ /dev/null
@@ -1,41 +0,0 @@
-import * as fs from 'fs';
-
-import { logger } from '../../logger';
-import { DatasetStore } from './dataset/store';
-import { splitIntoChunks } from './helpers';
-
-(async () => {
-  const args = process.argv.slice(2);
-  const workers: number = Number(args[0].split('=')[1]);
-
-  logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
-
-  const dir = './events/new_block';
-
-  const dataset = DatasetStore.connect();
-
-  const ids: number[] = await dataset.newBlockEventsIds();
-  const batchSize = Math.ceil(ids.length / workers);
-  const chunks = splitIntoChunks(ids, batchSize);
-
-  const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
-
-  // delete previous files
-  files.map(file => {
-    try {
-      fs.unlinkSync(`${dir}/${file}`);
-    } catch (err) {
-      throw err;
-    }
-  });
-
-  // create id files
-  chunks.forEach((chunk, idx) => {
-    const filename = `./events/new_block/ids_${idx + 1}.txt`;
-    chunk.forEach(id => {
-      fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
-    });
-  });
-})().catch(err => {
-  throw err;
-});
diff --git a/src/event-replay/parquet-based/helpers.ts b/src/event-replay/parquet-based/helpers.ts
index 48587d99..4814baa2 100644
--- a/src/event-replay/parquet-based/helpers.ts
+++ b/src/event-replay/parquet-based/helpers.ts
@@ -1,3 +1,8 @@
+import * as fs from 'fs';
+
+import { logger } from '../../logger';
+import { DatasetStore } from './dataset/store';
+
 interface TimeTracker {
   track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
   trackSync<T = void>(name: string, fn: () => T): T;
 }
@@ -51,40 +56,6 @@ const createTimeTracker = (): TimeTracker => {
   };
 };
 
-interface Stopwatch {
-  /** Milliseconds since stopwatch was created. */
-  getElapsed: () => number;
-  /** Seconds since stopwatch was created. */
-  getElapsedSeconds: (roundDecimals?: number) => number;
-  getElapsedAndRestart: () => number;
-  restart(): void;
-}
-
-function stopwatch(): Stopwatch {
-  let start = process.hrtime.bigint();
-  const result: Stopwatch = {
-    getElapsedSeconds: (roundDecimals?: number) => {
-      const elapsedMs = result.getElapsed();
-      const seconds = elapsedMs / 1000;
-      return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
-    },
-    getElapsed: () => {
-      const end = process.hrtime.bigint();
-      return Number((end - start) / 1_000_000n);
-    },
-    getElapsedAndRestart: () => {
-      const end = process.hrtime.bigint();
-      const result = Number((end - start) / 1_000_000n);
-      start = process.hrtime.bigint();
-      return result;
-    },
-    restart: () => {
-      start = process.hrtime.bigint();
-    },
-  };
-  return result;
-}
-
 function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
   for (let i = 0; i < arr.length; i += n) {
     yield arr.slice(i, i + n);
   }
 }
@@ -95,4 +66,38 @@ const splitIntoChunks = (data: number[], chunk_size: number) => {
   return [...chunks(data, chunk_size)];
 };
 
-export { createTimeTracker, splitIntoChunks };
+const genIdsFiles = async (dataset: DatasetStore) => {
+  const args = process.argv.slice(2);
+  const workers: number = Number(args[1].split('=')[1]);
+
+  logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
+
+  const dir = './events/new_block';
+
+  const ids: number[] = await dataset.newBlockEventsIds();
+  const batchSize = Math.ceil(ids.length / workers);
+  const chunks = splitIntoChunks(ids, batchSize);
+
+  const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
+
+  // delete previous files
+  files.forEach(file => {
+    try {
+      fs.unlinkSync(`${dir}/${file}`);
+    } catch (err) {
+      throw err;
+    }
+  });
+
+  // create id files
+  chunks.forEach((chunk, idx) => {
+    const filename = `./events/new_block/ids_${idx + 1}.txt`;
+    chunk.forEach(id => {
+      fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
+    });
+  });
+
+  return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
+};
+
+export { createTimeTracker, splitIntoChunks, genIdsFiles };
diff --git a/src/event-replay/parquet-based/importers/raw-importer.ts b/src/event-replay/parquet-based/importers/raw-importer.ts
index 53d207e9..e931538d 100644
--- a/src/event-replay/parquet-based/importers/raw-importer.ts
+++ b/src/event-replay/parquet-based/importers/raw-importer.ts
@@ -50,13 +50,13 @@ function createBatchInserter<T>({
 
 const insertInBatch = (db: PgWriteStore) => {
   const dbRawEventBatchInserter = createBatchInserter<RawEventRequestInsertValues>({
-    batchSize: 500,
+    batchSize: 200,
    insertFn: async entries => {
      logger.debug(
        { component: 'event-replay' },
        'Inserting into event_observer_requests table...'
      );
-      return db.insertRawEventRequestBatch(db.sql, entries);
+      return await db.insertRawEventRequestBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbRawEventBatchInserter);
@@ -64,11 +64,7 @@ const insertInBatch = (db: PgWriteStore) => {
   return new Writable({
     objectMode: true,
     write: async (data, _encoding, next) => {
-      const insertRawEvents = async (data: any) => {
-        await dbRawEventBatchInserter.push([{ event_path: data.event, payload: data.payload }]);
-      };
-
-      await insertRawEvents(data);
+      await dbRawEventBatchInserter.push([{ event_path: data.method, payload: data.payload }]);
 
       next();
     },
@@ -78,7 +74,7 @@ const insertInBatch = (db: PgWriteStore) => {
 
 export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
   logger.info({ component: 'event-replay' }, 'RAW events process started');
 
-  const payload = await dataset.rawEventsStream();
+  const payload = await dataset.rawEvents();
   const insert = insertInBatch(db);
 
   await pipeline(
diff --git a/src/event-replay/parquet-based/replay-controller.ts b/src/event-replay/parquet-based/replay-controller.ts
index d81f1928..9d3d7a70 100644
--- a/src/event-replay/parquet-based/replay-controller.ts
+++ b/src/event-replay/parquet-based/replay-controller.ts
@@ -1,35 +1,45 @@
 import * as tty from 'tty';
-import * as fs from 'fs';
 
 import { PgWriteStore } from '../../datastore/pg-write-store';
 import { logger } from '../../logger';
-import { createTimeTracker } from './helpers';
+import { createTimeTracker, genIdsFiles } from './helpers';
 import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
 import { processAttachmentNewEvents } from './importers/attachment-new-importer';
 import { processRawEvents } from './importers/raw-importer';
 import { DatasetStore } from './dataset/store';
 import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
-import { splitIntoChunks } from './helpers';
+import { IndexesState } from '../../datastore/common';
 
 import * as _cluster from 'cluster';
 const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix
 
-const MIGRATIONS_TABLE = 'pgmigrations';
-
-enum IndexesState {
-  On,
-  Off,
-}
-
 export class ReplayController {
   private readonly db;
   private readonly dataset;
 
+  /**
+   *
+   */
   private constructor(db: PgWriteStore, dataset: DatasetStore) {
     this.db = db;
     this.dataset = dataset;
   }
 
+  /**
+   *
+   */
+  static async init() {
+    const db = await PgWriteStore.connect({
+      usageName: 'event-replay',
+      skipMigrations: true,
+      withNotifier: false,
+      isEventReplay: true,
+    });
+    const dataset = DatasetStore.connect();
+
+    return new ReplayController(db, dataset);
+  }
+
   /**
    *
    */
@@ -74,17 +84,39 @@ export class ReplayController {
     }
   };
 
+  /**
+   *
+   */
+  ingestRawEvents = async () => {
+    const timeTracker = createTimeTracker();
+
+    try {
+      await timeTracker.track('RAW_EVENTS', async () => {
+        await processRawEvents(this.db, this.dataset);
+      });
+    } catch (err) {
+      throw err;
+    } finally {
+      if (tty.isatty(1)) {
+        console.log('Tracked function times:');
+        console.table(timeTracker.getDurations(3));
+      } else {
+        logger.info(`Tracked function times`, timeTracker.getDurations(3));
+      }
+    }
+  };
+
   /**
    *
    */
   ingestNewBlockEvents = (): Promise<void> => {
     return new Promise(async resolve => {
       cluster.setupPrimary({
-        exec: __dirname + '/new-block-worker',
+        exec: __dirname + '/workers/new-block-worker',
       });
       let workersReady = 0;
-      const idFiles = await this.genIdsFiles();
+      const idFiles = await genIdsFiles(this.dataset);
      for (const idFile of idFiles) {
        cluster.fork().send(idFile);
        workersReady++;
      }
@@ -114,53 +146,6 @@ export class ReplayController {
     });
   };
 
-  /**
-   *
-   */
-  ingestRawEvents = async () => {
-    const timeTracker = createTimeTracker();
-
-    try {
-      await timeTracker.track('RAW_EVENTS', async () => {
-        await processRawEvents(this.db, this.dataset);
-      });
-    } catch (err) {
-      throw err;
-    } finally {
-      if (true || tty.isatty(1)) {
-        console.log('Tracked function times:');
-        console.table(timeTracker.getDurations(3));
-      } else {
-        logger.info(`Tracked function times`, timeTracker.getDurations(3));
-      }
-    }
-  };
-
-  /**
-   *
-   */
-  private toggleIndexes = async (state: IndexesState) => {
-    const db = this.db;
-    const dbName = db.sql.options.database;
-    const tableSchema = db.sql.options.connection.search_path ?? 'public';
-    const tablesQuery = await db.sql<{ tablename: string }[]>`
-      SELECT tablename FROM pg_catalog.pg_tables
-      WHERE tablename != ${MIGRATIONS_TABLE}
-      AND schemaname = ${tableSchema}`;
-    if (tablesQuery.length === 0) {
-      const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
-      console.error(errorMsg);
-      throw new Error(errorMsg);
-    }
-    const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
-
-    if (state === IndexesState.Off) {
-      await db.toggleTableIndexes(db.sql, tables, false);
-    } else if (state == IndexesState.On) {
-      await db.toggleTableIndexes(db.sql, tables, true);
-    }
-  };
-
   /**
    *
    */
@@ -181,77 +166,20 @@ export class ReplayController {
       { component: 'event-replay' },
       'Disabling indexes and constraints to speed up insertion'
     );
-    await this.toggleIndexes(IndexesState.Off);
+    await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.Off);
   };
 
-  /**
-   *
-   */
-  private genIdsFiles = async () => {
-    const args = process.argv.slice(2);
-    const workers: number = Number(args[1].split('=')[1]);
-
-    logger.info(
-      { component: 'event-replay' },
-      `Generating ID files for ${workers} parallel workers`
-    );
-
-    const dir = './events/new_block';
-
-    const ids: number[] = await this.dataset.newBlockEventsIds();
-    const batchSize = Math.ceil(ids.length / workers);
-    const chunks = splitIntoChunks(ids, batchSize);
-
-    const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
-
-    // delete previous files
-    files.map(file => {
-      try {
-        fs.unlinkSync(`${dir}/${file}`);
-      } catch (err) {
-        throw err;
-      }
-    });
-
-    // create id files
-    chunks.forEach((chunk, idx) => {
-      const filename = `./events/new_block/ids_${idx + 1}.txt`;
-      chunk.forEach(id => {
-        fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
-      });
-    });
-
-    return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
-  };
-
-  /**
-   *
-   */
-  static async init() {
-    const db = await PgWriteStore.connect({
-      usageName: 'event-replay',
-      skipMigrations: true,
-      withNotifier: false,
-      isEventReplay: true,
-    });
-    const dataset = DatasetStore.connect();
-
-    return new ReplayController(db, dataset);
-  }
-
   /**
    *
    */
   teardown = async () => {
-    const db = this.db;
-
     // Re-enabling indexes
     logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
-    await this.toggleIndexes(IndexesState.On);
+    await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
 
     // Refreshing materialized views
     logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
-    await db.finishEventReplay();
+    await this.db.finishEventReplay();
 
     await this.db.close();
   };
@@ -265,6 +193,7 @@ export class ReplayController {
      this.ingestAttachmentNewEvents(),
      this.ingestRawEvents(),
    ]);
 
+    await this.ingestNewBlockEvents();
  };
 }
diff --git a/src/event-replay/parquet-based/new-block-worker.ts b/src/event-replay/parquet-based/workers/new-block-worker.ts
similarity index 79%
rename from src/event-replay/parquet-based/new-block-worker.ts
rename to src/event-replay/parquet-based/workers/new-block-worker.ts
index 3653a7ee..e7d7b485 100644
--- a/src/event-replay/parquet-based/new-block-worker.ts
+++ b/src/event-replay/parquet-based/workers/new-block-worker.ts
@@ -1,11 +1,11 @@
 import * as fs from 'fs';
 import * as tty from 'tty';
 
-import { processNewBlockEvents } from './importers/new-block-importer';
-import { PgWriteStore } from '../../datastore/pg-write-store';
-import { DatasetStore } from './dataset/store';
-import { logger } from '../../logger';
-import { createTimeTracker } from './helpers';
+import { processNewBlockEvents } from '../importers/new-block-importer';
+import { PgWriteStore } from '../../../datastore/pg-write-store';
+import { DatasetStore } from '../dataset/store';
+import { logger } from '../../../logger';
+import { createTimeTracker } from '../helpers';
 
 const ingestNewBlock = async (idFile?: string) => {
   const db = await PgWriteStore.connect({