mirror of
https://github.com/alexgo-io/stacks-blockchain-api.git
synced 2026-01-12 08:34:40 +08:00
feat: event-replay new_burn_block events handling
This commit is contained in:
6418
package-lock.json
generated
6418
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -1607,45 +1607,43 @@ export class PgWriteStore extends PgStore {
|
||||
});
|
||||
}
|
||||
|
||||
async insertBurnchainRewardsAndSlotHoldersBatch(
|
||||
rewards: DbBurnchainReward[],
|
||||
slotHolders: DbRewardSlotHolder[]): Promise<void> {
|
||||
return await this.sqlWriteTransaction(async sql => {
|
||||
const rewardValues: BurnchainRewardInsertValues[] = rewards.map(reward => ({
|
||||
canonical: true,
|
||||
burn_block_hash: reward.burn_block_hash,
|
||||
burn_block_height: reward.burn_block_height,
|
||||
burn_amount: reward.burn_amount.toString(),
|
||||
reward_recipient: reward.reward_recipient,
|
||||
reward_amount: reward.reward_amount,
|
||||
reward_index: reward.reward_index,
|
||||
}));
|
||||
async insertSlotHoldersBatch(sql: PgSqlClient, slotHolders: DbRewardSlotHolder[]): Promise<void> {
|
||||
const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
|
||||
canonical: true,
|
||||
burn_block_hash: slot.burn_block_hash,
|
||||
burn_block_height: slot.burn_block_height,
|
||||
address: slot.address,
|
||||
slot_index: slot.slot_index,
|
||||
}));
|
||||
|
||||
const res = await sql`
|
||||
INSERT into burnchain_rewards ${sql(rewardValues)}
|
||||
`;
|
||||
const result = await sql`
|
||||
INSERT INTO reward_slot_holders ${sql(slotValues)}
|
||||
`;
|
||||
|
||||
if(res.count !== rewardValues.length) {
|
||||
throw new Error(`Failed to insert burnchain reward for ${rewardValues}`);
|
||||
}
|
||||
if (result.count !== slotValues.length) {
|
||||
throw new Error(`Failed to insert slot holder for ${slotValues}`);
|
||||
}
|
||||
};
|
||||
|
||||
const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
|
||||
canonical: true,
|
||||
burn_block_hash: slot.burn_block_hash,
|
||||
burn_block_height: slot.burn_block_height,
|
||||
address: slot.address,
|
||||
slot_index: slot.slot_index,
|
||||
}));
|
||||
async insertBurnchainRewardsBatch(sql: PgSqlClient, rewards: DbBurnchainReward[]): Promise<void> {
|
||||
const rewardValues: BurnchainRewardInsertValues[] = rewards.map(reward => ({
|
||||
canonical: true,
|
||||
burn_block_hash: reward.burn_block_hash,
|
||||
burn_block_height: reward.burn_block_height,
|
||||
burn_amount: reward.burn_amount.toString(),
|
||||
reward_recipient: reward.reward_recipient,
|
||||
reward_amount: reward.reward_amount,
|
||||
reward_index: reward.reward_index,
|
||||
}));
|
||||
|
||||
const result = await sql`
|
||||
INSERT INTO reward_slot_holders ${sql(slotValues)}
|
||||
`;
|
||||
const res = await sql`
|
||||
INSERT into burnchain_rewards ${sql(rewardValues)}
|
||||
`;
|
||||
|
||||
if (result.count !== slotValues.length) {
|
||||
throw new Error(`Failed to insert slot holder for ${slotValues}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
if(res.count !== rewardValues.length) {
|
||||
throw new Error(`Failed to insert burnchain reward for ${rewardValues}`);
|
||||
}
|
||||
};
|
||||
|
||||
async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise<number> {
|
||||
const values: TxInsertValues = {
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
|
||||
import * as tty from 'tty';
|
||||
|
||||
import { PgWriteStore } from '../../datastore/pg-write-store';
|
||||
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
|
||||
import { logger } from '../../logger';
|
||||
import { createTimeTracker } from './helpers';
|
||||
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
|
||||
import { insertNewBlockEvents } from './importers/new_block_importer';
|
||||
|
||||
const MIGRATIONS_TABLE = 'pgmigrations';
|
||||
|
||||
const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => {
|
||||
const timeTracker = createTimeTracker();
|
||||
|
||||
const db = await PgWriteStore.connect({
|
||||
usageName: 'import-events',
|
||||
skipMigrations: true,
|
||||
@@ -49,7 +56,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
|
||||
|
||||
try {
|
||||
await Promise.all([
|
||||
insertNewBurnBlockEvents(db),
|
||||
insertNewBurnBlockEvents(db, timeTracker),
|
||||
// insertNewBlockEvents(db, timeTracker)
|
||||
]);
|
||||
} catch (err) {
|
||||
throw err;
|
||||
@@ -58,8 +66,14 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
|
||||
logger.info(`Enable indexes on tables: ${tables.join(', ')}`);
|
||||
db.toggleTableIndexes(db.sql, tables, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (true || tty.isatty(1)) {
|
||||
console.log('Tracked function times:');
|
||||
console.table(timeTracker.getDurations(3));
|
||||
} else {
|
||||
logger.info(`Tracked function times`, timeTracker.getDurations(3));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export { run };
|
||||
|
||||
@@ -51,38 +51,14 @@ const createTimeTracker = (): TimeTracker => {
|
||||
};
|
||||
}
|
||||
|
||||
export interface Stopwatch {
|
||||
/** Milliseconds since stopwatch was created. */
|
||||
getElapsed: () => number;
|
||||
/** Seconds since stopwatch was created. */
|
||||
getElapsedSeconds: (roundDecimals?: number) => number;
|
||||
getElapsedAndRestart: () => number;
|
||||
restart(): void;
|
||||
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
|
||||
for (let i = 0; i < arr.length; i += n) {
|
||||
yield arr.slice(i, i + n);
|
||||
}
|
||||
}
|
||||
|
||||
export function stopwatch(): Stopwatch {
|
||||
let start = process.hrtime.bigint();
|
||||
const result: Stopwatch = {
|
||||
getElapsedSeconds: (roundDecimals?: number) => {
|
||||
const elapsedMs = result.getElapsed();
|
||||
const seconds = elapsedMs / 1000;
|
||||
return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
|
||||
},
|
||||
getElapsed: () => {
|
||||
const end = process.hrtime.bigint();
|
||||
return Number((end - start) / 1_000_000n);
|
||||
},
|
||||
getElapsedAndRestart: () => {
|
||||
const end = process.hrtime.bigint();
|
||||
const result = Number((end - start) / 1_000_000n);
|
||||
start = process.hrtime.bigint();
|
||||
return result;
|
||||
},
|
||||
restart: () => {
|
||||
start = process.hrtime.bigint();
|
||||
},
|
||||
};
|
||||
return result;
|
||||
}
|
||||
const splitIntoChunks = async (data: object[], chunk_size: number) => {
|
||||
return [...chunks(data, chunk_size)];
|
||||
};
|
||||
|
||||
export { TimeTracker, createTimeTracker };
|
||||
export { TimeTracker, createTimeTracker, splitIntoChunks };
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import * as duckdb from 'duckdb';
|
||||
import { PgWriteStore } from '../../../datastore/pg-write-store';
|
||||
import { DbBurnchainReward, DbRewardSlotHolder } from '../../../datastore/common';
|
||||
import { CoreNodeBurnBlockMessage } from '../../../event-stream/core-node-message';
|
||||
import { logger } from '../../../logger';
|
||||
import { TimeTracker, splitIntoChunks } from '../helpers';
|
||||
import { DatasetStore } from '../dataset/store';
|
||||
|
||||
const INSERT_BATCH_SIZE = 500;
|
||||
|
||||
const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
|
||||
const DbBurnchainRewardParse = (payload: CoreNodeBurnBlockMessage) => {
|
||||
const rewards = payload.reward_recipients.map((r, index) => {
|
||||
const dbReward: DbBurnchainReward = {
|
||||
canonical: true,
|
||||
@@ -17,9 +18,14 @@ const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
|
||||
reward_amount: BigInt(r.amt),
|
||||
reward_index: index,
|
||||
};
|
||||
|
||||
return dbReward;
|
||||
});
|
||||
|
||||
return rewards;
|
||||
};
|
||||
|
||||
const DbRewardSlotHolderParse = (payload: CoreNodeBurnBlockMessage) => {
|
||||
const slotHolders = payload.reward_slot_holders.map((r, index) => {
|
||||
const slotHolder: DbRewardSlotHolder = {
|
||||
canonical: true,
|
||||
@@ -31,51 +37,37 @@ const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
|
||||
return slotHolder;
|
||||
});
|
||||
|
||||
return { rewards, slotHolders };
|
||||
}
|
||||
|
||||
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
|
||||
for (let i = 0; i < arr.length; i += n) {
|
||||
yield arr.slice(i, i + n);
|
||||
}
|
||||
}
|
||||
|
||||
const fromCanonicalDataset = (process: any) => {
|
||||
var inMemoryDB = new duckdb.Database(':memory:');
|
||||
inMemoryDB.all(
|
||||
"SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet')",
|
||||
(err: any, res: any) => {
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
process(res);
|
||||
});
|
||||
}
|
||||
|
||||
const fromDatasetAndInsert = async (db: PgWriteStore) => {
|
||||
fromCanonicalDataset((events: any) => {
|
||||
[...chunks(events, INSERT_BATCH_SIZE)].forEach(async (chunk: any) => {
|
||||
let burnchainRewards: DbBurnchainReward[] = [];
|
||||
let slotHolders: DbRewardSlotHolder[] = [];
|
||||
chunk.forEach((ev: any) => {
|
||||
const payload: CoreNodeBurnBlockMessage = JSON.parse(ev['payload']);
|
||||
const burnBlockData = parsePayload(payload);
|
||||
burnBlockData.rewards.forEach(reward => burnchainRewards.push(reward));
|
||||
burnBlockData.slotHolders.forEach(holder => slotHolders.push(holder));
|
||||
});
|
||||
|
||||
if (burnchainRewards.length !== 0 && slotHolders.length !== 0) {
|
||||
await db.insertBurnchainRewardsAndSlotHoldersBatch(burnchainRewards, slotHolders);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const insertNewBurnBlockEvents = (db: PgWriteStore) => {
|
||||
return new Promise((resolve) => {
|
||||
logger.info(`Inserting NEW_BURN_BLOCK events to db...`);
|
||||
fromDatasetAndInsert(db);
|
||||
});
|
||||
return slotHolders;
|
||||
};
|
||||
|
||||
export { insertNewBurnBlockEvents };
|
||||
const insertBurnchainRewardsAndSlotHolders = async (db: PgWriteStore, chunks: any) => {
|
||||
for (const chunk of chunks) {
|
||||
let burnchainRewards: DbBurnchainReward[] = [];
|
||||
let slotHolders: DbRewardSlotHolder[] = [];
|
||||
for (const event of chunk) {
|
||||
const payload: CoreNodeBurnBlockMessage = JSON.parse(event['payload']);
|
||||
const burnchainRewardsData = DbBurnchainRewardParse(payload);
|
||||
const slotHoldersData = DbRewardSlotHolderParse(payload);
|
||||
burnchainRewardsData.forEach(reward => burnchainRewards.push(reward));
|
||||
slotHoldersData.forEach(slotHolder => slotHolders.push(slotHolder));
|
||||
};
|
||||
|
||||
if (burnchainRewards.length !== 0) {
|
||||
await db.insertBurnchainRewardsBatch(db.sql, burnchainRewards);
|
||||
}
|
||||
|
||||
if (slotHolders.length !== 0) {
|
||||
await db.insertSlotHoldersBatch(db.sql, slotHolders);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
export const insertNewBurnBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
|
||||
logger.info(`Inserting NEW_BURN_BLOCK events to db...`);
|
||||
|
||||
await timeTracker.track('insertNewBurnBlockEvents', async () => {
|
||||
return dataset.newBurnBlockEventsOrdered()
|
||||
.then(async (data: any) => await splitIntoChunks(data, INSERT_BATCH_SIZE))
|
||||
.then(async (chunks: any) => await insertBurnchainRewardsAndSlotHolders(db, chunks));
|
||||
});
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user