feat: improve handling of raw event insertion

Author: Chris Guimaraes
Date: 2023-07-25 16:01:42 +01:00
parent 81f43cf7c3
commit bb70ca99c0
8 changed files with 139 additions and 218 deletions

View File

@@ -1558,3 +1558,8 @@ export interface DbChainTip {
indexBlockHash: string;
burnBlockHeight: number;
}
export enum IndexesState {
Off = 0,
On = 1,
}

View File

@@ -64,6 +64,7 @@ import {
DbChainTip,
DbPox3Event,
RawEventRequestInsertValues,
IndexesState,
} from './common';
import { ClarityAbi } from '@stacks/transactions';
import {
@@ -96,6 +97,8 @@ import { parseResolver, parseZoneFileTxt } from '../event-stream/bns/bns-helpers
import { Pox2EventName } from '../pox-helpers';
import { logger } from '../logger';
const MIGRATIONS_TABLE = 'pgmigrations';
class MicroblockGapError extends Error {
constructor(message: string) {
super(message);
@@ -3273,13 +3276,26 @@ export class PgWriteStore extends PgStore {
}
/**
* (event-replay) Enable or disable indexes for the provided set of tables.
* (event-replay) Enable or disable indexes for all DB tables.
*/
async toggleTableIndexes(sql: PgSqlClient, tables: string[], enabled: boolean): Promise<void> {
const tableSchema = this.sql.options.connection.search_path ?? 'public';
async toggleAllTableIndexes(sql: PgSqlClient, state: IndexesState): Promise<void> {
const enable: boolean = Boolean(state);
const dbName = sql.options.database;
const tableSchema = sql.options.connection.search_path ?? 'public';
const tablesQuery = await sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
console.error(errorMsg);
throw new Error(errorMsg);
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
const result = await sql`
UPDATE pg_index
SET ${sql({ indisready: enabled, indisvalid: enabled })}
SET ${sql({ indisready: enable, indisvalid: enable })}
WHERE indrelid = ANY (
SELECT oid FROM pg_class
WHERE relname IN ${sql(tables)}
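For context, a hedged sketch of how the new signature is meant to bracket a bulk load, mirroring the calls added to replay-controller.ts further down; the helper name and import paths here are assumptions, not part of this commit:

import { PgWriteStore } from './pg-write-store';
import { IndexesState } from './common';

// Hypothetical wrapper: switch every table index off for the duration of a bulk
// insert, then switch them back on even if the work throws.
async function withIndexesDisabled(db: PgWriteStore, work: () => Promise<void>): Promise<void> {
  await db.toggleAllTableIndexes(db.sql, IndexesState.Off);
  try {
    await work();
  } finally {
    await db.toggleAllTableIndexes(db.sql, IndexesState.On);
  }
}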

View File

@@ -19,7 +19,7 @@ export class DatasetStore {
const con = this.db.connect();
return new Promise(resolve => {
con.all(
"SELECT ID FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
"SELECT id FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
(err: any, result: any) => {
if (err) {
throw err;
@@ -103,14 +103,25 @@ export class DatasetStore {
// RAW EVENTS
//
rawEventsStream = (): Promise<QueryResult> => {
rawEvents = (): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
const res = con.stream(
`SELECT event, payload FROM READ_PARQUET('events/raw*.parquet') ORDER BY id`
);
con.all(
`SELECT method, payload FROM READ_PARQUET([
'events/new_burn_block/canonical/*.parquet',
'events/attachments/new/*.parquet',
'events/new_microblocks/*.parquet',
'events/drop_mempool_tx/*.parquet',
'events/new_mempool_tx/*.parquet',
]) ORDER BY id`,
(err: any, result: any) => {
if (err) {
throw err;
}
resolve(res);
resolve(result);
}
);
});
};
}
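Note that rawEvents() now resolves to an array of plain rows instead of a DuckDB stream. A rough sketch of the row shape the raw importer consumes (field names come from the query above; the example path is illustrative only):

// Each row carries the original event-observer path and the recorded request body.
interface RawEventRow {
  method: string;  // e.g. '/new_burn_block' (illustrative)
  payload: string; // JSON body of the observed request
}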

View File

@@ -1,41 +0,0 @@
import * as fs from 'fs';
import { logger } from '../../logger';
import { DatasetStore } from './dataset/store';
import { splitIntoChunks } from './helpers';
(async () => {
const args = process.argv.slice(2);
const workers: number = Number(args[0].split('=')[1]);
logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
const dir = './events/new_block';
const dataset = DatasetStore.connect();
const ids: number[] = await dataset.newBlockEventsIds();
const batchSize = Math.ceil(ids.length / workers);
const chunks = splitIntoChunks(ids, batchSize);
const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
// delete previous files
files.map(file => {
try {
fs.unlinkSync(`${dir}/${file}`);
} catch (err) {
throw err;
}
});
// create id files
chunks.forEach((chunk, idx) => {
const filename = `./events/new_block/ids_${idx + 1}.txt`;
chunk.forEach(id => {
fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
});
});
})().catch(err => {
throw err;
});

View File

@@ -1,3 +1,8 @@
import * as fs from 'fs';
import { logger } from '../../logger';
import { DatasetStore } from './dataset/store';
interface TimeTracker {
track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
trackSync<T = void>(name: string, fn: () => T): T;
@@ -51,40 +56,6 @@ const createTimeTracker = (): TimeTracker => {
};
};
interface Stopwatch {
/** Milliseconds since stopwatch was created. */
getElapsed: () => number;
/** Seconds since stopwatch was created. */
getElapsedSeconds: (roundDecimals?: number) => number;
getElapsedAndRestart: () => number;
restart(): void;
}
function stopwatch(): Stopwatch {
let start = process.hrtime.bigint();
const result: Stopwatch = {
getElapsedSeconds: (roundDecimals?: number) => {
const elapsedMs = result.getElapsed();
const seconds = elapsedMs / 1000;
return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
},
getElapsed: () => {
const end = process.hrtime.bigint();
return Number((end - start) / 1_000_000n);
},
getElapsedAndRestart: () => {
const end = process.hrtime.bigint();
const result = Number((end - start) / 1_000_000n);
start = process.hrtime.bigint();
return result;
},
restart: () => {
start = process.hrtime.bigint();
},
};
return result;
}
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
for (let i = 0; i < arr.length; i += n) {
yield arr.slice(i, i + n);
@@ -95,4 +66,38 @@ const splitIntoChunks = (data: number[], chunk_size: number) => {
return [...chunks(data, chunk_size)];
};
export { createTimeTracker, splitIntoChunks };
const genIdsFiles = async (dataset: DatasetStore) => {
const args = process.argv.slice(2);
const workers: number = Number(args[1].split('=')[1]);
logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
const dir = './events/new_block';
const ids: number[] = await dataset.newBlockEventsIds();
const batchSize = Math.ceil(ids.length / workers);
const chunks = splitIntoChunks(ids, batchSize);
const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
// delete previous files
files.map(file => {
try {
fs.unlinkSync(`${dir}/${file}`);
} catch (err) {
throw err;
}
});
// create id files
chunks.forEach((chunk, idx) => {
const filename = `./events/new_block/ids_${idx + 1}.txt`;
chunk.forEach(id => {
fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
});
});
return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
};
export { createTimeTracker, splitIntoChunks, genIdsFiles };
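A hedged usage sketch of the relocated helper (import paths follow the ones used by replay-controller.ts in this commit; the surrounding CLI wiring is assumed, with the worker count taken from a workers=N argument as parsed above):

import { DatasetStore } from './dataset/store';
import { genIdsFiles } from './helpers';

// Sketch: split the new_block event IDs into one ids_<n>.txt file per worker
// and return the generated file names.
(async () => {
  const dataset = DatasetStore.connect();
  const idFiles = await genIdsFiles(dataset);
  console.log(idFiles); // e.g. ['ids_1.txt', 'ids_2.txt', 'ids_3.txt', 'ids_4.txt']
})();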

View File

@@ -50,13 +50,13 @@ function createBatchInserter<T>({
const insertInBatch = (db: PgWriteStore) => {
const dbRawEventBatchInserter = createBatchInserter<RawEventRequestInsertValues>({
batchSize: 500,
batchSize: 200,
insertFn: async entries => {
logger.debug(
{ component: 'event-replay' },
'Inserting into event_observer_requests table...'
);
return db.insertRawEventRequestBatch(db.sql, entries);
return await db.insertRawEventRequestBatch(db.sql, entries);
},
});
batchInserters.push(dbRawEventBatchInserter);
@@ -64,11 +64,7 @@ const insertInBatch = (db: PgWriteStore) => {
return new Writable({
objectMode: true,
write: async (data, _encoding, next) => {
const insertRawEvents = async (data: any) => {
await dbRawEventBatchInserter.push([{ event_path: data.event, payload: data.payload }]);
};
await insertRawEvents(data);
await dbRawEventBatchInserter.push([{ event_path: data.method, payload: data.payload }]);
next();
},
@@ -78,7 +74,7 @@ const insertInBatch = (db: PgWriteStore) => {
export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'RAW events process started');
const payload = await dataset.rawEventsStream();
const payload = await dataset.rawEvents();
const insert = insertInBatch(db);
await pipeline(

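createBatchInserter itself is untouched by this commit; the following is only a minimal sketch of the contract the importer relies on: pushed entries are buffered and insertFn receives at most batchSize entries per call (the name and the flush method are assumptions):

// Sketch of the batch-inserter contract assumed by the importer above.
function createBatchInserterSketch<T>(opts: {
  batchSize: number;
  insertFn: (entries: T[]) => Promise<void>;
}) {
  const buffer: T[] = [];
  return {
    // Buffer incoming entries and flush a full batch whenever the threshold is hit.
    async push(entries: T[]): Promise<void> {
      buffer.push(...entries);
      while (buffer.length >= opts.batchSize) {
        await opts.insertFn(buffer.splice(0, opts.batchSize));
      }
    },
    // A real implementation also needs to flush the trailing partial batch.
    async flushRemaining(): Promise<void> {
      if (buffer.length > 0) {
        await opts.insertFn(buffer.splice(0));
      }
    },
  };
}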
View File

@@ -1,35 +1,45 @@
import * as tty from 'tty';
import * as fs from 'fs';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { createTimeTracker, genIdsFiles } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { processRawEvents } from './importers/raw-importer';
import { DatasetStore } from './dataset/store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { splitIntoChunks } from './helpers';
import { IndexesState } from '../../datastore/common';
import * as _cluster from 'cluster';
const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix
const MIGRATIONS_TABLE = 'pgmigrations';
enum IndexesState {
On,
Off,
}
export class ReplayController {
private readonly db;
private readonly dataset;
/**
* Holds the DB and dataset handles; instances are created via ReplayController.init().
*/
private constructor(db: PgWriteStore, dataset: DatasetStore) {
this.db = db;
this.dataset = dataset;
}
/**
* Connects to the write store (event-replay mode, migrations skipped) and to the dataset store.
*/
static async init() {
const db = await PgWriteStore.connect({
usageName: 'event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
const dataset = DatasetStore.connect();
return new ReplayController(db, dataset);
}
/**
*
*/
@@ -74,17 +84,39 @@ export class ReplayController {
}
};
/**
* Ingests raw event requests into event_observer_requests, timing the run as RAW_EVENTS.
*/
ingestRawEvents = async () => {
const timeTracker = createTimeTracker();
try {
await timeTracker.track('RAW_EVENTS', async () => {
await processRawEvents(this.db, this.dataset);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
}
};
/**
* Ingests new-block events by forking one worker process per generated ID file.
*/
ingestNewBlockEvents = (): Promise<boolean> => {
return new Promise(async resolve => {
cluster.setupPrimary({
exec: __dirname + '/new-block-worker',
exec: __dirname + '/workers/new-block-worker',
});
let workersReady = 0;
const idFiles = await this.genIdsFiles();
const idFiles = await genIdsFiles(this.dataset);
for (const idFile of idFiles) {
cluster.fork().send(idFile);
workersReady++;
@@ -114,53 +146,6 @@ export class ReplayController {
});
};
/**
*
*/
ingestRawEvents = async () => {
const timeTracker = createTimeTracker();
try {
await timeTracker.track('RAW_EVENTS', async () => {
await processRawEvents(this.db, this.dataset);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
}
};
/**
*
*/
private toggleIndexes = async (state: IndexesState) => {
const db = this.db;
const dbName = db.sql.options.database;
const tableSchema = db.sql.options.connection.search_path ?? 'public';
const tablesQuery = await db.sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
console.error(errorMsg);
throw new Error(errorMsg);
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
if (state === IndexesState.Off) {
await db.toggleTableIndexes(db.sql, tables, false);
} else if (state == IndexesState.On) {
await db.toggleTableIndexes(db.sql, tables, true);
}
};
/**
*
*/
@@ -181,77 +166,20 @@ export class ReplayController {
{ component: 'event-replay' },
'Disabling indexes and constraints to speed up insertion'
);
await this.toggleIndexes(IndexesState.Off);
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.Off);
};
/**
*
*/
private genIdsFiles = async () => {
const args = process.argv.slice(2);
const workers: number = Number(args[1].split('=')[1]);
logger.info(
{ component: 'event-replay' },
`Generating ID files for ${workers} parallel workers`
);
const dir = './events/new_block';
const ids: number[] = await this.dataset.newBlockEventsIds();
const batchSize = Math.ceil(ids.length / workers);
const chunks = splitIntoChunks(ids, batchSize);
const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
// delete previous files
files.map(file => {
try {
fs.unlinkSync(`${dir}/${file}`);
} catch (err) {
throw err;
}
});
// create id files
chunks.forEach((chunk, idx) => {
const filename = `./events/new_block/ids_${idx + 1}.txt`;
chunk.forEach(id => {
fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
});
});
return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
};
/**
*
*/
static async init() {
const db = await PgWriteStore.connect({
usageName: 'event-replay',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
const dataset = DatasetStore.connect();
return new ReplayController(db, dataset);
}
/**
* Re-enables table indexes, refreshes materialized views and closes the DB connection.
*/
teardown = async () => {
const db = this.db;
// Re-enabling indexes
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
await this.toggleIndexes(IndexesState.On);
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
// Refreshing materialized views
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
await db.finishEventReplay();
await this.db.finishEventReplay();
await this.db.close();
};
@@ -265,6 +193,7 @@ export class ReplayController {
this.ingestAttachmentNewEvents(),
this.ingestRawEvents(),
]);
await this.ingestNewBlockEvents();
};
}
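Pieced together, the controller now owns the whole replay lifecycle. A hedged sketch using only the members visible in this diff (the import path and the existence of a single driver method wrapping steps 1-3 are assumptions; its name is not shown here):

import { ReplayController } from './replay-controller'; // path assumed

(async () => {
  const replay = await ReplayController.init();
  // 1. All table indexes are switched off (IndexesState.Off).
  // 2. Burn-block, attachment and raw events are ingested concurrently.
  // 3. New-block events are ingested by forked workers, one ids_<n>.txt file each.
  await replay.teardown(); // 4. Re-enable indexes, refresh materialized views, close the DB.
})();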

View File

@@ -1,11 +1,11 @@
import * as fs from 'fs';
import * as tty from 'tty';
import { processNewBlockEvents } from './importers/new-block-importer';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { DatasetStore } from './dataset/store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { processNewBlockEvents } from '../importers/new-block-importer';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { DatasetStore } from '../dataset/store';
import { logger } from '../../../logger';
import { createTimeTracker } from '../helpers';
const ingestNewBlock = async (idFile?: string) => {
const db = await PgWriteStore.connect({