feat: event-replay raw events handling

This commit is contained in:
Chris Guimaraes
2023-07-24 17:17:46 +01:00
parent d02a7e8ad8
commit 81f43cf7c3
5 changed files with 191 additions and 39 deletions

View File

@@ -1220,6 +1220,11 @@ export interface StxLockEventInsertValues {
contract_name: string;
}
export interface RawEventRequestInsertValues {
event_path: string;
payload: string;
}
export interface Pox2EventQueryResult {
event_index: number;
tx_id: string;

View File

@@ -63,6 +63,7 @@ import {
DbMempoolTxRaw,
DbChainTip,
DbPox3Event,
RawEventRequestInsertValues,
} from './common';
import { ClarityAbi } from '@stacks/transactions';
import {
@@ -3262,6 +3263,15 @@ export class PgWriteStore extends PgStore {
}
}
async insertRawEventRequestBatch(
sql: PgSqlClient,
events: RawEventRequestInsertValues[]
): Promise<void> {
await sql`
INSERT INTO event_observer_requests ${this.sql(events)}
`;
}
/**
* (event-replay) Enable or disable indexes for the provided set of tables.
*/

View File

@@ -12,7 +12,7 @@ export class DatasetStore {
}
//
// NEW_BLOCK canonical payload manipulation/queries
// NEW_BLOCK EVENTS
//
newBlockEventsIds = (): Promise<number[]> => {
@@ -98,4 +98,19 @@ export class DatasetStore {
);
});
};
//
// RAW EVENTS
//
rawEventsStream = (): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
const res = con.stream(
`SELECT event, payload FROM READ_PARQUET('events/raw*.parquet') ORDER BY id`
);
resolve(res);
});
};
}

View File

@@ -0,0 +1,92 @@
import { Readable, Writable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { RawEventRequestInsertValues } from '../../../datastore/common';
import { logger } from '../../../logger';
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
import { DatasetStore } from '../dataset/store';
const chainID = getApiConfiguredChainID();
const batchInserters: BatchInserter[] = [];
interface BatchInserter<T = any> {
push(entries: T[]): Promise<void>;
flush(): Promise<void>;
}
function createBatchInserter<T>({
batchSize,
insertFn,
}: {
batchSize: number;
insertFn: (entries: T[]) => Promise<void>;
}): BatchInserter<T> {
let entryBuffer: T[] = [];
return {
async push(entries: T[]) {
entries.length === 1
? entryBuffer.push(entries[0])
: entries.forEach(e => entryBuffer.push(e));
if (entryBuffer.length === batchSize) {
await insertFn(entryBuffer);
entryBuffer.length = 0;
} else if (entryBuffer.length > batchSize) {
for (const batch of batchIterate(entryBuffer, batchSize)) {
await insertFn(batch);
}
entryBuffer.length = 0;
}
},
async flush() {
logger.debug({ component: 'event-replay' }, 'Flushing remaining data...');
if (entryBuffer.length > 0) {
await insertFn(entryBuffer);
entryBuffer = [];
}
},
};
}
const insertInBatch = (db: PgWriteStore) => {
const dbRawEventBatchInserter = createBatchInserter<RawEventRequestInsertValues>({
batchSize: 500,
insertFn: async entries => {
logger.debug(
{ component: 'event-replay' },
'Inserting into event_observer_requests table...'
);
return db.insertRawEventRequestBatch(db.sql, entries);
},
});
batchInserters.push(dbRawEventBatchInserter);
return new Writable({
objectMode: true,
write: async (data, _encoding, next) => {
const insertRawEvents = async (data: any) => {
await dbRawEventBatchInserter.push([{ event_path: data.event, payload: data.payload }]);
};
await insertRawEvents(data);
next();
},
});
};
export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'RAW events process started');
const payload = await dataset.rawEventsStream();
const insert = insertInBatch(db);
await pipeline(
Readable.from(payload),
insert.on('finish', async () => {
for (const batchInserter of batchInserters) {
await batchInserter.flush();
}
})
);
};

View File

@@ -6,6 +6,7 @@ import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { processRawEvents } from './importers/raw-importer';
import { DatasetStore } from './dataset/store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { splitIntoChunks } from './helpers';
@@ -73,6 +74,68 @@ export class ReplayController {
}
};
/**
*
*/
ingestNewBlockEvents = (): Promise<boolean> => {
return new Promise(async resolve => {
cluster.setupPrimary({
exec: __dirname + '/new-block-worker',
});
let workersReady = 0;
const idFiles = await this.genIdsFiles();
for (const idFile of idFiles) {
cluster.fork().send(idFile);
workersReady++;
}
for (const id in cluster.workers) {
const worker: _cluster.Worker | undefined = cluster.workers[id];
worker?.on('message', (msg, _handle) => {
switch (msg.msgType) {
case 'FINISH':
logger.info({ component: 'event-replay' }, `${msg.msg}`);
workersReady--;
worker.disconnect();
break;
default:
// default action
break;
}
});
worker?.on('disconnect', () => {
if (workersReady === 0) {
resolve(true);
}
});
}
});
};
/**
*
*/
ingestRawEvents = async () => {
const timeTracker = createTimeTracker();
try {
await timeTracker.track('RAW_EVENTS', async () => {
await processRawEvents(this.db, this.dataset);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
}
};
/**
*
*/
@@ -193,48 +256,15 @@ export class ReplayController {
await this.db.close();
};
ingestNewBlockEvents = (): Promise<boolean> => {
return new Promise(async resolve => {
cluster.setupPrimary({
exec: __dirname + '/new-block-worker',
});
let workersReady = 0;
const idFiles = await this.genIdsFiles();
for (const idFile of idFiles) {
cluster.fork().send(idFile);
workersReady++;
}
for (const id in cluster.workers) {
const worker: _cluster.Worker | undefined = cluster.workers[id];
worker?.on('message', (msg, _handle) => {
switch (msg.msgType) {
case 'FINISH':
logger.info({ component: 'event-replay' }, `${msg.msg}`);
workersReady--;
worker.disconnect();
break;
default:
// default action
break;
}
});
worker?.on('disconnect', () => {
if (workersReady === 0) {
resolve(true);
}
});
}
});
};
/**
*
*/
do = async () => {
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
await Promise.all([
this.ingestNewBurnBlockEvents(),
this.ingestAttachmentNewEvents(),
this.ingestRawEvents(),
]);
await this.ingestNewBlockEvents();
};
}