feat: parallel processing using node cluster

This commit is contained in:
Chris Guimaraes
2023-07-22 14:25:57 +01:00
parent 6b1605b74d
commit d02a7e8ad8
7 changed files with 294 additions and 279 deletions
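The change below replaces the sequential, per-event-type replay scripts with a primary process that forks one Node `cluster` worker per chunk of NEW_BLOCK event IDs. As a rough sketch of the fork/IPC pattern involved (not part of the commit; the real code uses a separate worker module wired up via `cluster.setupPrimary({ exec })`, and the file names here are hypothetical):

import * as _cluster from 'cluster';
const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix, as in the commit

if (cluster.isPrimary) {
  // Hypothetical work items; the commit uses one ids_*.txt file per worker.
  const inputs = ['ids_1.txt', 'ids_2.txt'];
  let pending = inputs.length;
  for (const input of inputs) {
    const worker = cluster.fork();
    worker.send(input); // hand the worker its input over IPC
    worker.on('message', msg => {
      if (msg.msgType === 'FINISH') {
        worker.disconnect();
        if (--pending === 0) console.log('all workers finished');
      }
    });
  }
} else {
  process.on('message', (input: string) => {
    // ...do the actual work for this input...
    process.send?.({ msgType: 'FINISH', msg: `done with ${input}` });
  });
}

The commit follows the same shape: the primary generates ids_*.txt chunks, forks a worker per file, and disconnects each worker once it reports FINISH.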


@@ -1,70 +0,0 @@
#!/usr/bin/env bash
#
# 1. Parse command line arguments
#
while getopts w: flag
do
  case "${flag}" in
    w) workers=${OPTARG};;
  esac
done
#
# 2. Prepare DB for event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/pre-replay-db.js
wait
#
# 3. NEW_BURN_BLOCK events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --new-burn-block
wait
#
# 4. ATTACHMENTS_NEW events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --attachment-new
wait
#
# 5. NEW_BLOCK events
#
# Generate ID chunks according to the number of workers
node ./lib/event-replay/parquet-based/gen-ids-file.js --workers=${workers}
wait
id_files=(./events/new_block/ids_*.txt)
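# Replay NEW_BLOCK events in parallel: one background process per ID chunk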
for id_file in "${id_files[@]}"
do
  NODE_OPTIONS=--max-old-space-size=8192 \
  STACKS_CHAIN_ID=0x00000001 \
  node ./lib/index.js from-parquet-events --new-block --ids-path="${id_file}" &
done
wait
#
# 6. Prepare DB for regular operations after event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/post-replay-db.js
wait
exit 0


@@ -1,109 +0,0 @@
import * as tty from 'tty';
import * as fs from 'fs';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processNewBlockEvents } from './importers/new-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { DatasetStore } from './dataset/store';
/**
 *
 */
const ingestNewBurnBlock = async () => {
  const timeTracker = createTimeTracker();
  const db = await PgWriteStore.connect({
    usageName: 'event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  const dataset = DatasetStore.connect();

  try {
    await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
      await processNewBurnBlockEvents(db, dataset);
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.log('Tracked function times:');
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
    await db.close();
  }
};

/**
 *
 */
const ingestAttachmentNew = async () => {
  const timeTracker = createTimeTracker();
  const db = await PgWriteStore.connect({
    usageName: 'event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  const dataset = DatasetStore.connect();

  try {
    await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
      await processAttachmentNewEvents(db, dataset);
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.log('Tracked function times:');
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
    await db.close();
  }
};

const ingestNewBlock = async (idsPath?: string) => {
  const timeTracker = createTimeTracker();
  const db = await PgWriteStore.connect({
    usageName: 'event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  const dataset = DatasetStore.connect();

  try {
    const idsFileContent = fs.readFileSync(`${idsPath}`, 'utf-8');
    const ids = idsFileContent.split(/\r?\n/);
    await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
      await processNewBlockEvents(db, dataset, ids);
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.log('Tracked function times:');
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
  }
};

export { ingestNewBlock, ingestAttachmentNew, ingestNewBurnBlock };


@@ -0,0 +1,49 @@
import * as fs from 'fs';
import * as tty from 'tty';
import { processNewBlockEvents } from './importers/new-block-importer';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { DatasetStore } from './dataset/store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
const ingestNewBlock = async (idFile?: string) => {
  const db = await PgWriteStore.connect({
    usageName: `${idFile}`,
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  const dataset = DatasetStore.connect();
  const timeTracker = createTimeTracker();
  const dir = './events/new_block';

  try {
    const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
    const ids = idsFileContent.split(/\r?\n/);
    await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
      await processNewBlockEvents(db, dataset, ids);
    });

    // notify parent
    process.send?.({
      msgType: 'FINISH',
      msg: 'Worker has finished',
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
  }
};
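// The primary forks this module via cluster.setupPrimary({ exec }) and sends the ID file name
// over IPC; ingestion starts when that message arrives.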
process.on('message', async (msg: string) => {
  await ingestNewBlock(msg);
});


@@ -1,37 +0,0 @@
import { logger } from '../../logger';
import { PgWriteStore } from '../../datastore/pg-write-store';
const MIGRATIONS_TABLE = 'pgmigrations';
(async () => {
  const db = await PgWriteStore.connect({
    usageName: 'post-event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  // Re-enable indexes
  const dbName = db.sql.options.database;
  const tableSchema = db.sql.options.connection.search_path ?? 'public';
  const tablesQuery = await db.sql<{ tablename: string }[]>`
    SELECT tablename FROM pg_catalog.pg_tables
    WHERE tablename != ${MIGRATIONS_TABLE}
    AND schemaname = ${tableSchema}`;
  if (tablesQuery.length === 0) {
    const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
    console.error(errorMsg);
    throw new Error(errorMsg);
  }
  const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

  logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
  await db.toggleTableIndexes(db.sql, tables, true);
  logger.info({ component: 'event-replay' }, `Indexes re-enabled on tables: ${tables.join(', ')}`);

  // Refreshing materialized views
  logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
  await db.finishEventReplay();
})().catch(err => {
  throw err;
});


@@ -1,47 +0,0 @@
import { logger } from '../../logger';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { PgWriteStore } from '../../datastore/pg-write-store';
const MIGRATIONS_TABLE = 'pgmigrations';
(async () => {
  const db = await PgWriteStore.connect({
    usageName: 'pre-event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
  await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });

  logger.info({ component: 'event-replay' }, 'Migrating tables');
  try {
    await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
  } catch (error) {
    logger.error(error);
    throw new Error('DB migration cycle failed');
  }

  const dbName = db.sql.options.database;
  const tableSchema = db.sql.options.connection.search_path ?? 'public';
  const tablesQuery = await db.sql<{ tablename: string }[]>`
    SELECT tablename FROM pg_catalog.pg_tables
    WHERE tablename != ${MIGRATIONS_TABLE}
    AND schemaname = ${tableSchema}`;
  if (tablesQuery.length === 0) {
    const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
    console.error(errorMsg);
    throw new Error(errorMsg);
  }
  const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

  logger.info(
    { component: 'event-replay' },
    'Disabling indexes and constraints to speed up insertion'
  );
  await db.toggleTableIndexes(db.sql, tables, false);
  logger.info({ component: 'event-replay' }, `Indexes disabled on tables: ${tables.join(', ')}`);
})().catch(err => {
  throw err;
});


@@ -0,0 +1,240 @@
import * as tty from 'tty';
import * as fs from 'fs';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { DatasetStore } from './dataset/store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { splitIntoChunks } from './helpers';
import * as _cluster from 'cluster';
const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix
const MIGRATIONS_TABLE = 'pgmigrations';
enum IndexesState {
  On,
  Off,
}

export class ReplayController {
  private readonly db;
  private readonly dataset;

  private constructor(db: PgWriteStore, dataset: DatasetStore) {
    this.db = db;
    this.dataset = dataset;
  }

  /**
   * Ingest NEW_BURN_BLOCK events from the parquet dataset into the database.
   */
  private ingestNewBurnBlockEvents = async () => {
    const timeTracker = createTimeTracker();

    try {
      await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
        await processNewBurnBlockEvents(this.db, this.dataset);
      });
    } catch (err) {
      throw err;
    } finally {
      if (true || tty.isatty(1)) {
        console.log('Tracked function times:');
        console.table(timeTracker.getDurations(3));
      } else {
        logger.info(`Tracked function times`, timeTracker.getDurations(3));
      }
    }
  };

  /**
   * Ingest ATTACHMENTS_NEW events from the parquet dataset into the database.
   */
  private ingestAttachmentNewEvents = async () => {
    const timeTracker = createTimeTracker();

    try {
      await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
        await processAttachmentNewEvents(this.db, this.dataset);
      });
    } catch (err) {
      throw err;
    } finally {
      if (true || tty.isatty(1)) {
        console.log('Tracked function times:');
        console.table(timeTracker.getDurations(3));
      } else {
        logger.info(`Tracked function times`, timeTracker.getDurations(3));
      }
    }
  };

  /**
   * Enable or disable indexes and constraints on all API tables (except the migrations table).
   */
  private toggleIndexes = async (state: IndexesState) => {
    const db = this.db;
    const dbName = db.sql.options.database;
    const tableSchema = db.sql.options.connection.search_path ?? 'public';
    const tablesQuery = await db.sql<{ tablename: string }[]>`
      SELECT tablename FROM pg_catalog.pg_tables
      WHERE tablename != ${MIGRATIONS_TABLE}
      AND schemaname = ${tableSchema}`;
    if (tablesQuery.length === 0) {
      const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
      console.error(errorMsg);
      throw new Error(errorMsg);
    }
    const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

    if (state === IndexesState.Off) {
      await db.toggleTableIndexes(db.sql, tables, false);
    } else if (state === IndexesState.On) {
      await db.toggleTableIndexes(db.sql, tables, true);
    }
  };
  /**
   * Wipe and re-migrate the database, then disable indexes and constraints to speed up insertion.
   */
  prepare = async () => {
    logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
    await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });

    logger.info({ component: 'event-replay' }, 'Migrating tables');
    try {
      await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
    } catch (error) {
      logger.error(error);
      throw new Error('DB migration cycle failed');
    }

    // Disabling indexes
    logger.info(
      { component: 'event-replay' },
      'Disabling indexes and constraints to speed up insertion'
    );
    await this.toggleIndexes(IndexesState.Off);
  };

  /**
   * Split the NEW_BLOCK event IDs into one ids_*.txt file per worker and return the file names.
   */
  private genIdsFiles = async () => {
    const args = process.argv.slice(2);
    const workers: number = Number(args[1].split('=')[1]);

    logger.info(
      { component: 'event-replay' },
      `Generating ID files for ${workers} parallel workers`
    );

    const dir = './events/new_block';
    const ids: number[] = await this.dataset.newBlockEventsIds();
    const batchSize = Math.ceil(ids.length / workers);
    const chunks = splitIntoChunks(ids, batchSize);
    const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));

    // delete previous files
    files.map(file => {
      try {
        fs.unlinkSync(`${dir}/${file}`);
      } catch (err) {
        throw err;
      }
    });

    // create id files
    chunks.forEach((chunk, idx) => {
      const filename = `./events/new_block/ids_${idx + 1}.txt`;
      chunk.forEach(id => {
        fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
      });
    });

    return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
  };
  /**
   * Connect to the database and the parquet dataset and return a ready ReplayController.
   */
  static async init() {
    const db = await PgWriteStore.connect({
      usageName: 'event-replay',
      skipMigrations: true,
      withNotifier: false,
      isEventReplay: true,
    });
    const dataset = DatasetStore.connect();

    return new ReplayController(db, dataset);
  }

  /**
   * Re-enable indexes and constraints, refresh materialized views and close the database connection.
   */
  teardown = async () => {
    const db = this.db;

    // Re-enabling indexes
    logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
    await this.toggleIndexes(IndexesState.On);

    // Refreshing materialized views
    logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
    await db.finishEventReplay();

    await this.db.close();
  };

  /**
   * Replay NEW_BLOCK events in parallel, forking one cluster worker per generated ID file.
   */
  ingestNewBlockEvents = (): Promise<boolean> => {
    return new Promise(async resolve => {
      cluster.setupPrimary({
        exec: __dirname + '/new-block-worker',
      });

      let workersReady = 0;
      const idFiles = await this.genIdsFiles();
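      // Fork one worker per ID file and hand it its file name over IPC.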
      for (const idFile of idFiles) {
        cluster.fork().send(idFile);
        workersReady++;
      }
      for (const id in cluster.workers) {
        const worker: _cluster.Worker | undefined = cluster.workers[id];

        worker?.on('message', (msg, _handle) => {
          switch (msg.msgType) {
            case 'FINISH':
              logger.info({ component: 'event-replay' }, `${msg.msg}`);
              workersReady--;
              worker.disconnect();
              break;
            default:
              // default action
              break;
          }
        });

        worker?.on('disconnect', () => {
          if (workersReady === 0) {
            resolve(true);
          }
        });
      }
    });
  };

  /**
   * Run the replay: NEW_BURN_BLOCK and ATTACHMENTS_NEW events concurrently, then NEW_BLOCK events
   * in parallel worker processes.
   */
  do = async () => {
    await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
    await this.ingestNewBlockEvents();
  };
}


@@ -27,11 +27,7 @@ import { isFtMetadataEnabled, isNftMetadataEnabled } from './token-metadata/help
 import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
 import { registerMempoolPromStats } from './datastore/helpers';
 import { logger } from './logger';
-import {
-  ingestAttachmentNew,
-  ingestNewBlock,
-  ingestNewBurnBlock,
-} from './event-replay/parquet-based/event-replay';
+import { ReplayController } from './event-replay/parquet-based/replay-controller';

 enum StacksApiMode {
   /**
@@ -305,17 +301,10 @@ async function handleProgramArgs() {
       args.options.force
     );
   } else if (args.operand === 'from-parquet-events') {
-    if (args.options['new-burn-block']) {
-      await ingestNewBurnBlock();
-    }
-
-    if (args.options['attachment-new']) {
-      await ingestAttachmentNew();
-    }
-
-    if (args.options['new-block']) {
-      await ingestNewBlock(args.options['ids-path']);
-    }
+    const replay = await ReplayController.init();
+    await replay.prepare();
+    await replay.do();
+    await replay.teardown();
   } else if (parsedOpts._[0]) {
     throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
   } else {