feat: event-replay new_burn_block events handling

Chris Guimaraes
2023-06-30 12:11:52 +01:00
parent 2d08e18760
commit 6c0f4481c0
5 changed files with 1580 additions and 5050 deletions

package-lock.json (generated): 6418 lines changed; diff suppressed because it is too large.

src/datastore/pg-write-store.ts

@@ -1607,10 +1607,25 @@ export class PgWriteStore extends PgStore {
    });
  }

  async insertBurnchainRewardsAndSlotHoldersBatch(
    rewards: DbBurnchainReward[],
    slotHolders: DbRewardSlotHolder[]): Promise<void> {
    return await this.sqlWriteTransaction(async sql => {

  async insertSlotHoldersBatch(sql: PgSqlClient, slotHolders: DbRewardSlotHolder[]): Promise<void> {
    const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
      canonical: true,
      burn_block_hash: slot.burn_block_hash,
      burn_block_height: slot.burn_block_height,
      address: slot.address,
      slot_index: slot.slot_index,
    }));
    const result = await sql`
      INSERT INTO reward_slot_holders ${sql(slotValues)}
    `;
    if (result.count !== slotValues.length) {
      throw new Error(`Failed to insert slot holder for ${slotValues}`);
    }
  };

  async insertBurnchainRewardsBatch(sql: PgSqlClient, rewards: DbBurnchainReward[]): Promise<void> {
    const rewardValues: BurnchainRewardInsertValues[] = rewards.map(reward => ({
      canonical: true,
      burn_block_hash: reward.burn_block_hash,
@@ -1628,24 +1643,7 @@ export class PgWriteStore extends PgStore {
    if (res.count !== rewardValues.length) {
      throw new Error(`Failed to insert burnchain reward for ${rewardValues}`);
    }
      const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
        canonical: true,
        burn_block_hash: slot.burn_block_hash,
        burn_block_height: slot.burn_block_height,
        address: slot.address,
        slot_index: slot.slot_index,
      }));
      const result = await sql`
        INSERT INTO reward_slot_holders ${sql(slotValues)}
      `;
      if (result.count !== slotValues.length) {
        throw new Error(`Failed to insert slot holder for ${slotValues}`);
      }
    });
  }
  };

  async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise<number> {
    const values: TxInsertValues = {
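The combined insertBurnchainRewardsAndSlotHoldersBatch is split into insertBurnchainRewardsBatch and insertSlotHoldersBatch, each taking a PgSqlClient, so the caller now decides the transaction scope. A minimal sketch of how a caller could keep the old all-or-nothing behavior, assuming the sqlWriteTransaction helper shown above and rewards/slotHolders arrays already in scope:

// Hypothetical caller: runs both batch inserts against the same
// transaction-scoped client, so a failure in either rolls back both.
await db.sqlWriteTransaction(async sql => {
  await db.insertBurnchainRewardsBatch(sql, rewards);
  await db.insertSlotHoldersBatch(sql, slotHolders);
});

The importer below instead passes db.sql directly, trading atomicity across the two tables for simpler batching.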

src/event-replay/parquet-based/ (import-events run script)

@@ -1,11 +1,18 @@
import * as tty from 'tty';
import { PgWriteStore } from '../../datastore/pg-write-store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
import { insertNewBlockEvents } from './importers/new_block_importer';
const MIGRATIONS_TABLE = 'pgmigrations';
const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => {
  const timeTracker = createTimeTracker();

  const db = await PgWriteStore.connect({
    usageName: 'import-events',
    skipMigrations: true,
@@ -49,7 +56,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
  try {
    await Promise.all([
      insertNewBurnBlockEvents(db),
      insertNewBurnBlockEvents(db, timeTracker),
      // insertNewBlockEvents(db, timeTracker)
    ]);
  } catch (err) {
    throw err;
@@ -58,8 +66,14 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
      logger.info(`Enable indexes on tables: ${tables.join(', ')}`);
      db.toggleTableIndexes(db.sql, tables, true);
    }
  }

  if (true || tty.isatty(1)) {
    console.log('Tracked function times:');
    console.table(timeTracker.getDurations(3));
  } else {
    logger.info(`Tracked function times`, timeTracker.getDurations(3));
  }
}
}
export { run };
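The run script now builds a timeTracker, threads it into the importers, and reports timeTracker.getDurations(3) when the run finishes. createTimeTracker itself is outside this diff; a plausible sketch that matches how track and getDurations are used here (the shape and names are assumptions, not the repository's actual implementation):

// Sketch only: cumulative wall-clock time per tracked label.
interface TimeTracker {
  track<T>(name: string, fn: () => Promise<T>): Promise<T>;
  getDurations(roundDecimals?: number): { name: string; seconds: number }[];
}

const createTimeTracker = (): TimeTracker => {
  const totalsMs = new Map<string, number>();
  return {
    async track<T>(name: string, fn: () => Promise<T>): Promise<T> {
      const start = process.hrtime.bigint();
      try {
        return await fn();
      } finally {
        // Add this call's elapsed time to the label's running total.
        const elapsedMs = Number((process.hrtime.bigint() - start) / 1_000_000n);
        totalsMs.set(name, (totalsMs.get(name) ?? 0) + elapsedMs);
      }
    },
    getDurations(roundDecimals = 3) {
      return [...totalsMs.entries()].map(([name, ms]) => ({
        name,
        seconds: +(ms / 1000).toFixed(roundDecimals),
      }));
    },
  };
};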

src/event-replay/parquet-based/helpers.ts

@@ -51,38 +51,14 @@ const createTimeTracker = (): TimeTracker => {
  };
}

export interface Stopwatch {
  /** Milliseconds since stopwatch was created. */
  getElapsed: () => number;
  /** Seconds since stopwatch was created. */
  getElapsedSeconds: (roundDecimals?: number) => number;
  getElapsedAndRestart: () => number;
  restart(): void;
}

function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
  for (let i = 0; i < arr.length; i += n) {
    yield arr.slice(i, i + n);
  }
}

export function stopwatch(): Stopwatch {
  let start = process.hrtime.bigint();
  const result: Stopwatch = {
    getElapsedSeconds: (roundDecimals?: number) => {
      const elapsedMs = result.getElapsed();
      const seconds = elapsedMs / 1000;
      return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
    },
    getElapsed: () => {
      const end = process.hrtime.bigint();
      return Number((end - start) / 1_000_000n);
    },
    getElapsedAndRestart: () => {
      const end = process.hrtime.bigint();
      const result = Number((end - start) / 1_000_000n);
      start = process.hrtime.bigint();
      return result;
    },
    restart: () => {
      start = process.hrtime.bigint();
    },
  };
  return result;
}

const splitIntoChunks = async (data: object[], chunk_size: number) => {
  return [...chunks(data, chunk_size)];
};

export { TimeTracker, createTimeTracker };
export { TimeTracker, createTimeTracker, splitIntoChunks };
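The new chunks generator slices an array lazily; splitIntoChunks simply materializes every batch up front. It is declared async even though the work is synchronous, so callers must await it. A quick usage sketch:

// Five rows with a batch size of 2 yield three batches.
const rows: object[] = [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }];
const batches = await splitIntoChunks(rows, 2);
// batches => [ [{id:1}, {id:2}], [{id:3}, {id:4}], [{id:5}] ]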

src/event-replay/parquet-based/importers/new_burn_block_importer.ts

@@ -1,12 +1,13 @@
import * as duckdb from 'duckdb';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { DbBurnchainReward, DbRewardSlotHolder } from '../../../datastore/common';
import { CoreNodeBurnBlockMessage } from '../../../event-stream/core-node-message';
import { logger } from '../../../logger';
import { TimeTracker, splitIntoChunks } from '../helpers';
import { DatasetStore } from '../dataset/store';
const INSERT_BATCH_SIZE = 500;
const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
const DbBurnchainRewardParse = (payload: CoreNodeBurnBlockMessage) => {
  const rewards = payload.reward_recipients.map((r, index) => {
    const dbReward: DbBurnchainReward = {
      canonical: true,
@@ -17,9 +18,14 @@ const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
      reward_amount: BigInt(r.amt),
      reward_index: index,
    };
    return dbReward;
  });
  return rewards;
};

const DbRewardSlotHolderParse = (payload: CoreNodeBurnBlockMessage) => {
  const slotHolders = payload.reward_slot_holders.map((r, index) => {
    const slotHolder: DbRewardSlotHolder = {
      canonical: true,
@@ -31,51 +37,37 @@ const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
      return slotHolder;
    });

  return { rewards, slotHolders };
}

function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
  for (let i = 0; i < arr.length; i += n) {
    yield arr.slice(i, i + n);
  }
}

const fromCanonicalDataset = (process: any) => {
  var inMemoryDB = new duckdb.Database(':memory:');
  inMemoryDB.all(
    "SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet')",
    (err: any, res: any) => {
      if (err) {
        throw err;
      }
      process(res);
    });
}

const fromDatasetAndInsert = async (db: PgWriteStore) => {
  fromCanonicalDataset((events: any) => {
    [...chunks(events, INSERT_BATCH_SIZE)].forEach(async (chunk: any) => {
      let burnchainRewards: DbBurnchainReward[] = [];
      let slotHolders: DbRewardSlotHolder[] = [];
      chunk.forEach((ev: any) => {
        const payload: CoreNodeBurnBlockMessage = JSON.parse(ev['payload']);
        const burnBlockData = parsePayload(payload);
        burnBlockData.rewards.forEach(reward => burnchainRewards.push(reward));
        burnBlockData.slotHolders.forEach(holder => slotHolders.push(holder));
      });
      if (burnchainRewards.length !== 0 && slotHolders.length !== 0) {
        await db.insertBurnchainRewardsAndSlotHoldersBatch(burnchainRewards, slotHolders);
      }
    });
  });
}

const insertNewBurnBlockEvents = (db: PgWriteStore) => {
  return new Promise((resolve) => {
    logger.info(`Inserting NEW_BURN_BLOCK events to db...`);
    fromDatasetAndInsert(db);
  });

  return slotHolders;
};

export { insertNewBurnBlockEvents };

const insertBurnchainRewardsAndSlotHolders = async (db: PgWriteStore, chunks: any) => {
  for (const chunk of chunks) {
    let burnchainRewards: DbBurnchainReward[] = [];
    let slotHolders: DbRewardSlotHolder[] = [];
    for (const event of chunk) {
      const payload: CoreNodeBurnBlockMessage = JSON.parse(event['payload']);
      const burnchainRewardsData = DbBurnchainRewardParse(payload);
      const slotHoldersData = DbRewardSlotHolderParse(payload);
      burnchainRewardsData.forEach(reward => burnchainRewards.push(reward));
      slotHoldersData.forEach(slotHolder => slotHolders.push(slotHolder));
    };
    if (burnchainRewards.length !== 0) {
      await db.insertBurnchainRewardsBatch(db.sql, burnchainRewards);
    }
    if (slotHolders.length !== 0) {
      await db.insertSlotHoldersBatch(db.sql, slotHolders);
    }
  };
};

export const insertNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
  logger.info(`Inserting NEW_BURN_BLOCK events to db...`);

  await timeTracker.track('insertNewBurnBlockEvents', async () => {
    return dataset.newBurnBlockEventsOrdered()
      .then(async (data: any) => await splitIntoChunks(data, INSERT_BATCH_SIZE))
      .then(async (chunks: any) => await insertBurnchainRewardsAndSlotHolders(db, chunks));
  });
};
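Two things change in the importer. First, the deleted fromDatasetAndInsert fired inserts inside forEach(async ...) without awaiting them, and the old insertNewBurnBlockEvents returned a promise that never resolved; the new for...of loop awaits each batch in order and completes when the last insert does. Second, reading the parquet dataset moves behind a DatasetStore. That class is not part of this diff; a minimal sketch of newBurnBlockEventsOrdered, reusing the parquet query from the deleted fromCanonicalDataset and promisifying duckdb's callback API, might look like this (ordering by an 'id' column is a guess):

import * as duckdb from 'duckdb';

// Sketch only: wraps duckdb's callback-style all() in a Promise so the
// importer can chain .then(...) as above. The 'id' ordering column is
// an assumption; the real DatasetStore is not shown in this commit.
const newBurnBlockEventsOrdered = (): Promise<any[]> => {
  const db = new duckdb.Database(':memory:');
  return new Promise((resolve, reject) => {
    db.all(
      "SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet') ORDER BY id",
      (err: any, rows: any[]) => (err ? reject(err) : resolve(rows))
    );
  });
};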