mirror of https://github.com/alexgo-io/stacks-blockchain-api.git
feat: event-replay raw events handling
@@ -1220,6 +1220,11 @@ export interface StxLockEventInsertValues {
   contract_name: string;
 }
 
+export interface RawEventRequestInsertValues {
+  event_path: string;
+  payload: string;
+}
+
 export interface Pox2EventQueryResult {
   event_index: number;
   tx_id: string;
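Each RawEventRequestInsertValues row mirrors one event-observer HTTP request: the path the event arrived on plus its raw JSON body. A minimal sketch of what such rows look like (values are illustrative, not taken from a real dataset):

import { RawEventRequestInsertValues } from './common';

// Illustrative rows only; during replay the real values are read back
// out of the raw parquet files by the importer below.
const rows: RawEventRequestInsertValues[] = [
  { event_path: '/new_burn_block', payload: '{"burn_block_height":100}' },
  { event_path: '/new_block', payload: '{"block_height":1,"transactions":[]}' },
];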
@@ -63,6 +63,7 @@ import {
   DbMempoolTxRaw,
   DbChainTip,
   DbPox3Event,
+  RawEventRequestInsertValues,
 } from './common';
 import { ClarityAbi } from '@stacks/transactions';
 import {
@@ -3262,6 +3263,15 @@ export class PgWriteStore extends PgStore {
     }
   }
 
+  async insertRawEventRequestBatch(
+    sql: PgSqlClient,
+    events: RawEventRequestInsertValues[]
+  ): Promise<void> {
+    await sql`
+      INSERT INTO event_observer_requests ${this.sql(events)}
+    `;
+  }
+
   /**
    * (event-replay) Enable or disable indexes for the provided set of tables.
    */
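insertRawEventRequestBatch leans on the postgres.js dynamic-insert helper: passing an array of objects to sql(...) inside an INSERT expands into a single multi-row VALUES list. Note it runs the query on the passed-in sql client (so it can join an outer transaction) while using this.sql only to build the expansion. A standalone sketch of the same pattern, assuming a postgres.js client; the connection string and values are placeholders:

import postgres from 'postgres';

const sql = postgres('postgres://localhost/stacks_blockchain_api');

const events = [
  { event_path: '/new_block', payload: '{}' },
  { event_path: '/new_burn_block', payload: '{}' },
];

// sql(events) expands to ("event_path", "payload") VALUES ($1, $2), ($3, $4),
// so a whole 500-row batch lands in a single round trip.
await sql`INSERT INTO event_observer_requests ${sql(events)}`;

await sql.end();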
@@ -12,7 +12,7 @@ export class DatasetStore {
   }
 
   //
-  // NEW_BLOCK canonical payload manipulation/queries
+  // NEW_BLOCK EVENTS
   //
 
   newBlockEventsIds = (): Promise<number[]> => {
@@ -98,4 +98,19 @@ export class DatasetStore {
       );
     });
   };
+
+  //
+  // RAW EVENTS
+  //
+
+  rawEventsStream = (): Promise<QueryResult> => {
+    return new Promise(resolve => {
+      const con = this.db.connect();
+      const res = con.stream(
+        `SELECT event, payload FROM READ_PARQUET('events/raw*.parquet') ORDER BY id`
+      );
+
+      resolve(res);
+    });
+  };
 }
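rawEventsStream hands back a DuckDB streaming query over the raw parquet files; the result is async-iterable, which is why the importer below can feed it straight into Readable.from(). The event/payload columns map onto event_path/payload in the insert values. A minimal standalone sketch of that consumption path, assuming the duckdb Node bindings this store appears to use:

import duckdb from 'duckdb';

const db = new duckdb.Database(':memory:');
const con = db.connect();

// READ_PARQUET scans the files in place, no staging table needed;
// ORDER BY id preserves the order in which events were observed.
const res = con.stream(
  `SELECT event, payload FROM READ_PARQUET('events/raw*.parquet') ORDER BY id`
);

// The streamed result yields one row at a time rather than
// materializing the whole dataset in memory.
for await (const row of res) {
  console.log(row.event, row.payload);
}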
src/event-replay/parquet-based/importers/raw-importer.ts (new file, 92 lines)
@@ -0,0 +1,92 @@
+import { Readable, Writable, Transform } from 'stream';
+import { pipeline } from 'stream/promises';
+import { PgWriteStore } from '../../../datastore/pg-write-store';
+import { RawEventRequestInsertValues } from '../../../datastore/common';
+import { logger } from '../../../logger';
+import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
+import { DatasetStore } from '../dataset/store';
+
+const chainID = getApiConfiguredChainID();
+
+const batchInserters: BatchInserter[] = [];
+
+interface BatchInserter<T = any> {
+  push(entries: T[]): Promise<void>;
+  flush(): Promise<void>;
+}
+
+function createBatchInserter<T>({
+  batchSize,
+  insertFn,
+}: {
+  batchSize: number;
+  insertFn: (entries: T[]) => Promise<void>;
+}): BatchInserter<T> {
+  let entryBuffer: T[] = [];
+  return {
+    async push(entries: T[]) {
+      entries.length === 1
+        ? entryBuffer.push(entries[0])
+        : entries.forEach(e => entryBuffer.push(e));
+      if (entryBuffer.length === batchSize) {
+        await insertFn(entryBuffer);
+        entryBuffer.length = 0;
+      } else if (entryBuffer.length > batchSize) {
+        for (const batch of batchIterate(entryBuffer, batchSize)) {
+          await insertFn(batch);
+        }
+        entryBuffer.length = 0;
+      }
+    },
+    async flush() {
+      logger.debug({ component: 'event-replay' }, 'Flushing remaining data...');
+      if (entryBuffer.length > 0) {
+        await insertFn(entryBuffer);
+        entryBuffer = [];
+      }
+    },
+  };
+}
+
+const insertInBatch = (db: PgWriteStore) => {
+  const dbRawEventBatchInserter = createBatchInserter<RawEventRequestInsertValues>({
+    batchSize: 500,
+    insertFn: async entries => {
+      logger.debug(
+        { component: 'event-replay' },
+        'Inserting into event_observer_requests table...'
+      );
+      return db.insertRawEventRequestBatch(db.sql, entries);
+    },
+  });
+  batchInserters.push(dbRawEventBatchInserter);
+
+  return new Writable({
+    objectMode: true,
+    write: async (data, _encoding, next) => {
+      const insertRawEvents = async (data: any) => {
+        await dbRawEventBatchInserter.push([{ event_path: data.event, payload: data.payload }]);
+      };
+
+      await insertRawEvents(data);
+
+      next();
+    },
+  });
+};
+
+export const processRawEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
+  logger.info({ component: 'event-replay' }, 'RAW events process started');
+
+  const payload = await dataset.rawEventsStream();
+  const insert = insertInBatch(db);
+
+  await pipeline(
+    Readable.from(payload),
+    insert.on('finish', async () => {
+      for (const batchInserter of batchInserters) {
+        await batchInserter.flush();
+      }
+    })
+  );
+};
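Two details of the new importer are worth noting. First, push() special-cases single-entry arrays and drains either exactly at batchSize or, via batchIterate, when a multi-entry push overshoots it; in the overshoot path the trailing partial chunk is inserted immediately rather than staying buffered. Second, the async 'finish' handler attached to insert is not itself awaited by pipeline(), so in principle processRawEvents can resolve while the final flush is still in flight. A compact alternative push/flush with the same intent (a sketch, not the committed code; insertFn and batchSize as above):

function createSimpleBatchInserter<T>(
  batchSize: number,
  insertFn: (entries: T[]) => Promise<void>
) {
  let buffer: T[] = [];
  return {
    async push(entries: T[]) {
      buffer.push(...entries); // no need to special-case a single entry
      // Drain only full batches; the remainder stays buffered for flush().
      while (buffer.length >= batchSize) {
        await insertFn(buffer.slice(0, batchSize));
        buffer = buffer.slice(batchSize);
      }
    },
    async flush() {
      if (buffer.length > 0) {
        await insertFn(buffer);
        buffer = [];
      }
    },
  };
}

Unlike the committed version, this keeps overshoot remainders buffered until flush(), trading one early partial insert for strictly full batches.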
@@ -6,6 +6,7 @@ import { logger } from '../../logger';
 import { createTimeTracker } from './helpers';
 import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
 import { processAttachmentNewEvents } from './importers/attachment-new-importer';
+import { processRawEvents } from './importers/raw-importer';
 import { DatasetStore } from './dataset/store';
 import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
 import { splitIntoChunks } from './helpers';
@@ -73,6 +74,68 @@ export class ReplayController {
     }
   };
 
+  /**
+   *
+   */
+  ingestNewBlockEvents = (): Promise<boolean> => {
+    return new Promise(async resolve => {
+      cluster.setupPrimary({
+        exec: __dirname + '/new-block-worker',
+      });
+
+      let workersReady = 0;
+      const idFiles = await this.genIdsFiles();
+      for (const idFile of idFiles) {
+        cluster.fork().send(idFile);
+        workersReady++;
+      }
+
+      for (const id in cluster.workers) {
+        const worker: _cluster.Worker | undefined = cluster.workers[id];
+        worker?.on('message', (msg, _handle) => {
+          switch (msg.msgType) {
+            case 'FINISH':
+              logger.info({ component: 'event-replay' }, `${msg.msg}`);
+              workersReady--;
+              worker.disconnect();
+              break;
+            default:
+              // default action
+              break;
+          }
+        });
+
+        worker?.on('disconnect', () => {
+          if (workersReady === 0) {
+            resolve(true);
+          }
+        });
+      }
+    });
+  };
+
+  /**
+   *
+   */
+  ingestRawEvents = async () => {
+    const timeTracker = createTimeTracker();
+
+    try {
+      await timeTracker.track('RAW_EVENTS', async () => {
+        await processRawEvents(this.db, this.dataset);
+      });
+    } catch (err) {
+      throw err;
+    } finally {
+      if (true || tty.isatty(1)) {
+        console.log('Tracked function times:');
+        console.table(timeTracker.getDurations(3));
+      } else {
+        logger.info(`Tracked function times`, timeTracker.getDurations(3));
+      }
+    }
+  };
+
   /**
    *
    */
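For context, a hypothetical sketch of the forked new-block-worker entry point; the worker file itself is not part of this diff. Only the message shape ({ msgType, msg }) and the idFile handoff are taken from the primary above, and processIdFile is a stand-in for the real ingestion logic:

process.on('message', (idFile: string) => {
  void (async () => {
    await processIdFile(idFile); // assumed helper, not in the diff

    // Report completion so the primary can decrement workersReady
    // and disconnect this worker.
    process.send?.({ msgType: 'FINISH', msg: `done: ${idFile}` });
  })();
});

async function processIdFile(idFile: string): Promise<void> {
  // ... ingest the NEW_BLOCK events referenced by idFile ...
}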
@@ -193,48 +256,15 @@ export class ReplayController {
     await this.db.close();
   };
 
-  ingestNewBlockEvents = (): Promise<boolean> => {
-    return new Promise(async resolve => {
-      cluster.setupPrimary({
-        exec: __dirname + '/new-block-worker',
-      });
-
-      let workersReady = 0;
-      const idFiles = await this.genIdsFiles();
-      for (const idFile of idFiles) {
-        cluster.fork().send(idFile);
-        workersReady++;
-      }
-
-      for (const id in cluster.workers) {
-        const worker: _cluster.Worker | undefined = cluster.workers[id];
-        worker?.on('message', (msg, _handle) => {
-          switch (msg.msgType) {
-            case 'FINISH':
-              logger.info({ component: 'event-replay' }, `${msg.msg}`);
-              workersReady--;
-              worker.disconnect();
-              break;
-            default:
-              // default action
-              break;
-          }
-        });
-
-        worker?.on('disconnect', () => {
-          if (workersReady === 0) {
-            resolve(true);
-          }
-        });
-      }
-    });
-  };
-
   /**
    *
    */
   do = async () => {
-    await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
+    await Promise.all([
+      this.ingestNewBurnBlockEvents(),
+      this.ingestAttachmentNewEvents(),
+      this.ingestRawEvents(),
+    ]);
     await this.ingestNewBlockEvents();
   };
 }
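ingestRawEvents wraps the import in createTimeTracker (from ./helpers, not shown in this diff); note that the `true ||` in its finally block forces the console.table branch regardless of TTY detection. A sketch of a tracker with the shape that usage implies — track(label, fn) plus getDurations(precision) — purely to make the call sites above concrete:

// Sketch only; the real createTimeTracker lives in ./helpers and may differ.
function createTimeTrackerSketch() {
  const durations = new Map<string, number>();
  return {
    async track<T>(label: string, fn: () => Promise<T>): Promise<T> {
      const start = process.hrtime.bigint();
      try {
        return await fn();
      } finally {
        // Accumulate elapsed wall-clock time per label, in milliseconds.
        const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
        durations.set(label, (durations.get(label) ?? 0) + elapsedMs);
      }
    },
    getDurations(precision = 3) {
      // Shape chosen so console.table() renders one row per tracked label.
      return [...durations.entries()].map(([name, ms]) => ({
        name,
        seconds: (ms / 1000).toFixed(precision),
      }));
    },
  };
}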