feat: better handling of raw events insertion
@@ -1558,3 +1558,8 @@ export interface DbChainTip {
   indexBlockHash: string;
   burnBlockHeight: number;
 }
+
+export enum IndexesState {
+  Off = 0,
+  On = 1,
+}
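Note (not part of the diff): the numeric values matter because the write store later derives the boolean index flags directly from the enum. A minimal standalone sketch of that mapping:

```typescript
// Mirrors the enum added above; Boolean(Off) === false, Boolean(On) === true.
enum IndexesState {
  Off = 0,
  On = 1,
}

// This is how toggleAllTableIndexes() later turns the state into the value
// written to pg_index.indisready / pg_index.indisvalid.
function toIndexFlag(state: IndexesState): boolean {
  return Boolean(state);
}

console.log(toIndexFlag(IndexesState.Off)); // false
console.log(toIndexFlag(IndexesState.On)); // true
```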
@@ -64,6 +64,7 @@ import {
   DbChainTip,
   DbPox3Event,
   RawEventRequestInsertValues,
+  IndexesState,
 } from './common';
 import { ClarityAbi } from '@stacks/transactions';
 import {
@@ -96,6 +97,8 @@ import { parseResolver, parseZoneFileTxt } from '../event-stream/bns/bns-helpers
 import { Pox2EventName } from '../pox-helpers';
 import { logger } from '../logger';
 
+const MIGRATIONS_TABLE = 'pgmigrations';
+
 class MicroblockGapError extends Error {
   constructor(message: string) {
     super(message);
@@ -3273,13 +3276,26 @@ export class PgWriteStore extends PgStore {
   }
 
   /**
-   * (event-replay) Enable or disable indexes for the provided set of tables.
+   * (event-replay) Enable or disable indexes for all DB tables.
    */
-  async toggleTableIndexes(sql: PgSqlClient, tables: string[], enabled: boolean): Promise<void> {
-    const tableSchema = this.sql.options.connection.search_path ?? 'public';
+  async toggleAllTableIndexes(sql: PgSqlClient, state: IndexesState): Promise<void> {
+    const enable: boolean = Boolean(state);
+    const dbName = sql.options.database;
+    const tableSchema = sql.options.connection.search_path ?? 'public';
+    const tablesQuery = await sql<{ tablename: string }[]>`
+      SELECT tablename FROM pg_catalog.pg_tables
+      WHERE tablename != ${MIGRATIONS_TABLE}
+      AND schemaname = ${tableSchema}`;
+    if (tablesQuery.length === 0) {
+      const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
+      console.error(errorMsg);
+      throw new Error(errorMsg);
+    }
+    const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
+
     const result = await sql`
       UPDATE pg_index
-      SET ${sql({ indisready: enabled, indisvalid: enabled })}
+      SET ${sql({ indisready: enable, indisvalid: enable })}
       WHERE indrelid = ANY (
         SELECT oid FROM pg_class
         WHERE relname IN ${sql(tables)}
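A hypothetical usage sketch of the new method (the wrapper below is illustrative and not part of this change; the connect options and the relative import paths are assumed from elsewhere in the diff):

```typescript
import { PgWriteStore } from '../../datastore/pg-write-store';
import { IndexesState } from '../../datastore/common';

// Illustrative wrapper: disable all index maintenance around a bulk load,
// then restore it, mirroring how the replay controller uses the method.
async function withIndexesOff(load: (db: PgWriteStore) => Promise<void>) {
  const db = await PgWriteStore.connect({
    usageName: 'bulk-load-example', // hypothetical usage name
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  try {
    await db.toggleAllTableIndexes(db.sql, IndexesState.Off);
    await load(db);
  } finally {
    // Restore the index flags even if the load throws.
    await db.toggleAllTableIndexes(db.sql, IndexesState.On);
    await db.close();
  }
}
```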
@@ -19,7 +19,7 @@ export class DatasetStore {
     const con = this.db.connect();
     return new Promise(resolve => {
       con.all(
-        "SELECT ID FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
+        "SELECT id FROM READ_PARQUET('events/new_block/canonical/*.parquet')",
         (err: any, result: any) => {
           if (err) {
             throw err;
@@ -103,14 +103,25 @@ export class DatasetStore {
   // RAW EVENTS
   //
 
-  rawEventsStream = (): Promise<QueryResult> => {
+  rawEvents = (): Promise<QueryResult> => {
     return new Promise(resolve => {
       const con = this.db.connect();
-      const res = con.stream(
-        `SELECT event, payload FROM READ_PARQUET('events/raw*.parquet') ORDER BY id`
-      );
+      con.all(
+        `SELECT method, payload FROM READ_PARQUET([
+          'events/new_burn_block/canonical/*.parquet',
+          'events/attachments/new/*.parquet',
+          'events/new_microblocks/*.parquet',
+          'events/drop_mempool_tx/*.parquet',
+          'events/new_mempool_tx/*.parquet',
+        ]) ORDER BY id`,
+        (err: any, result: any) => {
+          if (err) {
+            throw err;
+          }
 
-      resolve(res);
+          resolve(result);
+        }
+      );
     });
   };
 }
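For context, a standalone sketch of the DuckDB access pattern used by rawEvents() (assumes the `duckdb` npm package and the parquet dumps on disk; file list shortened for brevity):

```typescript
import * as duckdb from 'duckdb';

// READ_PARQUET accepts a list of globs, so several event dumps can be read
// as one result set and ordered by the original event id.
const db = new duckdb.Database(':memory:');
const con = db.connect();

con.all(
  `SELECT method, payload FROM READ_PARQUET([
    'events/new_burn_block/canonical/*.parquet',
    'events/new_mempool_tx/*.parquet',
  ]) ORDER BY id`,
  (err: any, rows: any[]) => {
    if (err) throw err;
    console.log(`fetched ${rows.length} raw event rows`);
  }
);
```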
@@ -1,41 +0,0 @@
-import * as fs from 'fs';
-
-import { logger } from '../../logger';
-import { DatasetStore } from './dataset/store';
-import { splitIntoChunks } from './helpers';
-
-(async () => {
-  const args = process.argv.slice(2);
-  const workers: number = Number(args[0].split('=')[1]);
-
-  logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
-
-  const dir = './events/new_block';
-
-  const dataset = DatasetStore.connect();
-
-  const ids: number[] = await dataset.newBlockEventsIds();
-  const batchSize = Math.ceil(ids.length / workers);
-  const chunks = splitIntoChunks(ids, batchSize);
-
-  const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
-
-  // delete previous files
-  files.map(file => {
-    try {
-      fs.unlinkSync(`${dir}/${file}`);
-    } catch (err) {
-      throw err;
-    }
-  });
-
-  // create id files
-  chunks.forEach((chunk, idx) => {
-    const filename = `./events/new_block/ids_${idx + 1}.txt`;
-    chunk.forEach(id => {
-      fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
-    });
-  });
-})().catch(err => {
-  throw err;
-});
@@ -1,3 +1,8 @@
+import * as fs from 'fs';
+
+import { logger } from '../../logger';
+import { DatasetStore } from './dataset/store';
+
 interface TimeTracker {
   track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
   trackSync<T = void>(name: string, fn: () => T): T;
@@ -51,40 +56,6 @@ const createTimeTracker = (): TimeTracker => {
   };
 };
 
-interface Stopwatch {
-  /** Milliseconds since stopwatch was created. */
-  getElapsed: () => number;
-  /** Seconds since stopwatch was created. */
-  getElapsedSeconds: (roundDecimals?: number) => number;
-  getElapsedAndRestart: () => number;
-  restart(): void;
-}
-
-function stopwatch(): Stopwatch {
-  let start = process.hrtime.bigint();
-  const result: Stopwatch = {
-    getElapsedSeconds: (roundDecimals?: number) => {
-      const elapsedMs = result.getElapsed();
-      const seconds = elapsedMs / 1000;
-      return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
-    },
-    getElapsed: () => {
-      const end = process.hrtime.bigint();
-      return Number((end - start) / 1_000_000n);
-    },
-    getElapsedAndRestart: () => {
-      const end = process.hrtime.bigint();
-      const result = Number((end - start) / 1_000_000n);
-      start = process.hrtime.bigint();
-      return result;
-    },
-    restart: () => {
-      start = process.hrtime.bigint();
-    },
-  };
-  return result;
-}
-
 function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
   for (let i = 0; i < arr.length; i += n) {
     yield arr.slice(i, i + n);
@@ -95,4 +66,38 @@ const splitIntoChunks = (data: number[], chunk_size: number) => {
   return [...chunks(data, chunk_size)];
 };
 
-export { createTimeTracker, splitIntoChunks };
+const genIdsFiles = async (dataset: DatasetStore) => {
+  const args = process.argv.slice(2);
+  const workers: number = Number(args[1].split('=')[1]);
+
+  logger.info({ component: 'event-replay' }, `Generating ID files for ${workers} parallel workers`);
+
+  const dir = './events/new_block';
+
+  const ids: number[] = await dataset.newBlockEventsIds();
+  const batchSize = Math.ceil(ids.length / workers);
+  const chunks = splitIntoChunks(ids, batchSize);
+
+  const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
+
+  // delete previous files
+  files.map(file => {
+    try {
+      fs.unlinkSync(`${dir}/${file}`);
+    } catch (err) {
+      throw err;
+    }
+  });
+
+  // create id files
+  chunks.forEach((chunk, idx) => {
+    const filename = `./events/new_block/ids_${idx + 1}.txt`;
+    chunk.forEach(id => {
+      fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
+    });
+  });
+
+  return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
+};
+
+export { createTimeTracker, splitIntoChunks, genIdsFiles };
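For reference, the chunk sizing genIdsFiles() relies on, shown with a small worked example (the generator and splitIntoChunks are copied from this same helpers file; the sample ids are illustrative):

```typescript
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
  for (let i = 0; i < arr.length; i += n) {
    yield arr.slice(i, i + n);
  }
}

const splitIntoChunks = (data: number[], chunk_size: number) => {
  return [...chunks(data, chunk_size)];
};

// 10 event ids split across 3 workers -> batch size ceil(10 / 3) = 4.
const ids = Array.from({ length: 10 }, (_, i) => i + 1);
const batchSize = Math.ceil(ids.length / 3);
console.log(splitIntoChunks(ids, batchSize)); // [ [1,2,3,4], [5,6,7,8], [9,10] ]
```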
@@ -50,13 +50,13 @@ function createBatchInserter<T>({
 
 const insertInBatch = (db: PgWriteStore) => {
   const dbRawEventBatchInserter = createBatchInserter<RawEventRequestInsertValues>({
-    batchSize: 500,
+    batchSize: 200,
     insertFn: async entries => {
       logger.debug(
         { component: 'event-replay' },
         'Inserting into event_observer_requests table...'
       );
-      return db.insertRawEventRequestBatch(db.sql, entries);
+      return await db.insertRawEventRequestBatch(db.sql, entries);
    },
  });
  batchInserters.push(dbRawEventBatchInserter);
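createBatchInserter itself is not shown in this hunk; a simplified stand-in with the same shape (buffer rows, flush once batchSize entries accumulate), for illustration only:

```typescript
interface BatchInserter<T> {
  push(rows: T[]): Promise<void>;
  flush(): Promise<void>;
}

// Simplified sketch, not the project's implementation: buffers rows and
// calls insertFn once `batchSize` rows have accumulated.
function sketchBatchInserter<T>(opts: {
  batchSize: number;
  insertFn: (rows: T[]) => Promise<void>;
}): BatchInserter<T> {
  let buffer: T[] = [];
  const flush = async () => {
    if (buffer.length === 0) return;
    const rows = buffer;
    buffer = [];
    await opts.insertFn(rows);
  };
  return {
    push: async rows => {
      buffer.push(...rows);
      if (buffer.length >= opts.batchSize) {
        await flush();
      }
    },
    flush,
  };
}
```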
@@ -64,11 +64,7 @@ const insertInBatch = (db: PgWriteStore) => {
   return new Writable({
     objectMode: true,
     write: async (data, _encoding, next) => {
-      const insertRawEvents = async (data: any) => {
-        await dbRawEventBatchInserter.push([{ event_path: data.event, payload: data.payload }]);
-      };
-
-      await insertRawEvents(data);
+      await dbRawEventBatchInserter.push([{ event_path: data.method, payload: data.payload }]);
 
       next();
     },
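The write handler above boils down to an object-mode Writable that forwards each row to the batcher; a self-contained sketch of that shape (the row type and the push signature are assumptions):

```typescript
import { Writable } from 'stream';

type RawRow = { method: string; payload: string };

// Each parquet row becomes one event_observer_requests insert value.
function rawEventSink(push: (rows: { event_path: string; payload: string }[]) => Promise<void>) {
  return new Writable({
    objectMode: true,
    write: async (data: RawRow, _encoding, next) => {
      await push([{ event_path: data.method, payload: data.payload }]);
      next();
    },
  });
}
```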
@@ -78,7 +74,7 @@ const insertInBatch = (db: PgWriteStore) => {
 export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
   logger.info({ component: 'event-replay' }, 'RAW events process started');
 
-  const payload = await dataset.rawEventsStream();
+  const payload = await dataset.rawEvents();
   const insert = insertInBatch(db);
 
   await pipeline(
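The pipeline call is truncated in this hunk; one plausible shape, stated only as an assumption, is piping the rows returned by rawEvents() into the insert sink via stream.pipeline:

```typescript
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Assumed wiring, not taken from the diff: rows flow through an object-mode
// pipeline into the batching Writable shown above.
async function runRawImport(rows: Iterable<unknown>, sink: Writable) {
  await pipeline(Readable.from(rows), sink);
}
```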
@@ -1,35 +1,45 @@
 import * as tty from 'tty';
-import * as fs from 'fs';
 
 import { PgWriteStore } from '../../datastore/pg-write-store';
 import { logger } from '../../logger';
-import { createTimeTracker } from './helpers';
+import { createTimeTracker, genIdsFiles } from './helpers';
 import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
 import { processAttachmentNewEvents } from './importers/attachment-new-importer';
 import { processRawEvents } from './importers/raw-importer';
 import { DatasetStore } from './dataset/store';
 import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
-import { splitIntoChunks } from './helpers';
+import { IndexesState } from '../../datastore/common';
 
 import * as _cluster from 'cluster';
 const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix
 
-const MIGRATIONS_TABLE = 'pgmigrations';
-
-enum IndexesState {
-  On,
-  Off,
-}
-
 export class ReplayController {
   private readonly db;
   private readonly dataset;
 
   /**
    *
    */
   private constructor(db: PgWriteStore, dataset: DatasetStore) {
     this.db = db;
     this.dataset = dataset;
   }
 
   /**
    *
    */
+  static async init() {
+    const db = await PgWriteStore.connect({
+      usageName: 'event-replay',
+      skipMigrations: true,
+      withNotifier: false,
+      isEventReplay: true,
+    });
+    const dataset = DatasetStore.connect();
+
+    return new ReplayController(db, dataset);
+  }
+
+  /**
+   *
+   */
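The class now builds its dependencies through a private constructor plus a static async init(); the same pattern in a stripped-down, standalone form (all names below are illustrative):

```typescript
class AsyncController {
  private constructor(private readonly db: string) {}

  // Async work (e.g. opening a connection) happens here, not in the constructor.
  static async init(): Promise<AsyncController> {
    const db = await Promise.resolve('connected'); // stand-in for PgWriteStore.connect()
    return new AsyncController(db);
  }
}

void (async () => {
  const controller = await AsyncController.init();
  console.log(controller instanceof AsyncController); // true
})();
```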
@@ -74,17 +84,39 @@ export class ReplayController {
     }
   };
 
   /**
    *
    */
+  ingestRawEvents = async () => {
+    const timeTracker = createTimeTracker();
+
+    try {
+      await timeTracker.track('RAW_EVENTS', async () => {
+        await processRawEvents(this.db, this.dataset);
+      });
+    } catch (err) {
+      throw err;
+    } finally {
+      if (true || tty.isatty(1)) {
+        console.log('Tracked function times:');
+        console.table(timeTracker.getDurations(3));
+      } else {
+        logger.info(`Tracked function times`, timeTracker.getDurations(3));
+      }
+    }
+  };
+
+  /**
+   *
+   */
   ingestNewBlockEvents = (): Promise<boolean> => {
     return new Promise(async resolve => {
       cluster.setupPrimary({
-        exec: __dirname + '/new-block-worker',
+        exec: __dirname + '/workers/new-block-worker',
       });
 
       let workersReady = 0;
-      const idFiles = await this.genIdsFiles();
+      const idFiles = await genIdsFiles(this.dataset);
       for (const idFile of idFiles) {
         cluster.fork().send(idFile);
         workersReady++;
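A standalone sketch of the primary/worker hand-off used by ingestNewBlockEvents() (the id files below are illustrative; in the diff they come from genIdsFiles()):

```typescript
import * as _cluster from 'cluster';
const cluster = (_cluster as unknown) as _cluster.Cluster; // same typings fix as in the diff

if (cluster.isPrimary) {
  cluster.setupPrimary({ exec: __dirname + '/workers/new-block-worker' });

  const idFiles = ['ids_1.txt', 'ids_2.txt']; // illustrative; real list comes from genIdsFiles()
  for (const idFile of idFiles) {
    // Each worker is told which id file to replay via an IPC message.
    cluster.fork().send(idFile);
  }
}
```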
@@ -114,53 +146,6 @@ export class ReplayController {
     });
   };
 
-  /**
-   *
-   */
-  ingestRawEvents = async () => {
-    const timeTracker = createTimeTracker();
-
-    try {
-      await timeTracker.track('RAW_EVENTS', async () => {
-        await processRawEvents(this.db, this.dataset);
-      });
-    } catch (err) {
-      throw err;
-    } finally {
-      if (true || tty.isatty(1)) {
-        console.log('Tracked function times:');
-        console.table(timeTracker.getDurations(3));
-      } else {
-        logger.info(`Tracked function times`, timeTracker.getDurations(3));
-      }
-    }
-  };
-
-  /**
-   *
-   */
-  private toggleIndexes = async (state: IndexesState) => {
-    const db = this.db;
-    const dbName = db.sql.options.database;
-    const tableSchema = db.sql.options.connection.search_path ?? 'public';
-    const tablesQuery = await db.sql<{ tablename: string }[]>`
-      SELECT tablename FROM pg_catalog.pg_tables
-      WHERE tablename != ${MIGRATIONS_TABLE}
-      AND schemaname = ${tableSchema}`;
-    if (tablesQuery.length === 0) {
-      const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
-      console.error(errorMsg);
-      throw new Error(errorMsg);
-    }
-    const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
-
-    if (state === IndexesState.Off) {
-      await db.toggleTableIndexes(db.sql, tables, false);
-    } else if (state == IndexesState.On) {
-      await db.toggleTableIndexes(db.sql, tables, true);
-    }
-  };
-
   /**
    *
    */
@@ -181,77 +166,20 @@ export class ReplayController {
       { component: 'event-replay' },
       'Disabling indexes and constraints to speed up insertion'
     );
-    await this.toggleIndexes(IndexesState.Off);
+    await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.Off);
   };
 
-  /**
-   *
-   */
-  private genIdsFiles = async () => {
-    const args = process.argv.slice(2);
-    const workers: number = Number(args[1].split('=')[1]);
-
-    logger.info(
-      { component: 'event-replay' },
-      `Generating ID files for ${workers} parallel workers`
-    );
-
-    const dir = './events/new_block';
-
-    const ids: number[] = await this.dataset.newBlockEventsIds();
-    const batchSize = Math.ceil(ids.length / workers);
-    const chunks = splitIntoChunks(ids, batchSize);
-
-    const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
-
-    // delete previous files
-    files.map(file => {
-      try {
-        fs.unlinkSync(`${dir}/${file}`);
-      } catch (err) {
-        throw err;
-      }
-    });
-
-    // create id files
-    chunks.forEach((chunk, idx) => {
-      const filename = `./events/new_block/ids_${idx + 1}.txt`;
-      chunk.forEach(id => {
-        fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
-      });
-    });
-
-    return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
-  };
-
-  /**
-   *
-   */
-  static async init() {
-    const db = await PgWriteStore.connect({
-      usageName: 'event-replay',
-      skipMigrations: true,
-      withNotifier: false,
-      isEventReplay: true,
-    });
-    const dataset = DatasetStore.connect();
-
-    return new ReplayController(db, dataset);
-  }
-
   /**
    *
    */
   teardown = async () => {
-    const db = this.db;
-
     // Re-enabling indexes
     logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
-    await this.toggleIndexes(IndexesState.On);
+    await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
 
     // Refreshing materialized views
     logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
-    await db.finishEventReplay();
+    await this.db.finishEventReplay();
 
     await this.db.close();
   };
@@ -265,6 +193,7 @@ export class ReplayController {
       this.ingestAttachmentNewEvents(),
+      this.ingestRawEvents(),
     ]);
 
     await this.ingestNewBlockEvents();
   };
 }
@@ -1,11 +1,11 @@
 import * as fs from 'fs';
 import * as tty from 'tty';
 
-import { processNewBlockEvents } from './importers/new-block-importer';
-import { PgWriteStore } from '../../datastore/pg-write-store';
-import { DatasetStore } from './dataset/store';
-import { logger } from '../../logger';
-import { createTimeTracker } from './helpers';
+import { processNewBlockEvents } from '../importers/new-block-importer';
+import { PgWriteStore } from '../../../datastore/pg-write-store';
+import { DatasetStore } from '../dataset/store';
+import { logger } from '../../../logger';
+import { createTimeTracker } from '../helpers';
 
 const ingestNewBlock = async (idFile?: string) => {
   const db = await PgWriteStore.connect({