mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 16:53:19 +08:00
feat: event-replay remainder events handling
This commit is contained in:
@@ -135,4 +135,24 @@ export class DatasetStore {
|
|||||||
resolve(res);
|
resolve(res);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//
|
||||||
|
// REMAINDER EVENTS
|
||||||
|
//
|
||||||
|
|
||||||
|
remainderEvents = (): Promise<QueryResult> => {
|
||||||
|
return new Promise(resolve => {
|
||||||
|
const con = this.db.connect();
|
||||||
|
con.all(
|
||||||
|
`SELECT method, payload FROM READ_PARQUET('events/remainder/*.parquet') ORDER BY id`,
|
||||||
|
(err: any, res: any) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve(res);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,22 +42,25 @@ export const processAttachmentNewEvents = async (db: PgWriteStore, dataset: Data
|
|||||||
blockHeights.push(el.attachment!.blockHeight);
|
blockHeights.push(el.attachment!.blockHeight);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// get events from block heights
|
|
||||||
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
|
|
||||||
|
|
||||||
for (const event of blockEvents) {
|
if (blockHeights.length > 0) {
|
||||||
for (const ds of ary) {
|
// get events from block heights
|
||||||
if (ds.blockData?.index_block_hash === event.index_block_hash) {
|
const blockEvents = await dataset.getNewBlockEventsInBlockHeights(blockHeights);
|
||||||
const txs = JSON.parse(event.payload).transactions;
|
|
||||||
for (const tx of txs) {
|
for (const event of blockEvents) {
|
||||||
if (ds.attachment!.txId === tx.txid) {
|
for (const ds of ary) {
|
||||||
ds.blockData!.microblock_hash = tx.microblock_hash || '';
|
if (ds.blockData?.index_block_hash === event.index_block_hash) {
|
||||||
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
|
const txs = JSON.parse(event.payload).transactions;
|
||||||
|
for (const tx of txs) {
|
||||||
|
if (ds.attachment!.txId === tx.txid) {
|
||||||
|
ds.blockData!.microblock_hash = tx.microblock_hash || '';
|
||||||
|
ds.blockData!.microblock_sequence = tx.microblock_sequence || I32_MAX;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
ds.blockData!.index_block_hash = event.index_block_hash;
|
ds.blockData!.index_block_hash = event.index_block_hash;
|
||||||
ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
|
ds.blockData!.parent_index_block_hash = event.parent_index_block_hash;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
import { Readable, Writable } from 'stream';
|
||||||
|
import { pipeline } from 'stream/promises';
|
||||||
|
import { PgWriteStore } from '../../../datastore/pg-write-store';
|
||||||
|
import { logger } from '../../../logger';
|
||||||
|
import { DatasetStore } from '../dataset/store';
|
||||||
|
import { EventStreamServer, startEventServer } from '../../../event-stream/event-server';
|
||||||
|
import { getApiConfiguredChainID, httpPostRequest } from '../../../helpers';
|
||||||
|
|
||||||
|
const chainID = getApiConfiguredChainID();
|
||||||
|
|
||||||
|
const processRequests = (eventServer: EventStreamServer) => {
|
||||||
|
return new Writable({
|
||||||
|
objectMode: true,
|
||||||
|
write: async (data, _encoding, next) => {
|
||||||
|
await httpPostRequest({
|
||||||
|
host: '127.0.0.1',
|
||||||
|
port: eventServer.serverAddress.port,
|
||||||
|
path: data.method,
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: data.payload,
|
||||||
|
throwOnNotOK: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
next();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
export const processRemainderEvents = async (db: PgWriteStore, dataset: DatasetStore) => {
|
||||||
|
logger.info({ component: 'event-replay' }, 'REMAINDER events processing started');
|
||||||
|
|
||||||
|
const eventServer = await startEventServer({
|
||||||
|
datastore: db,
|
||||||
|
chainId: chainID,
|
||||||
|
serverHost: '127.0.0.1',
|
||||||
|
serverPort: 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
const eventStream = await dataset.remainderEvents();
|
||||||
|
const process = processRequests(eventServer);
|
||||||
|
|
||||||
|
await pipeline(
|
||||||
|
Readable.from(eventStream),
|
||||||
|
process.on('finish', async () => {
|
||||||
|
await eventServer.closeAsync();
|
||||||
|
})
|
||||||
|
);
|
||||||
|
};
|
||||||
@@ -6,6 +6,7 @@ import { createTimeTracker, genIdsFiles } from './helpers';
|
|||||||
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
|
import { processNewBurnBlockEvents } from './importers/new-burn-block-importer';
|
||||||
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
|
import { processAttachmentNewEvents } from './importers/attachment-new-importer';
|
||||||
import { processRawEvents } from './importers/raw-importer';
|
import { processRawEvents } from './importers/raw-importer';
|
||||||
|
import { processRemainderEvents } from './importers/remainder-importer';
|
||||||
import { DatasetStore } from './dataset/store';
|
import { DatasetStore } from './dataset/store';
|
||||||
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
|
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
|
||||||
import { IndexesState } from '../../datastore/common';
|
import { IndexesState } from '../../datastore/common';
|
||||||
@@ -183,6 +184,28 @@ export class ReplayController {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private ingestRemainderEvents = async () => {
|
||||||
|
const timeTracker = createTimeTracker();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await timeTracker.track('REMAINDER_EVENTS', async () => {
|
||||||
|
await processRemainderEvents(this.db, this.dataset);
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
throw err;
|
||||||
|
} finally {
|
||||||
|
if (true || tty.isatty(1)) {
|
||||||
|
console.log('Tracked function times:');
|
||||||
|
console.table(timeTracker.getDurations(3));
|
||||||
|
} else {
|
||||||
|
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@@ -209,24 +232,38 @@ export class ReplayController {
|
|||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
teardown = async () => {
|
finalize = async () => {
|
||||||
// Re-enabling indexes
|
// Re-enabling indexes
|
||||||
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
|
logger.info({ component: 'event-replay' }, 'Re-enabling indexes and constraints on tables');
|
||||||
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
|
await this.db.toggleAllTableIndexes(this.db.sql, IndexesState.On);
|
||||||
|
|
||||||
|
// to be replayed with regular HTTP POSTs
|
||||||
|
await this.ingestRemainderEvents();
|
||||||
|
|
||||||
// Refreshing materialized views
|
// Refreshing materialized views
|
||||||
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
|
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
|
||||||
await this.db.finishEventReplay();
|
await this.db.finishEventReplay();
|
||||||
|
|
||||||
|
// Close DB
|
||||||
|
logger.info({ component: 'event-replay' }, 'Closing DB connection');
|
||||||
await this.db.close();
|
await this.db.close();
|
||||||
|
|
||||||
|
// Exit with success
|
||||||
|
logger.info({ component: 'event-replay' }, 'Finishing event-replay with success');
|
||||||
|
process.exit(0);
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
do = async () => {
|
do = async () => {
|
||||||
|
// NEW_BURN_BLOCK and ATTACHMENTS/NEW events
|
||||||
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
|
await Promise.all([this.ingestNewBurnBlockEvents(), this.ingestAttachmentNewEvents()]);
|
||||||
|
|
||||||
|
// RAW events to event_observer_requests table
|
||||||
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
|
await Promise.all([this.ingestRawEvents(), this.ingestRawNewBlockEvents()]);
|
||||||
|
|
||||||
|
// NEW_BLOCK events
|
||||||
await this.ingestNewBlockEvents();
|
await this.ingestNewBlockEvents();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -304,7 +304,7 @@ async function handleProgramArgs() {
|
|||||||
const replay = await ReplayController.init();
|
const replay = await ReplayController.init();
|
||||||
await replay.prepare();
|
await replay.prepare();
|
||||||
await replay.do();
|
await replay.do();
|
||||||
await replay.teardown();
|
await replay.finalize();
|
||||||
} else if (parsedOpts._[0]) {
|
} else if (parsedOpts._[0]) {
|
||||||
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
|
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user