Mirror of https://github.com/alexgo-io/stacks-blockchain-api.git (synced 2026-01-12 08:34:40 +08:00)
feat: processing raw events in parallel
@@ -124,4 +124,15 @@ export class DatasetStore {
       );
     });
   };
+
+  rawEventsByIds = (ids: number[]): Promise<QueryResult> => {
+    return new Promise(resolve => {
+      const con = this.db.connect();
+      const res = con.stream(
+        `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
+      );
+
+      resolve(res);
+    });
+  };
 }
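A note on the query above (not part of the commit): interpolating the ids array into the template literal relies on JavaScript's default array-to-string conversion, which joins the elements with commas, so the statement is built as a plain IN (...) list rather than with bound parameters. A minimal illustration:

// Illustration only: how the ids array becomes the IN (...) list.
const ids = [12, 15, 42];
const sql = `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`;
// sql now ends with: WHERE id IN (12,15,42) ORDER BY id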
@@ -86,3 +86,23 @@ export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore)
     })
   );
 };
+
+export const processRawEventsInParallel = async (
+  db: PgWriteStore,
+  dataset: DatasetStore,
+  ids: any
+) => {
+  logger.info({ component: 'event-replay' }, 'RAW events parallel processing started');
+
+  const payload = await dataset.rawEventsByIds(ids);
+  const insert = insertInBatch(db);
+
+  await pipeline(
+    Readable.from(payload),
+    insert.on('finish', async () => {
+      for (const batchInserter of batchInserters) {
+        await batchInserter.flush();
+      }
+    })
+  );
+};
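Editor's note on the pipeline wiring above: insert.on('finish', ...) returns the same stream (EventEmitter#on returns its receiver), so the second argument to pipeline is still the Writable produced by insertInBatch(db), now with a finish handler attached that flushes the shared batchInserters once the stream has drained. Below is a minimal sketch of the kind of object-mode Writable this assumes; the name makeBatchWriter, the batch size, and the flushBatch callback are illustrative assumptions, not the repository's insertInBatch:

import { Writable } from 'stream';

// Buffers incoming rows and hands them to flushBatch in chunks; the trailing
// partial batch is flushed in final() before 'finish' is emitted.
const makeBatchWriter = (flushBatch: (rows: unknown[]) => Promise<void>, batchSize = 500) => {
  let batch: unknown[] = [];
  return new Writable({
    objectMode: true,
    write(row, _encoding, done) {
      batch.push(row);
      if (batch.length < batchSize) return done();
      const rows = batch;
      batch = [];
      flushBatch(rows).then(() => done(), done);
    },
    final(done) {
      if (batch.length === 0) return done();
      flushBatch(batch).then(() => done(), done);
    },
  });
};

Such a writer can sit exactly where insert does in the hunk above, as the destination of pipeline(Readable.from(payload), ...).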
@@ -106,6 +106,43 @@ export class ReplayController {
     }
   };
 
+  ingestRawNewBlockEvents = async () => {
+    return new Promise(async resolve => {
+      cluster.setupPrimary({
+        exec: __dirname + '/workers/raw-worker',
+      });
+
+      let workersReady = 0;
+      const idFiles = await genIdsFiles(this.dataset);
+      for (const idFile of idFiles) {
+        cluster.fork().send(idFile);
+        workersReady++;
+      }
+
+      for (const id in cluster.workers) {
+        const worker: _cluster.Worker | undefined = cluster.workers[id];
+        worker?.on('message', (msg, _handle) => {
+          switch (msg.msgType) {
+            case 'FINISH':
+              logger.info({ component: 'event-replay' }, `${msg.msg}`);
+              workersReady--;
+              worker.disconnect();
+              break;
+            default:
+              // default action
+              break;
+          }
+        });
+
+        worker?.on('disconnect', () => {
+          if (workersReady === 0) {
+            resolve(true);
+          }
+        });
+      }
+    });
+  };
+
   /**
    *
    */
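The primary above forks one raw-worker per file returned by genIdsFiles(this.dataset), which is not shown in this diff, and hands each worker its file name over the IPC channel; resolve(true) fires once every worker has reported FINISH and disconnected. A hypothetical sketch of what such an id-chunking helper could do, splitting the canonical new_block event ids into newline-delimited files, one per worker; the function name writeIdChunks, the chunk count, the file naming, and the output directory are assumptions rather than the repository's code:

import * as fs from 'fs';

// Split ids into `chunks` newline-delimited files so each forked worker
// processes a disjoint slice; returns the generated file names.
const writeIdChunks = (ids: number[], chunks: number, dir = './events/new_block'): string[] => {
  const size = Math.ceil(ids.length / chunks);
  const files: string[] = [];
  for (let i = 0; i < chunks; i++) {
    const slice = ids.slice(i * size, (i + 1) * size);
    if (slice.length === 0) break;
    const fileName = `ids_${i}.txt`; // naming is illustrative
    fs.writeFileSync(`${dir}/${fileName}`, slice.join('\n'));
    files.push(fileName);
  }
  return files;
};

The worker in the new file below then reads ./events/new_block/<file>, which matches the directory it hardcodes.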
@@ -188,12 +225,8 @@ export class ReplayController {
    *
    */
   do = async () => {
-    await Promise.all([
-      this.ingestNewBurnBlockEvents(),
-      this.ingestAttachmentNewEvents(),
-      this.ingestRawEvents(),
-    ]);
-
+    await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
+    await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
     await this.ingestNewBlockEvents();
   };
 }
src/event-replay/parquet-based/workers/raw-worker.ts (new file, 49 lines)
@@ -0,0 +1,49 @@
+import * as fs from 'fs';
+import * as tty from 'tty';
+
+import { PgWriteStore } from '../../../datastore/pg-write-store';
+import { DatasetStore } from '../dataset/store';
+import { logger } from '../../../logger';
+import { createTimeTracker } from '../helpers';
+import { processRawEventsInParallel } from '../importers/raw-importer';
+
+const ingestNewBlock = async (idFile?: string) => {
+  const db = await PgWriteStore.connect({
+    usageName: `${idFile}`,
+    skipMigrations: true,
+    withNotifier: false,
+    isEventReplay: true,
+  });
+  const dataset = DatasetStore.connect();
+
+  const timeTracker = createTimeTracker();
+
+  const dir = './events/new_block';
+
+  try {
+    const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
+    const ids = idsFileContent.split(/\r?\n/);
+
+    await timeTracker.track('RAW_EVENTS_PARALLEL', async () => {
+      await processRawEventsInParallel(db, dataset, ids);
+    });
+
+    // notify parent
+    process.send?.({
+      msgType: 'FINISH',
+      msg: 'Worker has finished',
+    });
+  } catch (err) {
+    throw err;
+  } finally {
+    if (true || tty.isatty(1)) {
+      console.table(timeTracker.getDurations(3));
+    } else {
+      logger.info(`Tracked function times`, timeTracker.getDurations(3));
+    }
+  }
+};
+
+process.on('message', async (msg: string) => {
+  await ingestNewBlock(msg);
+});
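Two editor's observations on this worker (the illustration below is not part of the commit). First, the ids read from the id file are strings, since String.prototype.split returns string[]; they only satisfy the number[] signature of rawEventsByIds because processRawEventsInParallel types its ids parameter as any, and if the file ends with a newline the split also leaves a trailing empty element. Second, if (true || tty.isatty(1)) short-circuits on the literal true, so the console.table branch always runs and the logger fallback is unreachable as committed.

// Illustration only: what the worker passes downstream.
const idsFileContent = '101\n102\n';
const ids = idsFileContent.split(/\r?\n/); // ['101', '102', ''] -> strings, with a trailing '' from the final newline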