feat: event-replay remainder events handling

This commit is contained in:
Chris Guimaraes
2023-07-27 20:49:28 +01:00
parent 7a6f241923
commit 3ede07f134
5 changed files with 123 additions and 15 deletions

View File

@@ -135,4 +135,24 @@ export class DatasetStore {
resolve(res); resolve(res);
}); });
}; };
//
// REMAINDER EVENTS
//
remainderEvents = (): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
con.all(
`SELECT method, payload FROM READ_PARQUET('events/remainder/*.parquet') ORDER BY id`,
(err: any, res: any) => {
if (err) {
throw err;
}
resolve(res);
}
);
});
};
} }

View File

@@ -42,22 +42,25 @@ export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: Data
blockHeights.push(el.attachment!.blockHeight); blockHeights.push(el.attachment!.blockHeight);
} }
} }
// get events from block heights
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
for (const event of blockEvents) { if (blockHeights.length > 0) {
for (const ds of ary) { // get events from block heights
if (ds.blockData?.index_block_hash === event.index_block_hash) { const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
const txs = JSON.parse(event.payload).transactions;
for (const tx of txs) { for (const event of blockEvents) {
if (ds.attachment!.txId === tx.txid) { for (const ds of ary) {
ds.blockData!.microblock_hash = tx.microblock_hash || ''; if (ds.blockData?.index_block_hash === event.index_block_hash) {
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX; const txs = JSON.parse(event.payload).transactions;
for (const tx of txs) {
if (ds.attachment!.txId === tx.txid) {
ds.blockData!.microblock_hash = tx.microblock_hash || '';
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
}
} }
}
ds.blockData!.index_block_hash = event.index_block_hash; ds.blockData!.index_block_hash = event.index_block_hash;
ds.blockData!.parent_index_block_hash = event.parent_index_block_hash; ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
}
} }
} }
} }

View File

@@ -0,0 +1,48 @@
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { logger } from '../../../logger';
import { DatasetStore } from '../dataset/store';
import { EventStreamServer, startEventServer } from '../../../event-stream/event-server';
import { getApiConfiguredChainID, httpPostRequest } from '../../../helpers';
const chainID = getApiConfiguredChainID();
const processRequests = (eventServer: EventStreamServer) => {
return new Writable({
objectMode: true,
write: async (data, _encoding, next) => {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: data.method,
headers: { 'Content-Type': 'application/json' },
body: data.payload,
throwOnNotOK: true,
});
next();
},
});
};
export const processRemainderEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'REMAINDER events processing started');
const eventServer = await startEventServer({
datastore: db,
chainId: chainID,
serverHost: '127.0.0.1',
serverPort: 0,
});
const eventStream = await dataset.remainderEvents();
const process = processRequests(eventServer);
await pipeline(
Readable.from(eventStream),
process.on('finish', async () => {
await eventServer.closeAsync();
})
);
};

View File

@@ -6,6 +6,7 @@ import { createTimeTracker, genIdsFiles } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer'; import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer'; import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { processRawEvents } from './importers/raw-importer'; import { processRawEvents } from './importers/raw-importer';
import { processRemainderEvents } from './importers/remainder-importer';
import { DatasetStore } from './dataset/store'; import { DatasetStore } from './dataset/store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations'; import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { IndexesState } from '../../datastore/common'; import { IndexesState } from '../../datastore/common';
@@ -183,6 +184,28 @@ export class ReplayController {
}); });
}; };
/**
*
*/
private ingestRemainderEvents = async () => {
const timeTracker = createTimeTracker();
try {
await timeTracker.track('REMAINDER_EVENTS', async () => {
await processRemainderEvents(this.db, this.dataset);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
}
};
/** /**
* *
*/ */
@@ -209,24 +232,38 @@ export class ReplayController {
/** /**
* *
*/ */
teardown = async () => { finalize = async () => {
// Re-enabling indexes // Re-enabling indexes
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables'); logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On); await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
// to be replayed with regular HTTP POSTs
await this.ingestRemainderEvents();
// Refreshing materialized views // Refreshing materialized views
logger.info({ component: 'event-replay' }, `Refreshing materialized views`); logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
await this.db.finishEventReplay(); await this.db.finishEventReplay();
// Close DB
logger.info({ component: 'event-replay' }, 'Closing DB connection');
await this.db.close(); await this.db.close();
// Exit with success
logger.info({ component: 'event-replay' }, 'Finishing event-replay with success');
process.exit(0);
}; };
/** /**
* *
*/ */
do = async () => { do = async () => {
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]); await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
// RAW events to event_observer_requests table
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]); await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
// NEW_BLOCK events
await this.ingestNewBlockEvents(); await this.ingestNewBlockEvents();
}; };
} }

View File

@@ -304,7 +304,7 @@ async function handleProgramArgs() {
const replay = await ReplayController.init(); const replay = await ReplayController.init();
await replay.prepare(); await replay.prepare();
await replay.do(); await replay.do();
await replay.teardown(); await replay.finalize();
} else if (parsedOpts._[0]) { } else if (parsedOpts._[0]) {
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`); throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
} else { } else {