feat: process raw events in parallel

Chris Guimaraes
2023-07-26 11:42:58 +01:00
parent bb70ca99c0
commit 7a6f241923
4 changed files with 119 additions and 6 deletions

View File

@@ -124,4 +124,15 @@ export class DatasetStore {
      );
    });
  };

  rawEventsByIds = (ids: number[]): Promise<QueryResult> => {
    return new Promise(resolve => {
      const con = this.db.connect();
      // `ids` interpolates as a comma-separated list, e.g. `WHERE id IN (1,2,3)`.
      const res = con.stream(
        `SELECT method, payload FROM READ_PARQUET('events/new_block/canonical/*.parquet') WHERE id IN (${ids}) ORDER BY id`
      );
      resolve(res);
    });
  };
}
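
The `.stream(...)` call and `READ_PARQUET` suggest DuckDB's Node bindings, where the returned `QueryResult` is async-iterable; that is what lets the importer below feed it into `Readable.from`. A minimal consumption sketch under that assumption (the id list is illustrative):

// Sketch: with ids = [1, 2, 3] the template literal above expands to
// `WHERE id IN (1,2,3)`, and rows stream back one at a time.
async function dumpRows(dataset: DatasetStore) {
  const result = await dataset.rawEventsByIds([1, 2, 3]);
  for await (const row of result) {
    console.log(row.method, row.payload);
  }
}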

View File

@@ -86,3 +86,23 @@ export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore)
    })
  );
};

export const processRawEventsInParallel = async (
  db: PgWriteStore,
  dataset: DatasetStore,
  ids: number[]
) => {
  logger.info({ component: 'event-replay' }, 'RAW events parallel processing started');

  const payload = await dataset.rawEventsByIds(ids);
  const insert = insertInBatch(db);

  await pipeline(
    Readable.from(payload),
    insert.on('finish', async () => {
      for (const batchInserter of batchInserters) {
        await batchInserter.flush();
      }
    })
  );
};
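
`insertInBatch` and `batchInserters` come from elsewhere in the importer and are not part of this diff. As a rough sketch of the underlying pattern, an object-mode `Writable` that buffers rows and writes them in chunks, with an illustrative batch size and `insertBatch` callback:

import { Writable } from 'node:stream';

// Illustrative only: collect rows into fixed-size batches and flush the
// remainder when the upstream Readable ends.
function batchWriter(insertBatch: (rows: unknown[]) => Promise<void>, batchSize = 500): Writable {
  let batch: unknown[] = [];
  return new Writable({
    objectMode: true,
    write(row, _encoding, callback) {
      batch.push(row);
      if (batch.length < batchSize) return callback();
      insertBatch(batch.splice(0)).then(() => callback(), callback);
    },
    final(callback) {
      if (batch.length === 0) return callback();
      insertBatch(batch.splice(0)).then(() => callback(), callback);
    },
  });
}

// Usage mirrors the pipeline above:
//   await pipeline(Readable.from(payload), batchWriter(rows => insertRows(rows)));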

View File

@@ -106,6 +106,43 @@ export class ReplayController {
    }
  };

  ingestRawNewBlockEvents = async () => {
    return new Promise(async resolve => {
      cluster.setupPrimary({
        exec: __dirname + '/workers/raw-worker',
      });

      // Fork one worker per id file and hand each worker its shard.
      let workersReady = 0;
      const idFiles = await genIdsFiles(this.dataset);
      for (const idFile of idFiles) {
        cluster.fork().send(idFile);
        workersReady++;
      }

      for (const id in cluster.workers) {
        const worker: _cluster.Worker | undefined = cluster.workers[id];
        worker?.on('message', (msg, _handle) => {
          switch (msg.msgType) {
            case 'FINISH':
              logger.info({ component: 'event-replay' }, `${msg.msg}`);
              workersReady--;
              worker.disconnect();
              break;
            default:
              break;
          }
        });

        // Resolve once the last finished worker has disconnected.
        worker?.on('disconnect', () => {
          if (workersReady === 0) {
            resolve(true);
          }
        });
      }
    });
  };
  /**
   *
   */

@@ -188,12 +225,8 @@ export class ReplayController {
   *
   */
  do = async () => {
-    await Promise.all([
-      this.ingestNewBurnBlockEvents(),
-      this.ingestAttachmentNewEvents(),
-      this.ingestRawEvents(),
-    ]);
+    await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
+    await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
    await this.ingestNewBlockEvents();
  };
}
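
`genIdsFiles` is imported from elsewhere, so this diff only shows how it is consumed: it returns a list of file names, one per worker, each holding a shard of raw-event ids. A purely hypothetical sketch of that shape (shard count, naming, and directory are assumptions inferred from the worker below):

import * as fs from 'fs';
import * as os from 'os';

// Hypothetical: split the id list into one newline-delimited file per CPU
// so each forked worker processes a disjoint slice of the raw events.
function writeIdFiles(ids: number[], dir = './events/new_block'): string[] {
  const shardCount = os.cpus().length;
  const fileNames: string[] = [];
  for (let shard = 0; shard < shardCount; shard++) {
    const slice = ids.filter((_, i) => i % shardCount === shard);
    const fileName = `ids_${shard}.txt`;
    fs.writeFileSync(`${dir}/${fileName}`, slice.join('\n'));
    fileNames.push(fileName);
  }
  return fileNames;
}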

View File

@@ -0,0 +1,49 @@
import * as fs from 'fs';
import * as tty from 'tty';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { DatasetStore } from '../dataset/store';
import { logger } from '../../../logger';
import { createTimeTracker } from '../helpers';
import { processRawEventsInParallel } from '../importers/raw-importer';
const ingestNewBlock = async (idFile?: string) => {
  const db = await PgWriteStore.connect({
    usageName: `${idFile}`,
    skipMigrations: true,
    withNotifier: false,
    isEventReplay: true,
  });
  const dataset = DatasetStore.connect();
  const timeTracker = createTimeTracker();
  const dir = './events/new_block';

  try {
    const idsFileContent = fs.readFileSync(`${dir}/${idFile}`, 'utf-8');
    // Parse one id per line, dropping the empty entry a trailing newline would
    // leave behind (it would become a dangling comma in the SQL IN list).
    const ids = idsFileContent.split(/\r?\n/).filter(Boolean).map(Number);
    await timeTracker.track('RAW_EVENTS_PARALLEL', async () => {
      await processRawEventsInParallel(db, dataset, ids);
    });

    // notify parent
    process.send?.({
      msgType: 'FINISH',
      msg: 'Worker has finished',
    });
  } finally {
    if (tty.isatty(1)) {
      console.table(timeTracker.getDurations(3));
    } else {
      logger.info(`Tracked function times`, timeTracker.getDurations(3));
    }
  }
};

process.on('message', async (msg: string) => {
  await ingestNewBlock(msg);
});
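
One caveat with the handler above: if `ingestNewBlock` rejects, the error escapes the async listener as an unhandled rejection and the primary's `workersReady` countdown never completes. A hedged alternative, not part of the commit:

// Sketch: surface failures, still send FINISH so the primary's bookkeeping
// converges, then exit non-zero so the failure is visible to the OS.
process.on('message', (msg: string) => {
  ingestNewBlock(msg).catch((err: Error) => {
    logger.error({ component: 'event-replay' }, `Worker failed: ${err.message}`);
    process.send?.({ msgType: 'FINISH', msg: `Worker failed: ${err.message}` });
    process.exit(1);
  });
});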