feat: event-replay remainder events handling

This commit is contained in:
Chris Guimaraes
2023-07-27 20:49:28 +01:00
parent 7a6f241923
commit 3ede07f134
5 changed files with 123 additions and 15 deletions

View File

@@ -135,4 +135,24 @@ export class DatasetStore {
resolve(res);
});
};
//
// REMAINDER EVENTS
//
remainderEvents = (): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
con.all(
`SELECT method, payload FROM READ_PARQUET('events/remainder/*.parquet') ORDER BY id`,
(err: any, res: any) => {
if (err) {
throw err;
}
resolve(res);
}
);
});
};
}

View File

@@ -42,6 +42,8 @@ export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: Data
blockHeights.push(el.attachment!.blockHeight);
}
}
if (blockHeights.length > 0) {
// get events from block heights
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
@@ -61,6 +63,7 @@ export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: Data
}
}
}
}
await db.updateBatchSubdomainsEventReplay(db.sql, ary);
};

View File

@@ -0,0 +1,48 @@
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { logger } from '../../../logger';
import { DatasetStore } from '../dataset/store';
import { EventStreamServer, startEventServer } from '../../../event-stream/event-server';
import { getApiConfiguredChainID, httpPostRequest } from '../../../helpers';
const chainID = getApiConfiguredChainID();
const processRequests = (eventServer: EventStreamServer) => {
return new Writable({
objectMode: true,
write: async (data, _encoding, next) => {
await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: data.method,
headers: { 'Content-Type': 'application/json' },
body: data.payload,
throwOnNotOK: true,
});
next();
},
});
};
export const processRemainderEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
logger.info({ component: 'event-replay' }, 'REMAINDER events processing started');
const eventServer = await startEventServer({
datastore: db,
chainId: chainID,
serverHost: '127.0.0.1',
serverPort: 0,
});
const eventStream = await dataset.remainderEvents();
const process = processRequests(eventServer);
await pipeline(
Readable.from(eventStream),
process.on('finish', async () => {
await eventServer.closeAsync();
})
);
};

View File

@@ -6,6 +6,7 @@ import { createTimeTracker, genIdsFiles } from './helpers';
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
import { processRawEvents } from './importers/raw-importer';
import { processRemainderEvents } from './importers/remainder-importer';
import { DatasetStore } from './dataset/store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { IndexesState } from '../../datastore/common';
@@ -183,6 +184,28 @@ export class ReplayController {
});
};
/**
*
*/
private ingestRemainderEvents = async () => {
const timeTracker = createTimeTracker();
try {
await timeTracker.track('REMAINDER_EVENTS', async () => {
await processRemainderEvents(this.db, this.dataset);
});
} catch (err) {
throw err;
} finally {
if (true || tty.isatty(1)) {
console.log('Tracked function times:');
console.table(timeTracker.getDurations(3));
} else {
logger.info(`Tracked function times`, timeTracker.getDurations(3));
}
}
};
/**
*
*/
@@ -209,24 +232,38 @@ export class ReplayController {
/**
*
*/
teardown = async () => {
finalize = async () => {
// Re-enabling indexes
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
// to be replayed with regular HTTP POSTs
await this.ingestRemainderEvents();
// Refreshing materialized views
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
await this.db.finishEventReplay();
// Close DB
logger.info({ component: 'event-replay' }, 'Closing DB connection');
await this.db.close();
// Exit with success
logger.info({ component: 'event-replay' }, 'Finishing event-replay with success');
process.exit(0);
};
/**
*
*/
do = async () => {
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
// RAW events to event_observer_requests table
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
// NEW_BLOCK events
await this.ingestNewBlockEvents();
};
}

View File

@@ -304,7 +304,7 @@ async function handleProgramArgs() {
const replay = await ReplayController.init();
await replay.prepare();
await replay.do();
await replay.teardown();
await replay.finalize();
} else if (parsedOpts._[0]) {
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
} else {