working new_burn_block

This commit is contained in:
Chris Guimaraes
2023-06-26 17:21:43 +01:00
parent 76df6a0f9a
commit 2d08e18760
7 changed files with 1689 additions and 116 deletions

1500
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -160,6 +160,7 @@
"cross-env": "7.0.3",
"dotenv": "8.6.0",
"dotenv-flow": "3.2.0",
"duckdb": "0.8.1",
"ecpair": "2.1.0",
"elliptic": "6.5.4",
"escape-goat": "3.0.0",

View File

@@ -1580,6 +1580,7 @@ export class PgWriteStore extends PgStore {
(burn_block_hash = ${burnchainBlockHash}
OR burn_block_height >= ${burnchainBlockHeight})
`;
if (existingRewards.count > 0) {
logger.warn(
`Invalidated ${existingRewards.count} burnchain rewards after fork detected at burnchain block ${burnchainBlockHash}`
@@ -1606,6 +1607,46 @@ export class PgWriteStore extends PgStore {
});
}
async insertBurnchainRewardsAndSlotHoldersBatch(
rewards: DbBurnchainReward[],
slotHolders: DbRewardSlotHolder[]): Promise<void> {
return await this.sqlWriteTransaction(async sql => {
const rewardValues: BurnchainRewardInsertValues[] = rewards.map(reward => ({
canonical: true,
burn_block_hash: reward.burn_block_hash,
burn_block_height: reward.burn_block_height,
burn_amount: reward.burn_amount.toString(),
reward_recipient: reward.reward_recipient,
reward_amount: reward.reward_amount,
reward_index: reward.reward_index,
}));
const res = await sql`
INSERT into burnchain_rewards ${sql(rewardValues)}
`;
if(res.count !== rewardValues.length) {
throw new Error(`Failed to insert burnchain reward for ${rewardValues}`);
}
const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({
canonical: true,
burn_block_hash: slot.burn_block_hash,
burn_block_height: slot.burn_block_height,
address: slot.address,
slot_index: slot.slot_index,
}));
const result = await sql`
INSERT INTO reward_slot_holders ${sql(slotValues)}
`;
if (result.count !== slotValues.length) {
throw new Error(`Failed to insert slot holder for ${slotValues}`);
}
});
}
async updateTx(sql: PgSqlClient, tx: DbTxRaw): Promise<number> {
const values: TxInsertValues = {
tx_id: tx.tx_id,
@@ -2970,4 +3011,23 @@ export class PgWriteStore extends PgStore {
await this.refreshMaterializedView('mempool_digest', sql, false);
});
}
/** Enable or disable indexes for the provided set of tables. */
async toggleTableIndexes(sql: PgSqlClient, tables: string[], enabled: boolean): Promise<void> {
const tableSchema = this.sql.options.connection.search_path ?? 'public';
const result = await sql`
UPDATE pg_index
SET ${sql({ indisready: enabled, indisvalid: enabled })}
WHERE indrelid = ANY (
SELECT oid FROM pg_class
WHERE relname IN ${sql(tables)}
AND relnamespace = (
SELECT oid FROM pg_namespace WHERE nspname = ${tableSchema}
)
)
`;
if (result.count === 0) {
throw new Error(`No updates made while toggling table indexes`);
}
}
}

View File

@@ -0,0 +1,65 @@
import { PgWriteStore } from '../../datastore/pg-write-store';
import { cycleMigrations, dangerousDropAllTables } from '../../datastore/migrations';
import { logger } from '../../logger';
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
const MIGRATIONS_TABLE = 'pgmigrations';
const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) => {
const db = await PgWriteStore.connect({
usageName: 'import-events',
skipMigrations: true,
withNotifier: false,
isEventReplay: true,
});
if (wipeDB) {
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}
try {
await cycleMigrations({ dangerousAllowDataLoss: true, checkForEmptyData: true });
} catch (error) {
logger.error(error);
throw new Error(
`DB migration cycle failed, possibly due to an incompatible API version upgrade. Add --wipe-db --force or perform a manual DB wipe before importing.`
);
}
let tables: string[] = [];
if (disableIndexes) {
// Get DB tables
const dbName = db.sql.options.database; // stacks-blockchain-api
const tableSchema = db.sql.options.connection.search_path ?? 'public';
const tablesQuery = await db.sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
logger.error(errorMsg);
throw new Error(errorMsg);
}
tables = tablesQuery.map(r => r.tablename);
// Disable indexing and constraints on tables to speed up insertion
logger.info(`Disable indexes on tables: ${tables.join(', ')}`);
db.toggleTableIndexes(db.sql, tables, false);
}
try {
await Promise.all([
insertNewBurnBlockEvents(db),
]);
} catch (err) {
throw err;
} finally {
if (disableIndexes) {
logger.info(`Enable indexes on tables: ${tables.join(', ')}`);
db.toggleTableIndexes(db.sql, tables, true);
}
}
}
export { run };

View File

@@ -0,0 +1,88 @@
interface TimeTracker {
track<T = void>(name: string, fn: () => Promise<T>): Promise<T>;
trackSync<T = void>(name: string, fn: () => T): T;
getDurations: (
roundDecimals?: number
) => {
name: string;
seconds: string;
}[];
}
const createTimeTracker = (): TimeTracker => {
const durations = new Map<string, { totalTime: bigint }>();
return {
track<T = void>(name: string, fn: () => Promise<T>) {
let duration = durations.get(name);
if (duration === undefined) {
duration = { totalTime: 0n };
durations.set(name, duration);
}
const start = process.hrtime.bigint();
return fn().finally(() => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
duration!.totalTime += process.hrtime.bigint() - start;
});
},
trackSync<T = void>(name: string, fn: () => T) {
let duration = durations.get(name);
if (duration === undefined) {
duration = { totalTime: 0n };
durations.set(name, duration);
}
const start = process.hrtime.bigint();
try {
return fn();
} finally {
duration.totalTime += process.hrtime.bigint() - start;
}
},
getDurations: (roundDecimals?: number) => {
return [...durations.entries()]
.sort((a, b) => Number(b[1].totalTime - a[1].totalTime))
.map(entry => {
const seconds = Number(entry[1].totalTime) / 1e9;
return {
name: entry[0],
seconds: roundDecimals ? seconds.toFixed(roundDecimals) : seconds.toString(),
};
});
},
};
}
export interface Stopwatch {
/** Milliseconds since stopwatch was created. */
getElapsed: () => number;
/** Seconds since stopwatch was created. */
getElapsedSeconds: (roundDecimals?: number) => number;
getElapsedAndRestart: () => number;
restart(): void;
}
export function stopwatch(): Stopwatch {
let start = process.hrtime.bigint();
const result: Stopwatch = {
getElapsedSeconds: (roundDecimals?: number) => {
const elapsedMs = result.getElapsed();
const seconds = elapsedMs / 1000;
return roundDecimals === undefined ? seconds : +seconds.toFixed(roundDecimals);
},
getElapsed: () => {
const end = process.hrtime.bigint();
return Number((end - start) / 1_000_000n);
},
getElapsedAndRestart: () => {
const end = process.hrtime.bigint();
const result = Number((end - start) / 1_000_000n);
start = process.hrtime.bigint();
return result;
},
restart: () => {
start = process.hrtime.bigint();
},
};
return result;
}
export { TimeTracker, createTimeTracker };

View File

@@ -0,0 +1,81 @@
import * as duckdb from 'duckdb';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { DbBurnchainReward, DbRewardSlotHolder } from '../../../datastore/common';
import { CoreNodeBurnBlockMessage } from '../../../event-stream/core-node-message';
import { logger } from '../../../logger';
const INSERT_BATCH_SIZE = 500;
const parsePayload = (payload: CoreNodeBurnBlockMessage) => {
const rewards = payload.reward_recipients.map((r, index) => {
const dbReward: DbBurnchainReward = {
canonical: true,
burn_block_hash: payload.burn_block_hash,
burn_block_height: payload.burn_block_height,
burn_amount: BigInt(payload.burn_amount),
reward_recipient: r.recipient,
reward_amount: BigInt(r.amt),
reward_index: index,
};
return dbReward;
});
const slotHolders = payload.reward_slot_holders.map((r, index) => {
const slotHolder: DbRewardSlotHolder = {
canonical: true,
burn_block_hash: payload.burn_block_hash,
burn_block_height: payload.burn_block_height,
address: r,
slot_index: index,
};
return slotHolder;
});
return { rewards, slotHolders };
}
function* chunks<T>(arr: T[], n: number): Generator<T[], void> {
for (let i = 0; i < arr.length; i += n) {
yield arr.slice(i, i + n);
}
}
const fromCanonicalDataset = (process: any) => {
var inMemoryDB = new duckdb.Database(':memory:');
inMemoryDB.all(
"SELECT * FROM READ_PARQUET('events/new_burn_block/canonical/*.parquet')",
(err: any, res: any) => {
if (err) {
throw err;
}
process(res);
});
}
const fromDatasetAndInsert = async (db: PgWriteStore) => {
fromCanonicalDataset((events: any) => {
[...chunks(events, INSERT_BATCH_SIZE)].forEach(async (chunk: any) => {
let burnchainRewards: DbBurnchainReward[] = [];
let slotHolders: DbRewardSlotHolder[] = [];
chunk.forEach((ev: any) => {
const payload: CoreNodeBurnBlockMessage = JSON.parse(ev['payload']);
const burnBlockData = parsePayload(payload);
burnBlockData.rewards.forEach(reward => burnchainRewards.push(reward));
burnBlockData.slotHolders.forEach(holder => slotHolders.push(holder));
});
if (burnchainRewards.length !== 0 && slotHolders.length !== 0) {
await db.insertBurnchainRewardsAndSlotHoldersBatch(burnchainRewards, slotHolders);
}
});
});
}
const insertNewBurnBlockEvents = (db: PgWriteStore) => {
return new Promise((resolve) => {
logger.info(`Inserting NEW_BURN_BLOCK events to db...`);
fromDatasetAndInsert(db);
});
};
export { insertNewBurnBlockEvents };

View File

@@ -27,6 +27,7 @@ import { isFtMetadataEnabled, isNftMetadataEnabled } from './token-metadata/help
import { TokensProcessorQueue } from './token-metadata/tokens-processor-queue';
import { registerMempoolPromStats } from './datastore/helpers';
import { logger } from './logger';
import { run } from './event-replay/parquet-based/event-replay';
enum StacksApiMode {
/**
@@ -275,6 +276,13 @@ function getProgramArgs() {
['wipe-db']?: boolean;
['force']?: boolean;
};
}
| {
operand: 'from-parquet-events';
options: {
['wipe-db']?: boolean;
['disable-indexes']?: boolean;
}
};
return { args, parsedOpts };
}
@@ -290,6 +298,8 @@ async function handleProgramArgs() {
args.options['wipe-db'],
args.options.force
);
} else if (args.operand === 'from-parquet-events') {
await run(args.options['wipe-db'], args.options['disable-indexes']);
} else if (parsedOpts._[0]) {
throw new Error(`Unexpected program argument: ${parsedOpts._[0]}`);
} else {