mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 16:53:19 +08:00
feat: parallel processing using node cluster
This commit is contained in:
@@ -1,70 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
#
|
|
||||||
# 1. Parsing comman line arguments
|
|
||||||
#
|
|
||||||
while getopts w: flag
|
|
||||||
do
|
|
||||||
case "${flag}" in
|
|
||||||
w) workers=${OPTARG};
|
|
||||||
esac
|
|
||||||
done
|
|
||||||
|
|
||||||
#
|
|
||||||
# 2. Prepare DB for event-replay
|
|
||||||
#
|
|
||||||
PG_USER=postgres \
|
|
||||||
PG_PASSWORD=postgres \
|
|
||||||
PG_DATABASE=stacks_blockchain_api \
|
|
||||||
PG_SCHEMA=public \
|
|
||||||
PG_PORT=5490 \
|
|
||||||
node ./lib/event-replay/parquet-based/pre-replay-db.js
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
#
|
|
||||||
# 3. NEW_BURN_BLOCK events
|
|
||||||
#
|
|
||||||
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --new-burn-block
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
#
|
|
||||||
# 4. ATTACHMENTS_NEW events
|
|
||||||
#
|
|
||||||
STACKS_CHAIN_ID=0x00000001 node ./lib/index.js from-parquet-events --attachment-new
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
#
|
|
||||||
# 5. NEW_BLOCK events
|
|
||||||
#
|
|
||||||
|
|
||||||
# Generate ID chunks in accordance to workers amount
|
|
||||||
node ./lib/event-replay/parquet-based/gen-ids-file.js --workers=${workers}
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
id_files=(./events/new_block/ids_*.txt)
|
|
||||||
for id_file in "${id_files[@]}"
|
|
||||||
do
|
|
||||||
NODE_OPTIONS=--max-old-space-size=8192 \
|
|
||||||
STACKS_CHAIN_ID=0x00000001 \
|
|
||||||
node ./lib/index.js from-parquet-events --new-block --ids-path=${id_file} &
|
|
||||||
done
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
#
|
|
||||||
# 6. Prepare DB for regular operations after event-replay
|
|
||||||
#
|
|
||||||
PG_USER=postgres \
|
|
||||||
PG_PASSWORD=postgres \
|
|
||||||
PG_DATABASE=stacks_blockchain_api \
|
|
||||||
PG_SCHEMA=public \
|
|
||||||
PG_PORT=5490 \
|
|
||||||
node ./lib/event-replay/parquet-based/post-replay-db.js
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
exit 0
|
|
||||||
@@ -1,109 +0,0 @@
|
|||||||
import * as tty from 'tty';
|
|
||||||
import * as fs from 'fs';
|
|
||||||
|
|
||||||
import { PgWriteStore } from '../../datastore/pg-write-store';
|
|
||||||
import { logger } from '../../logger';
|
|
||||||
import { createTimeTracker } from './helpers';
|
|
||||||
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
|
|
||||||
import { processNewBlockEvents } from './importers/new-block-importer';
|
|
||||||
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
|
|
||||||
import { DatasetStore } from './dataset/store';
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
const ingestNewBurnBlock = async () => {
|
|
||||||
const timeTracker = createTimeTracker();
|
|
||||||
|
|
||||||
const db = await PgWriteStore.connect({
|
|
||||||
usageName: 'event-replay',
|
|
||||||
skipMigrations: true,
|
|
||||||
withNotifier: false,
|
|
||||||
isEventReplay: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
const dataset = DatasetStore.connect();
|
|
||||||
|
|
||||||
try {
|
|
||||||
await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
|
|
||||||
await processNewBurnBlockEvents(db, dataset);
|
|
||||||
});
|
|
||||||
} catch (err) {
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
if (true || tty.isatty(1)) {
|
|
||||||
console.log('Tracked function times:');
|
|
||||||
console.table(timeTracker.getDurations(3));
|
|
||||||
} else {
|
|
||||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
|
||||||
}
|
|
||||||
|
|
||||||
await db.close();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
const ingestAttachmentNew = async () => {
|
|
||||||
const timeTracker = createTimeTracker();
|
|
||||||
|
|
||||||
const db = await PgWriteStore.connect({
|
|
||||||
usageName: 'event-replay',
|
|
||||||
skipMigrations: true,
|
|
||||||
withNotifier: false,
|
|
||||||
isEventReplay: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
const dataset = DatasetStore.connect();
|
|
||||||
|
|
||||||
try {
|
|
||||||
await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
|
|
||||||
await processAttachmentNewEvents(db, dataset);
|
|
||||||
});
|
|
||||||
} catch (err) {
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
if (true || tty.isatty(1)) {
|
|
||||||
console.log('Tracked function times:');
|
|
||||||
console.table(timeTracker.getDurations(3));
|
|
||||||
} else {
|
|
||||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
|
||||||
}
|
|
||||||
|
|
||||||
await db.close();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const ingestNewBlock = async (idsPath?: string) => {
|
|
||||||
const timeTracker = createTimeTracker();
|
|
||||||
|
|
||||||
const db = await PgWriteStore.connect({
|
|
||||||
usageName: 'event-replay',
|
|
||||||
skipMigrations: true,
|
|
||||||
withNotifier: false,
|
|
||||||
isEventReplay: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
const dataset = DatasetStore.connect();
|
|
||||||
|
|
||||||
try {
|
|
||||||
const idsFileContent = fs.readFileSync(`${idsPath}`, 'utf-8');
|
|
||||||
const ids = idsFileContent.split(/\r?\n/);
|
|
||||||
|
|
||||||
await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
|
|
||||||
await processNewBlockEvents(db, dataset, ids);
|
|
||||||
});
|
|
||||||
} catch (err) {
|
|
||||||
throw err;
|
|
||||||
} finally {
|
|
||||||
if (true || tty.isatty(1)) {
|
|
||||||
console.log('Tracked function times:');
|
|
||||||
console.table(timeTracker.getDurations(3));
|
|
||||||
} else {
|
|
||||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export { ingestNewBlock, ingestAttachmentNew, ingestNewBurnBlock };
|
|
||||||
49
src/event-replay/parquet-based/new-block-worker.ts
Normal file
49
src/event-replay/parquet-based/new-block-worker.ts
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import * as fs from 'fs';
|
||||||
|
import * as tty from 'tty';
|
||||||
|
|
||||||
|
import { processNewBlockEvents } from './importers/new-block-importer';
|
||||||
|
import { PgWriteStore } from '../../datastore/pg-write-store';
|
||||||
|
import { DatasetStore } from './dataset/store';
|
||||||
|
import { logger } from '../../logger';
|
||||||
|
import { createTimeTracker } from './helpers';
|
||||||
|
|
||||||
|
const ingestNewBlock = async (idFile?: string) => {
|
||||||
|
const db = await PgWriteStore.connect({
|
||||||
|
usageName: `${idFile}`,
|
||||||
|
skipMigrations: true,
|
||||||
|
withNotifier: false,
|
||||||
|
isEventReplay: true,
|
||||||
|
});
|
||||||
|
const dataset = DatasetStore.connect();
|
||||||
|
|
||||||
|
const timeTracker = createTimeTracker();
|
||||||
|
|
||||||
|
const dir = './events/new_block';
|
||||||
|
|
||||||
|
try {
|
||||||
|
const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
|
||||||
|
const ids = idsFileContent.split(/\r?\n/);
|
||||||
|
|
||||||
|
await timeTracker.track('NEW_BLOCK_EVENTS', async () => {
|
||||||
|
await processNewBlockEvents(db, dataset, ids);
|
||||||
|
});
|
||||||
|
|
||||||
|
// notify parent
|
||||||
|
process.send?.({
|
||||||
|
msgType: 'FINISH',
|
||||||
|
msg: 'Worker has finished',
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
throw err;
|
||||||
|
} finally {
|
||||||
|
if (true || tty.isatty(1)) {
|
||||||
|
console.table(timeTracker.getDurations(3));
|
||||||
|
} else {
|
||||||
|
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
process.on('message', async (msg: string) => {
|
||||||
|
await ingestNewBlock(msg);
|
||||||
|
});
|
||||||
@@ -1,37 +0,0 @@
|
|||||||
import { logger } from '../../logger';
|
|
||||||
import { PgWriteStore } from '../../datastore/pg-write-store';
|
|
||||||
|
|
||||||
const MIGRATIONS_TABLE = 'pgmigrations';
|
|
||||||
|
|
||||||
(async () => {
|
|
||||||
const db = await PgWriteStore.connect({
|
|
||||||
usageName: 'post-event-replay',
|
|
||||||
skipMigrations: true,
|
|
||||||
withNotifier: false,
|
|
||||||
isEventReplay: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Re-enable indexes
|
|
||||||
const dbName = db.sql.options.database;
|
|
||||||
const tableSchema = db.sql.options.connection.search_path ?? 'public';
|
|
||||||
const tablesQuery = await db.sql<{ tablename: string }[]>`
|
|
||||||
SELECT tablename FROM pg_catalog.pg_tables
|
|
||||||
WHERE tablename != ${MIGRATIONS_TABLE}
|
|
||||||
AND schemaname = ${tableSchema}`;
|
|
||||||
if (tablesQuery.length === 0) {
|
|
||||||
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
|
|
||||||
console.error(errorMsg);
|
|
||||||
throw new Error(errorMsg);
|
|
||||||
}
|
|
||||||
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
|
|
||||||
|
|
||||||
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
|
|
||||||
await db.toggleTableIndexes(db.sql, tables, true);
|
|
||||||
logger.info({ component: 'event-replay' }, `Indexes re-enabled on tables: ${tables.join(', ')}`);
|
|
||||||
|
|
||||||
// Refreshing materialized views
|
|
||||||
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
|
|
||||||
await db.finishEventReplay();
|
|
||||||
})().catch(err => {
|
|
||||||
throw err;
|
|
||||||
});
|
|
||||||
@@ -1,47 +0,0 @@
|
|||||||
import { logger } from '../../logger';
|
|
||||||
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
|
|
||||||
import { PgWriteStore } from '../../datastore/pg-write-store';
|
|
||||||
|
|
||||||
const MIGRATIONS_TABLE = 'pgmigrations';
|
|
||||||
|
|
||||||
(async () => {
|
|
||||||
const db = await PgWriteStore.connect({
|
|
||||||
usageName: 'pre-event-replay',
|
|
||||||
skipMigrations: true,
|
|
||||||
withNotifier: false,
|
|
||||||
isEventReplay: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
|
|
||||||
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
|
|
||||||
|
|
||||||
logger.info({ component: 'event-replay' }, 'Migrating tables');
|
|
||||||
try {
|
|
||||||
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(error);
|
|
||||||
throw new Error('DB migration cycle failed');
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbName = db.sql.options.database;
|
|
||||||
const tableSchema = db.sql.options.connection.search_path ?? 'public';
|
|
||||||
const tablesQuery = await db.sql<{ tablename: string }[]>`
|
|
||||||
SELECT tablename FROM pg_catalog.pg_tables
|
|
||||||
WHERE tablename != ${MIGRATIONS_TABLE}
|
|
||||||
AND schemaname = ${tableSchema}`;
|
|
||||||
if (tablesQuery.length === 0) {
|
|
||||||
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
|
|
||||||
console.error(errorMsg);
|
|
||||||
throw new Error(errorMsg);
|
|
||||||
}
|
|
||||||
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
{ component: 'event-replay' },
|
|
||||||
'Disabling indexes and constraints to speed up insertion'
|
|
||||||
);
|
|
||||||
await db.toggleTableIndexes(db.sql, tables, false);
|
|
||||||
logger.info({ component: 'event-replay' }, `Indexes disabled on tables: ${tables.join(', ')}`);
|
|
||||||
})().catch(err => {
|
|
||||||
throw err;
|
|
||||||
});
|
|
||||||
240
src/event-replay/parquet-based/replay-controller.ts
Normal file
240
src/event-replay/parquet-based/replay-controller.ts
Normal file
@@ -0,0 +1,240 @@
|
|||||||
|
import * as tty from 'tty';
|
||||||
|
import * as fs from 'fs';
|
||||||
|
|
||||||
|
import { PgWriteStore } from '../../datastore/pg-write-store';
|
||||||
|
import { logger } from '../../logger';
|
||||||
|
import { createTimeTracker } from './helpers';
|
||||||
|
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
|
||||||
|
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
|
||||||
|
import { DatasetStore } from './dataset/store';
|
||||||
|
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
|
||||||
|
import { splitIntoChunks } from './helpers';
|
||||||
|
|
||||||
|
import * as _cluster from 'cluster';
|
||||||
|
const cluster = (_cluster as unknown) as _cluster.Cluster; // typings fix
|
||||||
|
|
||||||
|
const MIGRATIONS_TABLE = 'pgmigrations';
|
||||||
|
|
||||||
|
enum IndexesState {
|
||||||
|
On,
|
||||||
|
Off,
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ReplayController {
|
||||||
|
private readonly db;
|
||||||
|
private readonly dataset;
|
||||||
|
|
||||||
|
private constructor(db: PgWriteStore, dataset: DatasetStore) {
|
||||||
|
this.db = db;
|
||||||
|
this.dataset = dataset;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private ingestNewBurnBlockEvents = async () => {
|
||||||
|
const timeTracker = createTimeTracker();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await timeTracker.track('NEW_BURN_BLOCK_EVENTS', async () => {
|
||||||
|
await processNewBurnBlockEvents(this.db, this.dataset);
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
throw err;
|
||||||
|
} finally {
|
||||||
|
if (true || tty.isatty(1)) {
|
||||||
|
console.log('Tracked function times:');
|
||||||
|
console.table(timeTracker.getDurations(3));
|
||||||
|
} else {
|
||||||
|
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private ingestAttachmentNewEvents = async () => {
|
||||||
|
const timeTracker = createTimeTracker();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
|
||||||
|
await processAttachmentNewEvents(this.db, this.dataset);
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
throw err;
|
||||||
|
} finally {
|
||||||
|
if (true || tty.isatty(1)) {
|
||||||
|
console.log('Tracked function times:');
|
||||||
|
console.table(timeTracker.getDurations(3));
|
||||||
|
} else {
|
||||||
|
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private toggleIndexes = async (state: IndexesState) => {
|
||||||
|
const db = this.db;
|
||||||
|
const dbName = db.sql.options.database;
|
||||||
|
const tableSchema = db.sql.options.connection.search_path ?? 'public';
|
||||||
|
const tablesQuery = await db.sql<{ tablename: string }[]>`
|
||||||
|
SELECT tablename FROM pg_catalog.pg_tables
|
||||||
|
WHERE tablename != ${MIGRATIONS_TABLE}
|
||||||
|
AND schemaname = ${tableSchema}`;
|
||||||
|
if (tablesQuery.length === 0) {
|
||||||
|
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
|
||||||
|
console.error(errorMsg);
|
||||||
|
throw new Error(errorMsg);
|
||||||
|
}
|
||||||
|
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);
|
||||||
|
|
||||||
|
if (state === IndexesState.Off) {
|
||||||
|
await db.toggleTableIndexes(db.sql, tables, false);
|
||||||
|
} else if (state == IndexesState.On) {
|
||||||
|
await db.toggleTableIndexes(db.sql, tables, true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
prepare = async () => {
|
||||||
|
logger.info({ component: 'event-replay' }, 'Cleaning up the Database');
|
||||||
|
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
|
||||||
|
|
||||||
|
logger.info({ component: 'event-replay' }, 'Migrating tables');
|
||||||
|
try {
|
||||||
|
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error);
|
||||||
|
throw new Error('DB migration cycle failed');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disabling indexes
|
||||||
|
logger.info(
|
||||||
|
{ component: 'event-replay' },
|
||||||
|
'Disabling indexes and constraints to speed up insertion'
|
||||||
|
);
|
||||||
|
await this.toggleIndexes(IndexesState.Off);
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private genIdsFiles = async () => {
|
||||||
|
const args = process.argv.slice(2);
|
||||||
|
const workers: number = Number(args[1].split('=')[1]);
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
{ component: 'event-replay' },
|
||||||
|
`Generating ID files for ${workers} parallel workers`
|
||||||
|
);
|
||||||
|
|
||||||
|
const dir = './events/new_block';
|
||||||
|
|
||||||
|
const ids: number[] = await this.dataset.newBlockEventsIds();
|
||||||
|
const batchSize = Math.ceil(ids.length / workers);
|
||||||
|
const chunks = splitIntoChunks(ids, batchSize);
|
||||||
|
|
||||||
|
const files = fs.readdirSync(dir).filter(f => f.endsWith('txt'));
|
||||||
|
|
||||||
|
// delete previous files
|
||||||
|
files.map(file => {
|
||||||
|
try {
|
||||||
|
fs.unlinkSync(`${dir}/${file}`);
|
||||||
|
} catch (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// create id files
|
||||||
|
chunks.forEach((chunk, idx) => {
|
||||||
|
const filename = `./events/new_block/ids_${idx + 1}.txt`;
|
||||||
|
chunk.forEach(id => {
|
||||||
|
fs.writeFileSync(filename, id.toString() + '\n', { flag: 'a' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return fs.readdirSync(dir).filter(f => f.endsWith('txt'));
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static async init() {
|
||||||
|
const db = await PgWriteStore.connect({
|
||||||
|
usageName: 'event-replay',
|
||||||
|
skipMigrations: true,
|
||||||
|
withNotifier: false,
|
||||||
|
isEventReplay: true,
|
||||||
|
});
|
||||||
|
const dataset = DatasetStore.connect();
|
||||||
|
|
||||||
|
return new ReplayController(db, dataset);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
teardown = async () => {
|
||||||
|
const db = this.db;
|
||||||
|
|
||||||
|
// Re-enabling indexes
|
||||||
|
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
|
||||||
|
await this.toggleIndexes(IndexesState.On);
|
||||||
|
|
||||||
|
// Refreshing materialized views
|
||||||
|
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
|
||||||
|
await db.finishEventReplay();
|
||||||
|
|
||||||
|
await this.db.close();
|
||||||
|
};
|
||||||
|
|
||||||
|
ingestNewBlockEvents = (): Promise<boolean> => {
|
||||||
|
return new Promise(async resolve => {
|
||||||
|
cluster.setupPrimary({
|
||||||
|
exec: __dirname + '/new-block-worker',
|
||||||
|
});
|
||||||
|
|
||||||
|
let workersReady = 0;
|
||||||
|
const idFiles = await this.genIdsFiles();
|
||||||
|
for (const idFile of idFiles) {
|
||||||
|
cluster.fork().send(idFile);
|
||||||
|
workersReady++;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const id in cluster.workers) {
|
||||||
|
const worker: _cluster.Worker | undefined = cluster.workers[id];
|
||||||
|
worker?.on('message', (msg, _handle) => {
|
||||||
|
switch (msg.msgType) {
|
||||||
|
case 'FINISH':
|
||||||
|
logger.info({ component: 'event-replay' }, `${msg.msg}`);
|
||||||
|
workersReady--;
|
||||||
|
worker.disconnect();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// default action
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
worker?.on('disconnect', () => {
|
||||||
|
if (workersReady === 0) {
|
||||||
|
resolve(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
do = async () => {
|
||||||
|
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
|
||||||
|
await this.ingestNewBlockEvents();
|
||||||
|
};
|
||||||
|
}
|
||||||
21
src/index.ts
21
src/index.ts
@@ -27,11 +27,7 @@ import { isFtMetadataEnabled, isNftMetadataEnabled } from './token-metadata/help
|
|||||||
import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
|
import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
|
||||||
import { registerMempoolPromStats } from './datastore/helpers';
|
import { registerMempoolPromStats } from './datastore/helpers';
|
||||||
import { logger } from './logger';
|
import { logger } from './logger';
|
||||||
import {
|
import { ReplayController } from './event-replay/parquet-based/replay-controller';
|
||||||
ingestAttachmentNew,
|
|
||||||
ingestNewBlock,
|
|
||||||
ingestNewBurnBlock,
|
|
||||||
} from './event-replay/parquet-based/event-replay';
|
|
||||||
|
|
||||||
enum StacksApiMode {
|
enum StacksApiMode {
|
||||||
/**
|
/**
|
||||||
@@ -305,17 +301,10 @@ async function handleProgramArgs() {
|
|||||||
args.options.force
|
args.options.force
|
||||||
);
|
);
|
||||||
} else if (args.operand === 'from-parquet-events') {
|
} else if (args.operand === 'from-parquet-events') {
|
||||||
if (args.options['new-burn-block']) {
|
const replay = await ReplayController.init();
|
||||||
await ingestNewBurnBlock();
|
await replay.prepare();
|
||||||
}
|
await replay.do();
|
||||||
|
await replay.teardown();
|
||||||
if (args.options['attachment-new']) {
|
|
||||||
await ingestAttachmentNew();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (args.options['new-block']) {
|
|
||||||
await ingestNewBlock(args.options['ids-path']);
|
|
||||||
}
|
|
||||||
} else if (parsedOpts._[0]) {
|
} else if (parsedOpts._[0]) {
|
||||||
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
|
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user