mirror of https://github.com/alexgo-io/stacks-blockchain-api.git (synced 2026-01-12 16:53:19 +08:00)
feat: parallel processing using node cluster
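The core of the change: instead of one process walking every NEW_BLOCK event, a primary process forks one cluster worker per ID-chunk file and coordinates them over the IPC channel. A minimal, single-file sketch of that pattern (the FINISH message shape and the ids_*.txt names mirror the diff below; everything else is illustrative):

import cluster from 'cluster';

if (cluster.isPrimary) {
  // Primary: fork one worker per ID-chunk file and wait for each FINISH message.
  const idFiles = ['ids_1.txt', 'ids_2.txt'];
  let pending = idFiles.length;
  for (const idFile of idFiles) {
    const worker = cluster.fork();
    worker.send(idFile);
    worker.on('message', (msg: { msgType: string }) => {
      if (msg.msgType === 'FINISH') {
        worker.disconnect();
        if (--pending === 0) console.log('all workers finished');
      }
    });
  }
} else {
  // Worker: receive its chunk name, process it, then report back to the primary.
  process.on('message', (idFile: string) => {
    console.log(`worker ${process.pid} processing ${idFile}`);
    process.send?.({ msgType: 'FINISH', msg: 'Worker has finished' });
  });
}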
@@ -1,70 +0,0 @@
#!/usr/bin/env bash

#
# 1. Parsing command line arguments
#
while getopts w: flag
do
    case "${flag}" in
        w) workers=${OPTARG};
    esac
done

#
# 2. Prepare DB for event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/pre-replay-db.js

wait

#
# 3. NEW_BURN_BLOCK events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --new-burn-block

wait

#
# 4. ATTACHMENTS_NEW events
#
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --attachment-new

wait

#
# 5. NEW_BLOCK events
#

# Generate ID chunks in accordance with the workers amount
node ./lib/event-replay/parquet-based/gen-ids-file.js --workers=${workers}

wait

id_files=(./events/new_block/ids_*.txt)
for id_file in "${id_files[@]}"
do
    NODE_OPTIONS=--max-old-space-size=8192 \
    STACKS_CHAIN_ID=0x00000001 \
    node ./lib/index.js from-parquet-events --new-block --ids-path=${id_file} &
done

wait

#
# 6. Prepare DB for regular operations after event-replay
#
PG_USER=postgres \
PG_PASSWORD=postgres \
PG_DATABASE=stacks_blockchain_api \
PG_SCHEMA=public \
PG_PORT=5490 \
node ./lib/event-replay/parquet-based/post-replay-db.js

wait

exit 0
@@ -1,109 +0,0 @@
import * as tty from 'tty';
import * as fs from 'fs';

import { PgWriteStore } from '../../datastore/pg-write-store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processNewBlockEvents } from './importers/new-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { DatasetStore } from './dataset/store';

/**
 *
 */
const ingestNewBurnBlock = async () => {
  const timeTracker = createTimeTracker();

  const db = await PgWriteStore.connect({
    usageName: 'event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  const dataset = DatasetStore.connect();

  try {
    await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
      await processNewBurnBlockEvents(db, dataset);
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.log('Tracked function times:');
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }

    await db.close();
  }
};

/**
 *
 */
const ingestAttachmentNew = async () => {
  const timeTracker = createTimeTracker();

  const db = await PgWriteStore.connect({
    usageName: 'event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  const dataset = DatasetStore.connect();

  try {
    await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
      await processAttachmentNewEvents(db, dataset);
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.log('Tracked function times:');
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }

    await db.close();
  }
};

const ingestNewBlock = async (idsPath?: string) => {
  const timeTracker = createTimeTracker();

  const db = await PgWriteStore.connect({
    usageName: 'event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  const dataset = DatasetStore.connect();

  try {
    const idsFileContent = fs.readFileSync(`${idsPath}`, 'utf-8');
    const ids = idsFileContent.split(/\r?\n/);

    await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
      await processNewBlockEvents(db, dataset, ids);
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.log('Tracked function times:');
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
  }
};

export { ingestNewBlock, ingestAttachmentNew, ingestNewBurnBlock };
src/event-replay/parquet-based/new-block-worker.ts (new file, 49 lines)
@@ -0,0 +1,49 @@
import * as fs from 'fs';
import * as tty from 'tty';

import { processNewBlockEvents } from './importers/new-block-importer';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { DatasetStore } from './dataset/store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';

const ingestNewBlock = async (idFile?: string) => {
  const db = await PgWriteStore.connect({
    usageName: `${idFile}`,
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  const dataset = DatasetStore.connect();

  const timeTracker = createTimeTracker();

  const dir = './events/new_block';

  try {
    const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
    const ids = idsFileContent.split(/\r?\n/);

    await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
      await processNewBlockEvents(db, dataset, ids);
    });

    // notify parent
    process.send?.({
      msgType: 'FINISH',
      msg: 'Worker has finished',
    });
  } catch (err) {
    throw err;
  } finally {
    if (true || tty.isatty(1)) {
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
  }
};

process.on('message', async (msg: string) => {
  await ingestNewBlock(msg);
});
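The worker above is purely message-driven: it does nothing until the primary sends it an ID-file name over IPC. A hedged sketch (not part of the commit) of exercising the compiled worker in isolation with child_process.fork, which provides the same IPC channel that cluster.fork() does; the path assumes the compiled ./lib layout used elsewhere in this commit:

import { fork } from 'child_process';

// Fork the compiled worker module; fork() wires up an IPC channel automatically.
const worker = fork('./lib/event-replay/parquet-based/new-block-worker.js');

worker.send('ids_1.txt'); // worker reads ./events/new_block/ids_1.txt and ingests those events
worker.on('message', msg => {
  console.log('worker reported:', msg); // e.g. { msgType: 'FINISH', msg: 'Worker has finished' }
  worker.kill();
});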
@@ -1,37 +0,0 @@
import { logger } from '../../logger';
import { PgWriteStore } from '../../datastore/pg-write-store';

const MIGRATIONS_TABLE = 'pgmigrations';

(async () => {
  const db = await PgWriteStore.connect({
    usageName: 'post-event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  // Re-enable indexes
  const dbName = db.sql.options.database;
  const tableSchema = db.sql.options.connection.search_path ?? 'public';
  const tablesQuery = await db.sql<{ tablename: string }[]>`
    SELECT tablename FROM pg_catalog.pg_tables
    WHERE tablename != ${MIGRATIONS_TABLE}
    AND schemaname = ${tableSchema}`;
  if (tablesQuery.length === 0) {
    const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
    console.error(errorMsg);
    throw new Error(errorMsg);
  }
  const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

  logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
  await db.toggleTableIndexes(db.sql, tables, true);
  logger.info({ component: 'event-replay' }, `Indexes re-enabled on tables: ${tables.join(', ')}`);

  // Refreshing materialized views
  logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
  await db.finishEventReplay();
})().catch(err => {
  throw err;
});
@@ -1,47 +0,0 @@
import { logger } from '../../logger';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { PgWriteStore } from '../../datastore/pg-write-store';

const MIGRATIONS_TABLE = 'pgmigrations';

(async () => {
  const db = await PgWriteStore.connect({
    usageName: 'pre-event-replay',
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });

  logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
  await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });

  logger.info({ component: 'event-replay' }, 'Migrating tables');
  try {
    await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
  } catch (error) {
    logger.error(error);
    throw new Error('DB migration cycle failed');
  }

  const dbName = db.sql.options.database;
  const tableSchema = db.sql.options.connection.search_path ?? 'public';
  const tablesQuery = await db.sql<{ tablename: string }[]>`
    SELECT tablename FROM pg_catalog.pg_tables
    WHERE tablename != ${MIGRATIONS_TABLE}
    AND schemaname = ${tableSchema}`;
  if (tablesQuery.length === 0) {
    const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
    console.error(errorMsg);
    throw new Error(errorMsg);
  }
  const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

  logger.info(
    { component: 'event-replay' },
    'Disabling indexes and constraints to speed up insertion'
  );
  await db.toggleTableIndexes(db.sql, tables, false);
  logger.info({ component: 'event-replay' }, `Indexes disabled on tables: ${tables.join(', ')}`);
})().catch(err => {
  throw err;
});
src/event-replay/parquet-based/replay-controller.ts (new file, 240 lines)
@@ -0,0 +1,240 @@
import * as tty from 'tty';
import * as fs from 'fs';

import { PgWriteStore } from '../../datastore/pg-write-store';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { DatasetStore } from './dataset/store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { splitIntoChunks } from './helpers';

import * as _cluster from 'cluster';
const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix

const MIGRATIONS_TABLE = 'pgmigrations';

enum IndexesState {
  On,
  Off,
}

export class ReplayController {
  private readonly db;
  private readonly dataset;

  private constructor(db: PgWriteStore, dataset: DatasetStore) {
    this.db = db;
    this.dataset = dataset;
  }

  /**
   * Ingest NEW_BURN_BLOCK events from the parquet dataset.
   */
  private ingestNewBurnBlockEvents = async () => {
    const timeTracker = createTimeTracker();

    try {
      await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
        await processNewBurnBlockEvents(this.db, this.dataset);
      });
    } catch (err) {
      throw err;
    } finally {
      if (true || tty.isatty(1)) {
        console.log('Tracked function times:');
        console.table(timeTracker.getDurations(3));
      } else {
        logger.info(`Tracked function times`, timeTracker.getDurations(3));
      }
    }
  };

  /**
   * Ingest ATTACHMENTS_NEW events from the parquet dataset.
   */
  private ingestAttachmentNewEvents = async () => {
    const timeTracker = createTimeTracker();

    try {
      await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
        await processAttachmentNewEvents(this.db, this.dataset);
      });
    } catch (err) {
      throw err;
    } finally {
      if (true || tty.isatty(1)) {
        console.log('Tracked function times:');
        console.table(timeTracker.getDurations(3));
      } else {
        logger.info(`Tracked function times`, timeTracker.getDurations(3));
      }
    }
  };

  /**
   * Enable or disable indexes and constraints on all API tables (except the migrations table).
   */
  private toggleIndexes = async (state: IndexesState) => {
    const db = this.db;
    const dbName = db.sql.options.database;
    const tableSchema = db.sql.options.connection.search_path ?? 'public';
    const tablesQuery = await db.sql<{ tablename: string }[]>`
      SELECT tablename FROM pg_catalog.pg_tables
      WHERE tablename != ${MIGRATIONS_TABLE}
      AND schemaname = ${tableSchema}`;
    if (tablesQuery.length === 0) {
      const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
      console.error(errorMsg);
      throw new Error(errorMsg);
    }
    const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

    if (state === IndexesState.Off) {
      await db.toggleTableIndexes(db.sql, tables, false);
    } else if (state == IndexesState.On) {
      await db.toggleTableIndexes(db.sql, tables, true);
    }
  };

  /**
   * Drop and re-create the schema, then disable indexes before ingestion.
   */
  prepare = async () => {
    logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
    await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });

    logger.info({ component: 'event-replay' }, 'Migrating tables');
    try {
      await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
    } catch (error) {
      logger.error(error);
      throw new Error('DB migration cycle failed');
    }

    // Disabling indexes
    logger.info(
      { component: 'event-replay' },
      'Disabling indexes and constraints to speed up insertion'
    );
    await this.toggleIndexes(IndexesState.Off);
  };

  /**
   * Split NEW_BLOCK event IDs into one ids_*.txt file per worker.
   */
  private genIdsFiles = async () => {
    const args = process.argv.slice(2);
    const workers: number = Number(args[1].split('=')[1]);

    logger.info(
      { component: 'event-replay' },
      `Generating ID files for ${workers} parallel workers`
    );

    const dir = './events/new_block';

    const ids: number[] = await this.dataset.newBlockEventsIds();
    const batchSize = Math.ceil(ids.length / workers);
    const chunks = splitIntoChunks(ids, batchSize);

    const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));

    // delete previous files
    files.map(file => {
      try {
        fs.unlinkSync(`${dir}/${file}`);
      } catch (err) {
        throw err;
      }
    });

    // create id files
    chunks.forEach((chunk, idx) => {
      const filename = `./events/new_block/ids_${idx + 1}.txt`;
      chunk.forEach(id => {
        fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
      });
    });

    return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
  };

  /**
   * Connect to the database and the parquet dataset and return a controller instance.
   */
  static async init() {
    const db = await PgWriteStore.connect({
      usageName: 'event-replay',
      skipMigrations: true,
      withNotifier: false,
      isEventReplay: true,
    });
    const dataset = DatasetStore.connect();

    return new ReplayController(db, dataset);
  }

  /**
   * Re-enable indexes, refresh materialized views, and close the DB connection.
   */
  teardown = async () => {
    const db = this.db;

    // Re-enabling indexes
    logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
    await this.toggleIndexes(IndexesState.On);

    // Refreshing materialized views
    logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
    await db.finishEventReplay();

    await this.db.close();
  };

  ingestNewBlockEvents = (): Promise<boolean> => {
    return new Promise(async resolve => {
      cluster.setupPrimary({
        exec: __dirname + '/new-block-worker',
      });

      let workersReady = 0;
      const idFiles = await this.genIdsFiles();
      for (const idFile of idFiles) {
        cluster.fork().send(idFile);
        workersReady++;
      }

      for (const id in cluster.workers) {
        const worker: _cluster.Worker | undefined = cluster.workers[id];
        worker?.on('message', (msg, _handle) => {
          switch (msg.msgType) {
            case 'FINISH':
              logger.info({ component: 'event-replay' }, `${msg.msg}`);
              workersReady--;
              worker.disconnect();
              break;
            default:
              // default action
              break;
          }
        });

        worker?.on('disconnect', () => {
          if (workersReady === 0) {
            resolve(true);
          }
        });
      }
    });
  };

  /**
   * Run the full replay: NEW_BURN_BLOCK and ATTACHMENTS_NEW concurrently, then NEW_BLOCK via cluster workers.
   */
  do = async () => {
    await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
    await this.ingestNewBlockEvents();
  };
}
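The genIdsFiles method above leans on a splitIntoChunks helper imported from './helpers', which is not shown in this diff. A plausible implementation (an assumption, not the repository's actual code) that matches how it is called, slicing the ID list into batches of batchSize:

// Hypothetical sketch of splitIntoChunks: split an array into consecutive slices of at most `size` elements.
function splitIntoChunks<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// With 10 IDs and 4 workers, batchSize = Math.ceil(10 / 4) = 3, giving chunks of sizes 3, 3, 3 and 1.
console.log(splitIntoChunks([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3));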
src/index.ts (21 lines changed)
@@ -27,11 +27,7 @@ import { isFtMetadataEnabled, isNftMetadataEnabled } from './token-metadata/help
 import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
 import { registerMempoolPromStats } from './datastore/helpers';
 import { logger } from './logger';
-import {
-  ingestAttachmentNew,
-  ingestNewBlock,
-  ingestNewBurnBlock,
-} from './event-replay/parquet-based/event-replay';
+import { ReplayController } from './event-replay/parquet-based/replay-controller';

 enum StacksApiMode {
   /**
@@ -305,17 +301,10 @@ async function handleProgramArgs() {
       args.options.force
     );
   } else if (args.operand === 'from-parquet-events') {
-    if (args.options['new-burn-block']) {
-      await ingestNewBurnBlock();
-    }
-
-    if (args.options['attachment-new']) {
-      await ingestAttachmentNew();
-    }
-
-    if (args.options['new-block']) {
-      await ingestNewBlock(args.options['ids-path']);
-    }
+    const replay = await ReplayController.init();
+    await replay.prepare();
+    await replay.do();
+    await replay.teardown();
   } else if (parsedOpts._[0]) {
     throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
   } else {
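With this change the from-parquet-events operand no longer takes one flag per event type; the controller drives the whole replay and reads its worker count straight from the command line. A worked example of how ReplayController.genIdsFiles() picks that number up, assuming the command is invoked the way the old shell script invoked gen-ids-file.js (the --workers flag name is inferred from that script; the parsing code does not actually check the flag name):

// Simulated argv for: node ./lib/index.js from-parquet-events --workers=8
const argv = ['/usr/bin/node', './lib/index.js', 'from-parquet-events', '--workers=8'];
const args = argv.slice(2);                     // ['from-parquet-events', '--workers=8']
const workers = Number(args[1].split('=')[1]);  // 8; only the value after '=' is used
console.log(workers);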