diff --git a/package-lock.json b/package-lock.json
index 1f7613d2..f52f83a8 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -10,7 +10,7 @@
     "license": "GPL-3.0",
     "dependencies": {
       "@apidevtools/json-schema-ref-parser": "9.0.9",
-      "@hirosystems/api-toolkit": "1.2.2",
+      "@hirosystems/api-toolkit": "1.3.3",
       "@promster/express": "6.0.0",
       "@promster/server": "6.0.6",
       "@promster/types": "3.2.3",
@@ -128,48 +128,6 @@
         "utf-8-validate": "5.0.7"
       }
     },
-    "../api-toolkit": {
-      "name": "@hirosystems/api-toolkit",
-      "version": "1.1.0",
-      "extraneous": true,
-      "license": "Apache 2.0",
-      "dependencies": {
-        "@fastify/cors": "^8.0.0",
-        "@fastify/swagger": "^8.3.1",
-        "@fastify/type-provider-typebox": "^3.2.0",
-        "fastify": "^4.3.0",
-        "fastify-metrics": "^10.2.0",
-        "node-pg-migrate": "^6.2.2",
-        "pino": "^8.11.0",
-        "postgres": "^3.3.4"
-      },
-      "bin": {
-        "api-toolkit-git-info": "bin/api-toolkit-git-info.js"
-      },
-      "devDependencies": {
-        "@commitlint/cli": "^17.5.0",
-        "@commitlint/config-conventional": "^17.4.4",
-        "@stacks/eslint-config": "^1.2.0",
-        "@types/jest": "^29.5.0",
-        "@typescript-eslint/eslint-plugin": "^5.56.0",
-        "@typescript-eslint/parser": "^5.56.0",
-        "babel-jest": "^29.5.0",
-        "copyfiles": "^2.4.1",
-        "eslint": "^8.36.0",
-        "eslint-plugin-prettier": "^4.2.1",
-        "eslint-plugin-tsdoc": "^0.2.17",
-        "husky": "^8.0.3",
-        "jest": "^29.5.0",
-        "prettier": "^2.8.6",
-        "rimraf": "^4.4.1",
-        "ts-jest": "^29.0.5",
-        "ts-node": "^10.9.1",
-        "typescript": "^5.0.2"
-      },
-      "engines": {
-        "node": ">=18"
-      }
-    },
     "node_modules/@aashutoshrathi/word-wrap": {
       "version": "1.2.6",
       "resolved": "https://registry.npmjs.org/@aashutoshrathi/word-wrap/-/word-wrap-1.2.6.tgz",
@@ -1234,9 +1192,9 @@
       }
     },
     "node_modules/@fastify/cors": {
-      "version": "8.4.0",
-      "resolved": "https://registry.npmjs.org/@fastify/cors/-/cors-8.4.0.tgz",
-      "integrity": "sha512-MlVvMTenltToByTpLwlWtO+7dQ3l2J+1OpmGrx9JpSNWo1d+dhfNCOi23zHhxdFhtpDzfwGwCsKu9DTeG7k7nQ==",
+      "version": "8.4.1",
+      "resolved": "https://registry.npmjs.org/@fastify/cors/-/cors-8.4.1.tgz",
+      "integrity": "sha512-iYQJtrY3pFiDS5mo5zRaudzg2OcUdJ96PD6xfkKOOEilly5nnrFZx/W6Sce2T79xxlEn2qpU3t5+qS2phS369w==",
       "dependencies": {
         "fastify-plugin": "^4.0.0",
         "mnemonist": "0.39.5"
@@ -1273,9 +1231,9 @@
       }
     },
     "node_modules/@fastify/swagger/node_modules/yaml": {
-      "version": "2.3.3",
-      "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.3.tgz",
-      "integrity": "sha512-zw0VAJxgeZ6+++/su5AFoqBbZbrEakwu+X0M5HmcwUiBL7AzcuPKjj5we4xfQLp78LkEMpD0cOnUhmgOVy3KdQ==",
+      "version": "2.3.4",
+      "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.4.tgz",
+      "integrity": "sha512-8aAvwVUSHpfEqTQ4w/KMlf3HcRdt50E5ODIQJBw1fQ5RL34xabzxtUlzTXVqc4rkZsPbvrXKWnABCD7kWSmocA==",
       "engines": {
         "node": ">= 14"
       }
@@ -1289,9 +1247,9 @@
       }
     },
     "node_modules/@hirosystems/api-toolkit": {
-      "version": "1.2.2",
-      "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.2.2.tgz",
-      "integrity": "sha512-HLaodPN6dHUAkrOwECsMwQicBMhJn8CYlS0QwC4SC3smclno3fB0oKm1QYZFQEDy2KV3IPTAh70B/vkpib/Kkw==",
+      "version": "1.3.3",
+      "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.3.3.tgz",
+      "integrity": "sha512-0/JjQ54twLdVqf8+hB+8IAKn8JdCdlMfT3BqUWha5qMrjlC3KX+kAl+88+CqpoibY/lgYJ9fs+70KG/weHt3LQ==",
       "dependencies": {
         "@fastify/cors": "^8.0.0",
         "@fastify/swagger": "^8.3.1",
@@ -1378,9 +1336,9 @@
       }
     },
     "node_modules/@hirosystems/api-toolkit/node_modules/postgres": {
-      "version": "3.4.2",
-      "resolved": "https://registry.npmjs.org/postgres/-/postgres-3.4.2.tgz",
-      "integrity": "sha512-0UcCGvDBSaAzLeWwiq+QVmiGfOPTosFb+sxJUUtd+7Pi/ByFPuz6Gq05LbS0sM1ghMWC5atuks3pfl34g0qmFw==",
+      "version": "3.4.3",
+      "resolved": "https://registry.npmjs.org/postgres/-/postgres-3.4.3.tgz",
+      "integrity": "sha512-iHJn4+M9vbTdHSdDzNkC0crHq+1CUdFhx+YqCE+SqWxPjm+Zu63jq7yZborOBF64c8pc58O5uMudyL1FQcHacA==",
       "engines": {
         "node": ">=12"
       },
@@ -6964,9 +6922,9 @@
       }
     },
     "node_modules/fastify-metrics": {
-      "version": "10.3.2",
-      "resolved": "https://registry.npmjs.org/fastify-metrics/-/fastify-metrics-10.3.2.tgz",
-      "integrity": "sha512-02SEIGH02zfguqRMho0LB8L7YVAj5cIgWM0iqZslIErqaUWc1iHVAOC+YXYG3S2DZU6VHdFaMyuxjEOCQHAETA==",
+      "version": "10.3.3",
+      "resolved": "https://registry.npmjs.org/fastify-metrics/-/fastify-metrics-10.3.3.tgz",
+      "integrity": "sha512-TmMcfrMWBSbA7yk31tFtJnWKtNXLSO7jmTRIjPX9HKC4pLmyd0JnOQ3r9XCYnev6NL9/eVRXxNfrsqQdKTLZkw==",
       "dependencies": {
         "fastify-plugin": "^4.3.0",
         "prom-client": "^14.2.0"
diff --git a/package.json b/package.json
index 5f39f710..a2973ec0 100644
--- a/package.json
+++ b/package.json
@@ -85,7 +85,7 @@
   },
   "dependencies": {
     "@apidevtools/json-schema-ref-parser": "9.0.9",
-    "@hirosystems/api-toolkit": "1.2.2",
+    "@hirosystems/api-toolkit": "1.3.3",
     "@promster/express": "6.0.0",
     "@promster/server": "6.0.6",
     "@promster/types": "3.2.3",
diff --git a/src/datastore/common.ts b/src/datastore/common.ts
index 83b4f85c..773051bb 100644
--- a/src/datastore/common.ts
+++ b/src/datastore/common.ts
@@ -632,6 +632,11 @@ export interface DataStoreBnsBlockData {
   microblock_canonical: boolean;
 }
 
+export type DataStoreBnsBlockTxData = DataStoreBnsBlockData & {
+  tx_id: string;
+  tx_index: number;
+};
+
 export interface DataStoreAttachmentSubdomainData {
   attachment?: DataStoreAttachmentData;
   blockData?: DataStoreBnsBlockData;
@@ -976,7 +981,7 @@ export interface FaucetRequestQueryResult {
   occurred_at: string;
 }
 
-export interface UpdatedEntities {
+export interface ReOrgUpdatedEntities {
   markedCanonical: {
     blocks: number;
     microblocks: number;
diff --git a/src/datastore/helpers.ts b/src/datastore/helpers.ts
index ca833140..af92983f 100644
--- a/src/datastore/helpers.ts
+++ b/src/datastore/helpers.ts
@@ -2,6 +2,7 @@ import { parseEnum, unwrapOptionalProp } from '../helpers';
 import {
   BlockQueryResult,
   ContractTxQueryResult,
+  DataStoreBlockUpdateData,
   DbBlock,
   DbEvent,
   DbEventBase,
@@ -42,6 +43,7 @@ import {
   PoxSyntheticEventQueryResult,
   TxQueryResult,
   DbPoxSyntheticRevokeDelegateStxEvent,
+  ReOrgUpdatedEntities,
 } from './common';
 import {
   CoreNodeDropMempoolTxReasonType,
@@ -1229,3 +1231,163 @@ export function convertTxQueryResultToDbMempoolTx(txs: TxQueryResult[]): DbMempo
   }
   return dbMempoolTxs;
 }
+
+export function setTotalBlockUpdateDataExecutionCost(data: DataStoreBlockUpdateData) {
+  const cost = data.txs.reduce(
+    (previousValue, currentValue) => {
+      const {
+        execution_cost_read_count,
+        execution_cost_read_length,
+        execution_cost_runtime,
+        execution_cost_write_count,
+        execution_cost_write_length,
+      } = previousValue;
+      return {
+        execution_cost_read_count:
+          execution_cost_read_count + currentValue.tx.execution_cost_read_count,
+        execution_cost_read_length:
+          execution_cost_read_length + currentValue.tx.execution_cost_read_length,
+        execution_cost_runtime: execution_cost_runtime + currentValue.tx.execution_cost_runtime,
+        execution_cost_write_count:
+          execution_cost_write_count + currentValue.tx.execution_cost_write_count,
+        execution_cost_write_length:
+          execution_cost_write_length + currentValue.tx.execution_cost_write_length,
+      };
+    },
+    {
+      execution_cost_read_count: 0,
+      execution_cost_read_length: 0,
+      execution_cost_runtime: 0,
+      execution_cost_write_count: 0,
+      execution_cost_write_length: 0,
+    }
+  );
+  data.block.execution_cost_read_count = cost.execution_cost_read_count;
+  data.block.execution_cost_read_length = cost.execution_cost_read_length;
+  data.block.execution_cost_runtime = cost.execution_cost_runtime;
+  data.block.execution_cost_write_count = cost.execution_cost_write_count;
+  data.block.execution_cost_write_length = cost.execution_cost_write_length;
+}
+
+export function markBlockUpdateDataAsNonCanonical(data: DataStoreBlockUpdateData): void {
+  data.block = { ...data.block, canonical: false };
+  data.microblocks = data.microblocks.map(mb => ({ ...mb, canonical: false }));
+  data.txs = data.txs.map(tx => ({
+    tx: { ...tx.tx, canonical: false },
+    stxLockEvents: tx.stxLockEvents.map(e => ({ ...e, canonical: false })),
+    stxEvents: tx.stxEvents.map(e => ({ ...e, canonical: false })),
+    ftEvents: tx.ftEvents.map(e => ({ ...e, canonical: false })),
+    nftEvents: tx.nftEvents.map(e => ({ ...e, canonical: false })),
+    contractLogEvents: tx.contractLogEvents.map(e => ({ ...e, canonical: false })),
+    smartContracts: tx.smartContracts.map(e => ({ ...e, canonical: false })),
+    names: tx.names.map(e => ({ ...e, canonical: false })),
+    namespaces: tx.namespaces.map(e => ({ ...e, canonical: false })),
+    pox2Events: tx.pox2Events.map(e => ({ ...e, canonical: false })),
+    pox3Events: tx.pox3Events.map(e => ({ ...e, canonical: false })),
+    pox4Events: tx.pox4Events.map(e => ({ ...e, canonical: false })),
+  }));
+  data.minerRewards = data.minerRewards.map(mr => ({ ...mr, canonical: false }));
+}
+
+export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
+  return {
+    markedCanonical: {
+      blocks: 0,
+      microblocks: 0,
+      minerRewards: 0,
+      txs: 0,
+      stxLockEvents: 0,
+      stxEvents: 0,
+      ftEvents: 0,
+      nftEvents: 0,
+      pox2Events: 0,
+      pox3Events: 0,
+      pox4Events: 0,
+      contractLogs: 0,
+      smartContracts: 0,
+      names: 0,
+      namespaces: 0,
+      subdomains: 0,
+    },
+    markedNonCanonical: {
+      blocks: 0,
+      microblocks: 0,
+      minerRewards: 0,
+      txs: 0,
+      stxLockEvents: 0,
+      stxEvents: 0,
+      ftEvents: 0,
+      nftEvents: 0,
+      pox2Events: 0,
+      pox3Events: 0,
+      pox4Events: 0,
+      contractLogs: 0,
+      smartContracts: 0,
+      names: 0,
+      namespaces: 0,
+      subdomains: 0,
+    },
+  };
+}
+
+export function logReorgResultInfo(updatedEntities: ReOrgUpdatedEntities) {
+  const updates = [
+    ['blocks', updatedEntities.markedCanonical.blocks, updatedEntities.markedNonCanonical.blocks],
+    [
+      'microblocks',
+      updatedEntities.markedCanonical.microblocks,
+      updatedEntities.markedNonCanonical.microblocks,
+    ],
+    ['txs', updatedEntities.markedCanonical.txs, updatedEntities.markedNonCanonical.txs],
+    [
+      'miner-rewards',
+      updatedEntities.markedCanonical.minerRewards,
+      updatedEntities.markedNonCanonical.minerRewards,
+    ],
+    [
+      'stx-lock events',
+      updatedEntities.markedCanonical.stxLockEvents,
+      updatedEntities.markedNonCanonical.stxLockEvents,
+    ],
+    [
+      'stx-token events',
+      updatedEntities.markedCanonical.stxEvents,
+      updatedEntities.markedNonCanonical.stxEvents,
+    ],
+    [
+      'non-fungible-token events',
+      updatedEntities.markedCanonical.nftEvents,
+      updatedEntities.markedNonCanonical.nftEvents,
+    ],
+    [
+      'fungible-token events',
+      updatedEntities.markedCanonical.ftEvents,
+      updatedEntities.markedNonCanonical.ftEvents,
+    ],
+    [
+      'contract logs',
+      updatedEntities.markedCanonical.contractLogs,
+      updatedEntities.markedNonCanonical.contractLogs,
+    ],
+    [
+      'smart contracts',
+      updatedEntities.markedCanonical.smartContracts,
+      updatedEntities.markedNonCanonical.smartContracts,
+    ],
+    ['names', updatedEntities.markedCanonical.names, updatedEntities.markedNonCanonical.names],
+    [
+      'namespaces',
+      updatedEntities.markedCanonical.namespaces,
+      updatedEntities.markedNonCanonical.namespaces,
+    ],
+    [
+      'subdomains',
+      updatedEntities.markedCanonical.subdomains,
+      updatedEntities.markedNonCanonical.subdomains,
+    ],
+  ];
+  const markedCanonical = updates.map(e => `${e[1]} ${e[0]}`).join(', ');
+  logger.debug(`Entities marked as canonical: ${markedCanonical}`);
+  const markedNonCanonical = updates.map(e => `${e[2]} ${e[0]}`).join(', ');
+  logger.debug(`Entities marked as non-canonical: ${markedNonCanonical}`);
+}
diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts
index 2380da21..3da53808 100644
--- a/src/datastore/pg-write-store.ts
+++ b/src/datastore/pg-write-store.ts
@@ -1,4 +1,4 @@
-import { getOrAdd, batchIterate, I32_MAX, getIbdBlockHeight } from '../helpers';
+import { getOrAdd, I32_MAX, getIbdBlockHeight } from '../helpers';
 import {
   DbBlock,
   DbTx,
@@ -45,7 +45,7 @@ import {
   FaucetRequestInsertValues,
   MicroblockInsertValues,
   TxQueryResult,
-  UpdatedEntities,
+  ReOrgUpdatedEntities,
   BlockQueryResult,
   DataStoreAttachmentData,
   DataStoreAttachmentSubdomainData,
@@ -57,12 +57,15 @@ import {
   RawEventRequestInsertValues,
   IndexesState,
   NftCustodyInsertValues,
+  DataStoreBnsBlockTxData,
   DbPoxSyntheticEvent,
   PoxSyntheticEventTable,
 } from './common';
 import {
   BLOCK_COLUMNS,
+  setTotalBlockUpdateDataExecutionCost,
   convertTxQueryResultToDbMempoolTx,
+  markBlockUpdateDataAsNonCanonical,
   MEMPOOL_TX_COLUMNS,
   MICROBLOCK_COLUMNS,
   parseBlockQueryResult,
@@ -72,6 +75,8 @@ import {
   TX_COLUMNS,
   TX_METADATA_TABLES,
   validateZonefileHash,
+  newReOrgUpdatedEntities,
+  logReorgResultInfo,
 } from './helpers';
 import { PgNotifier } from './pg-notifier';
 import { MIGRATIONS_DIR, PgStore } from './pg-store';
@@ -82,13 +87,16 @@ import { logger } from '../logger';
 import {
   PgJsonb,
   PgSqlClient,
+  batchIterate,
   connectPostgres,
   isProdEnv,
+  isTestEnv,
   runMigrations,
 } from '@hirosystems/api-toolkit';
 import { PgServer, getConnectionArgs, getConnectionConfig } from './connection';
 
 const MIGRATIONS_TABLE = 'pgmigrations';
+const INSERT_BATCH_SIZE = 500;
 
 class MicroblockGapError extends Error {
   constructor(message: string) {
@@ -167,112 +175,30 @@ export class PgWriteStore extends PgStore {
   async update(data: DataStoreBlockUpdateData): Promise<void> {
     let garbageCollectedMempoolTxs: string[] = [];
     let batchedTxData: DataStoreTxEventData[] = [];
-    const deployedSmartContracts: DbSmartContract[] = [];
-    const contractLogEvents: DbSmartContractEvent[] = [];
 
     await this.sqlWriteTransaction(async sql => {
       const chainTip = await this.getChainTip();
       await this.handleReorg(sql, data.block, chainTip.block_height);
-      // If the incoming block is not of greater height than current chain tip, then store data as non-canonical.
+      const isCanonical = data.block.block_height > chainTip.block_height;
       if (!isCanonical) {
-        data.block = { ...data.block, canonical: false };
-        data.microblocks = data.microblocks.map(mb => ({ ...mb, canonical: false }));
-        data.txs = data.txs.map(tx => ({
-          tx: { ...tx.tx, canonical: false },
-          stxLockEvents: tx.stxLockEvents.map(e => ({ ...e, canonical: false })),
-          stxEvents: tx.stxEvents.map(e => ({ ...e, canonical: false })),
-          ftEvents: tx.ftEvents.map(e => ({ ...e, canonical: false })),
-          nftEvents: tx.nftEvents.map(e => ({ ...e, canonical: false })),
-          contractLogEvents: tx.contractLogEvents.map(e => ({ ...e, canonical: false })),
-          smartContracts: tx.smartContracts.map(e => ({ ...e, canonical: false })),
-          names: tx.names.map(e => ({ ...e, canonical: false })),
-          namespaces: tx.namespaces.map(e => ({ ...e, canonical: false })),
-          pox2Events: tx.pox2Events.map(e => ({ ...e, canonical: false })),
-          pox3Events: tx.pox3Events.map(e => ({ ...e, canonical: false })),
-          pox4Events: tx.pox4Events.map(e => ({ ...e, canonical: false })),
-        }));
-        data.minerRewards = data.minerRewards.map(mr => ({ ...mr, canonical: false }));
+        markBlockUpdateDataAsNonCanonical(data);
       } else {
-        // When storing newly mined canonical txs, remove them from the mempool table.
-        const candidateTxIds = data.txs.map(d => d.tx.tx_id);
-        const removedTxsResult = await this.pruneMempoolTxs(sql, candidateTxIds);
-        if (removedTxsResult.removedTxs.length > 0) {
+        const txIds = data.txs.map(d => d.tx.tx_id);
+        const pruneRes = await this.pruneMempoolTxs(sql, txIds);
+        if (pruneRes.removedTxs.length > 0)
           logger.debug(
-            `Removed ${removedTxsResult.removedTxs.length} txs from mempool table during new block ingestion`
+            `Removed ${pruneRes.removedTxs.length} txs from mempool table during new block ingestion`
           );
-        }
       }
+      setTotalBlockUpdateDataExecutionCost(data);
 
-      //calculate total execution cost of the block
-      const totalCost = data.txs.reduce(
-        (previousValue, currentValue) => {
-          const {
-            execution_cost_read_count,
-            execution_cost_read_length,
-            execution_cost_runtime,
-            execution_cost_write_count,
-            execution_cost_write_length,
-          } = previousValue;
-
-          return {
-            execution_cost_read_count:
-              execution_cost_read_count + currentValue.tx.execution_cost_read_count,
-            execution_cost_read_length:
-              execution_cost_read_length + currentValue.tx.execution_cost_read_length,
-            execution_cost_runtime: execution_cost_runtime + currentValue.tx.execution_cost_runtime,
-            execution_cost_write_count:
-              execution_cost_write_count + currentValue.tx.execution_cost_write_count,
-            execution_cost_write_length:
-              execution_cost_write_length + currentValue.tx.execution_cost_write_length,
-          };
-        },
-        {
-          execution_cost_read_count: 0,
-          execution_cost_read_length: 0,
-          execution_cost_runtime: 0,
-          execution_cost_write_count: 0,
-          execution_cost_write_length: 0,
-        }
-      );
-
-      data.block.execution_cost_read_count = totalCost.execution_cost_read_count;
-      data.block.execution_cost_read_length = totalCost.execution_cost_read_length;
-      data.block.execution_cost_runtime = totalCost.execution_cost_runtime;
-      data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
-      data.block.execution_cost_write_length = totalCost.execution_cost_write_length;
-
-      batchedTxData = data.txs;
-
-      // Find microblocks that weren't already inserted via the unconfirmed microblock event.
-      // This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
-      if (data.microblocks.length > 0) {
-        const existingMicroblocksQuery = await sql<{ microblock_hash: string }[]>`
-          SELECT microblock_hash
-          FROM microblocks
-          WHERE parent_index_block_hash = ${data.block.parent_index_block_hash}
-            AND microblock_hash IN ${sql(data.microblocks.map(mb => mb.microblock_hash))}
-        `;
-        const existingMicroblockHashes = new Set(
-          existingMicroblocksQuery.map(r => r.microblock_hash)
-        );
-
-        const missingMicroblocks = data.microblocks.filter(
-          mb => !existingMicroblockHashes.has(mb.microblock_hash)
-        );
-        if (missingMicroblocks.length > 0) {
-          const missingMicroblockHashes = new Set(missingMicroblocks.map(mb => mb.microblock_hash));
-          const missingTxs = data.txs.filter(entry =>
-            missingMicroblockHashes.has(entry.tx.microblock_hash)
-          );
-          await this.insertMicroblockData(sql, missingMicroblocks, missingTxs);
-
-          // Clear already inserted microblock txs from the anchor-block update data to avoid duplicate inserts.
-          batchedTxData = batchedTxData.filter(entry => {
-            return !missingMicroblockHashes.has(entry.tx.microblock_hash);
-          });
-        }
-      }
+      // Insert microblocks, if any. Clear already inserted microblock txs from the anchor-block
+      // update data to avoid duplicate inserts.
+      const insertedMicroblockHashes = await this.insertMicroblocksFromBlockUpdate(sql, data);
+      batchedTxData = data.txs.filter(entry => {
+        return !insertedMicroblockHashes.has(entry.tx.microblock_hash);
      });
 
       // When processing an immediately-non-canonical block, do not orphan any possibly existing microblocks
       // which may still be considered canonical by the canonical block at this height.
@@ -308,31 +234,8 @@
           const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
           return !matchingTx;
         });
-      }
 
-      if (isCanonical && data.pox_v1_unlock_height !== undefined) {
-        // update the pox_state.pox_v1_unlock_height singleton
-        await sql`
-          UPDATE pox_state
-          SET pox_v1_unlock_height = ${data.pox_v1_unlock_height}
-          WHERE pox_v1_unlock_height != ${data.pox_v1_unlock_height}
-        `;
-      }
-      if (isCanonical && data.pox_v2_unlock_height !== undefined) {
-        // update the pox_state.pox_v2_unlock_height singleton
-        await sql`
-          UPDATE pox_state
-          SET pox_v2_unlock_height = ${data.pox_v2_unlock_height}
-          WHERE pox_v2_unlock_height != ${data.pox_v2_unlock_height}
-        `;
-      }
-      if (isCanonical && data.pox_v3_unlock_height !== undefined) {
-        // update the pox_state.pox_v3_unlock_height singleton
-        await sql`
-          UPDATE pox_state
-          SET pox_v3_unlock_height = ${data.pox_v3_unlock_height}
-          WHERE pox_v3_unlock_height != ${data.pox_v3_unlock_height}
-        `;
+        await this.updatePoxStateUnlockHeight(sql, data);
       }
 
       // When receiving first block, check if "block 0" boot data was received,
@@ -344,50 +247,22 @@
           await this.fixBlockZeroData(sql, data.block);
         }
       }
-
-      // TODO(mb): sanity tests on tx_index on batchedTxData, re-normalize if necessary
-
-      // TODO(mb): copy the batchedTxData to outside the sql transaction fn so they can be emitted in txUpdate event below
-
-      const blocksUpdated = await this.updateBlock(sql, data.block);
-      if (blocksUpdated !== 0) {
-        for (const minerRewards of data.minerRewards) {
-          await this.updateMinerReward(sql, minerRewards);
-        }
+      if ((await this.updateBlock(sql, data.block)) !== 0) {
+        await this.updateMinerRewards(sql, data.minerRewards);
         for (const entry of batchedTxData) {
           await this.updateTx(sql, entry.tx);
-          await this.updateBatchStxEvents(sql, entry.tx, entry.stxEvents);
+          await this.updateStxEvents(sql, entry.tx, entry.stxEvents);
           await this.updatePrincipalStxTxs(sql, entry.tx, entry.stxEvents);
-          contractLogEvents.push(...entry.contractLogEvents);
-          await this.updateBatchSmartContractEvent(sql, entry.tx, entry.contractLogEvents);
-          for (const pox2Event of entry.pox2Events) {
-            await this.updatePoxSyntheticEvent(sql, entry.tx, 'pox2_events', pox2Event);
-          }
-          for (const pox3Event of entry.pox3Events) {
-            await this.updatePoxSyntheticEvent(sql, entry.tx, 'pox3_events', pox3Event);
-          }
-          for (const pox4Event of entry.pox4Events) {
-            await this.updatePoxSyntheticEvent(sql, entry.tx, 'pox4_events', pox4Event);
-          }
-          for (const stxLockEvent of entry.stxLockEvents) {
-            await this.updateStxLockEvent(sql, entry.tx, stxLockEvent);
-          }
-          for (const ftEvent of entry.ftEvents) {
-            await this.updateFtEvent(sql, entry.tx, ftEvent);
-          }
-          for (const nftEvent of entry.nftEvents) {
-            await this.updateNftEvent(sql, entry.tx, nftEvent, false);
-          }
-          deployedSmartContracts.push(...entry.smartContracts);
-          for (const smartContract of entry.smartContracts) {
-            await this.updateSmartContract(sql, entry.tx, smartContract);
-          }
-          for (const namespace of entry.namespaces) {
-            await this.updateNamespaces(sql, entry.tx, namespace);
-          }
-          for (const bnsName of entry.names) {
-            await this.updateNames(sql, entry.tx, bnsName);
-          }
+          await this.updateSmartContractEvents(sql, entry.tx, entry.contractLogEvents);
+          await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox2_events', entry.pox2Events);
+          await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox3_events', entry.pox3Events);
+          await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox4_events', entry.pox4Events);
+          await this.updateStxLockEvents(sql, entry.tx, entry.stxLockEvents);
+          await this.updateFtEvents(sql, entry.tx, entry.ftEvents);
+          await this.updateNftEvents(sql, entry.tx, entry.nftEvents);
+          await this.updateSmartContracts(sql, entry.tx, entry.smartContracts);
+          await this.updateNamespaces(sql, entry.tx, entry.namespaces);
+          await this.updateNames(sql, entry.tx, entry.names);
         }
         const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
         if (mempoolGarbageResults.deletedTxs.length > 0) {
@@ -418,63 +293,136 @@
           tx_count = (SELECT tx_count FROM new_tx_count),
           tx_count_unanchored = (SELECT tx_count FROM new_tx_count)
       `;
+
+      await this.refreshMaterializedView('mempool_digest');
     });
 
     // Do we have an IBD height defined in ENV? If so, check if this block update reached it.
     const ibdHeight = getIbdBlockHeight();
     this.isIbdBlockHeightReached = ibdHeight ? data.block.block_height > ibdHeight : true;
 
+    // Send block updates but don't block current execution unless we're testing.
+    if (isTestEnv) await this.sendBlockNotifications({ data, garbageCollectedMempoolTxs });
+    else void this.sendBlockNotifications({ data, garbageCollectedMempoolTxs });
+  }
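Annotation: `update()` above now fires notifications without awaiting them in production, but awaits them under test so assertions observe a consistent state. A self-contained sketch of that pattern (the `isTestEnv` stand-in below is hypothetical; the diff imports the real flag from `@hirosystems/api-toolkit`):

```typescript
// Await-in-test / fire-and-forget-in-prod pattern used by update() above.
// `isTestEnv` here is a stand-in for the imported toolkit flag.
const isTestEnv = process.env.NODE_ENV === 'test';

async function sendNotifications(blockHash: string): Promise<void> {
  // ...emit Postgres NOTIFY messages for the block, its txs, and events...
  console.log(`notify: ${blockHash}`);
}

async function ingestBlock(blockHash: string): Promise<void> {
  // ...write the block inside a transaction, then notify...
  if (isTestEnv) {
    await sendNotifications(blockHash); // deterministic ordering for tests
  } else {
    void sendNotifications(blockHash); // don't hold up ingestion on delivery
  }
}

void ingestBlock('0xfeed');
```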
 
-    await this.refreshMaterializedView('mempool_digest');
-
-    // Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
-    // event replay of the v1 blockchain.
-    if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) {
-      await this.notifier.sendBlock({ blockHash: data.block.block_hash });
-      for (const tx of data.txs) {
-        await this.notifier.sendTx({ txId: tx.tx.tx_id });
-      }
-      for (const txId of garbageCollectedMempoolTxs) {
-        await this.notifier.sendTx({ txId: txId });
-      }
-      for (const smartContract of deployedSmartContracts) {
+  /**
+   * Send block update via Postgres NOTIFY
+   * @param args - Block data
+   */
+  private async sendBlockNotifications(args: {
+    data: DataStoreBlockUpdateData;
+    garbageCollectedMempoolTxs: string[];
+  }): Promise<void> {
+    // Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block
+    // is the event replay of the v1 blockchain.
+    if (!this.notifier || !(args.data.block.block_height > 1 || !isProdEnv)) return;
+    await this.notifier.sendBlock({ blockHash: args.data.block.block_hash });
+    for (const tx of args.data.txs) {
+      await this.notifier.sendTx({ txId: tx.tx.tx_id });
+      for (const smartContract of tx.smartContracts) {
         await this.notifier.sendSmartContract({
           contractId: smartContract.contract_id,
         });
       }
-      for (const logEvent of contractLogEvents) {
+      for (const logEvent of tx.contractLogEvents) {
         await this.notifier.sendSmartContractLog({
           txId: logEvent.tx_id,
           eventIndex: logEvent.event_index,
         });
       }
-      await this.emitAddressTxUpdates(data.txs);
-      for (const nftEvent of data.txs.map(tx => tx.nftEvents).flat()) {
-        await this.notifier.sendNftEvent({
-          txId: nftEvent.tx_id,
-          eventIndex: nftEvent.event_index,
-        });
-      }
+    }
+    for (const txId of args.garbageCollectedMempoolTxs) {
+      await this.notifier.sendTx({ txId: txId });
+    }
+    await this.emitAddressTxUpdates(args.data.txs);
+    for (const nftEvent of args.data.txs.map(tx => tx.nftEvents).flat()) {
+      await this.notifier.sendNftEvent({
+        txId: nftEvent.tx_id,
+        eventIndex: nftEvent.event_index,
+      });
     }
   }
 
-  async updateMinerReward(sql: PgSqlClient, minerReward: DbMinerReward): Promise<number> {
-    const values: MinerRewardInsertValues = {
-      block_hash: minerReward.block_hash,
-      index_block_hash: minerReward.index_block_hash,
-      from_index_block_hash: minerReward.from_index_block_hash,
-      mature_block_height: minerReward.mature_block_height,
-      canonical: minerReward.canonical,
-      recipient: minerReward.recipient,
-      // If `miner_address` is null then it means pre-Stacks2.1 data, and the `recipient` can be accurately used
-      miner_address: minerReward.miner_address ?? minerReward.recipient,
-      coinbase_amount: minerReward.coinbase_amount.toString(),
-      tx_fees_anchored: minerReward.tx_fees_anchored.toString(),
-      tx_fees_streamed_confirmed: minerReward.tx_fees_streamed_confirmed.toString(),
-      tx_fees_streamed_produced: minerReward.tx_fees_streamed_produced.toString(),
-    };
-    const result = await sql`
-      INSERT INTO miner_rewards ${sql(values)}
+  /**
+   * Find and insert microblocks that weren't already inserted via the unconfirmed `/new_microblock`
+   * event. This happens when a stacks-node is syncing and receives confirmed microblocks with their
+   * anchor block at the same time.
+   * @param sql - SQL client
+   * @param data - Block data to insert
+   * @returns Set of microblock hashes that were inserted in this update
+   */
+  private async insertMicroblocksFromBlockUpdate(
+    sql: PgSqlClient,
+    data: DataStoreBlockUpdateData
+  ): Promise<Set<string>> {
+    if (data.microblocks.length == 0) return new Set();
+    const existingMicroblocksQuery = await sql<{ microblock_hash: string }[]>`
+      SELECT DISTINCT microblock_hash
+      FROM microblocks
+      WHERE parent_index_block_hash = ${data.block.parent_index_block_hash}
+        AND microblock_hash IN ${sql(data.microblocks.map(mb => mb.microblock_hash))}
     `;
-    return result.count;
+    const existingHashes = existingMicroblocksQuery.map(i => i.microblock_hash);
+    const missingMicroblocks = data.microblocks.filter(
+      mb => !existingHashes.includes(mb.microblock_hash)
+    );
+    if (missingMicroblocks.length > 0) {
+      const missingMicroblockHashes = new Set(missingMicroblocks.map(mb => mb.microblock_hash));
+      const missingTxs = data.txs.filter(entry =>
+        missingMicroblockHashes.has(entry.tx.microblock_hash)
+      );
+      await this.insertMicroblockData(sql, missingMicroblocks, missingTxs);
+      return missingMicroblockHashes;
+    }
+    return new Set();
+  }
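Annotation: `insertMicroblocksFromBlockUpdate` above boils down to a set difference followed by a filter. A condensed, self-contained sketch of that logic with made-up hashes:

```typescript
// Set-difference sketch of the microblock dedup above. Hashes are hypothetical.
const received = ['0xaa', '0xbb', '0xcc']; // microblocks delivered with the anchor block
const alreadyStored = new Set(['0xbb']);   // inserted earlier via /new_microblock events

// Only the missing microblocks get inserted...
const missing = received.filter(h => !alreadyStored.has(h)); // ['0xaa', '0xcc']

// ...and txs belonging to a just-inserted microblock are dropped from the
// anchor-block batch so they are not written twice.
const txs = [
  { tx_id: '0x01', microblock_hash: '0xaa' },
  { tx_id: '0x02', microblock_hash: '0xbb' },
];
const missingSet = new Set(missing);
const batched = txs.filter(t => !missingSet.has(t.microblock_hash));
console.log(batched); // keeps only tx '0x02'
```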
 
+  private async updatePoxStateUnlockHeight(sql: PgSqlClient, data: DataStoreBlockUpdateData) {
+    if (data.pox_v1_unlock_height !== undefined) {
+      // update the pox_state.pox_v1_unlock_height singleton
+      await sql`
+        UPDATE pox_state
+        SET pox_v1_unlock_height = ${data.pox_v1_unlock_height}
+        WHERE pox_v1_unlock_height != ${data.pox_v1_unlock_height}
+      `;
+    }
+    if (data.pox_v2_unlock_height !== undefined) {
+      // update the pox_state.pox_v2_unlock_height singleton
+      await sql`
+        UPDATE pox_state
+        SET pox_v2_unlock_height = ${data.pox_v2_unlock_height}
+        WHERE pox_v2_unlock_height != ${data.pox_v2_unlock_height}
+      `;
+    }
+    if (data.pox_v3_unlock_height !== undefined) {
+      // update the pox_state.pox_v3_unlock_height singleton
+      await sql`
+        UPDATE pox_state
+        SET pox_v3_unlock_height = ${data.pox_v3_unlock_height}
+        WHERE pox_v3_unlock_height != ${data.pox_v3_unlock_height}
+      `;
+    }
+  }
+
+  async updateMinerRewards(sql: PgSqlClient, minerRewards: DbMinerReward[]): Promise<void> {
+    for (const batch of batchIterate(minerRewards, INSERT_BATCH_SIZE)) {
+      const values: MinerRewardInsertValues[] = batch.map(minerReward => ({
+        block_hash: minerReward.block_hash,
+        index_block_hash: minerReward.index_block_hash,
+        from_index_block_hash: minerReward.from_index_block_hash,
+        mature_block_height: minerReward.mature_block_height,
+        canonical: minerReward.canonical,
+        recipient: minerReward.recipient,
+        // If `miner_address` is null then it means pre-Stacks2.1 data, and the `recipient` can be accurately used
+        miner_address: minerReward.miner_address ?? minerReward.recipient,
+        coinbase_amount: minerReward.coinbase_amount.toString(),
+        tx_fees_anchored: minerReward.tx_fees_anchored.toString(),
+        tx_fees_streamed_confirmed: minerReward.tx_fees_streamed_confirmed.toString(),
+        tx_fees_streamed_produced: minerReward.tx_fees_streamed_produced.toString(),
+      }));
+      await sql`
+        INSERT INTO miner_rewards ${sql(values)}
+      `;
+    }
   }
 
   async updateBlock(sql: PgSqlClient, block: DbBlock): Promise<number> {
@@ -828,150 +776,156 @@
     }
     logger.info('Updated block zero boot data', tablesUpdates);
   }
 
-  async updatePoxSyntheticEvent(
+  async updatePoxSyntheticEvents(
     sql: PgSqlClient,
     tx: DbTx,
     poxTable: PoxSyntheticEventTable,
-    event: DbPoxSyntheticEvent
+    events: DbPoxSyntheticEvent[]
   ) {
-    const values: PoxSyntheticEventInsertValues = {
-      event_index: event.event_index,
-      tx_id: event.tx_id,
-      tx_index: event.tx_index,
-      block_height: event.block_height,
-      index_block_hash: tx.index_block_hash,
-      parent_index_block_hash: tx.parent_index_block_hash,
-      microblock_hash: tx.microblock_hash,
-      microblock_sequence: tx.microblock_sequence,
-      microblock_canonical: tx.microblock_canonical,
-      canonical: event.canonical,
-      stacker: event.stacker,
-      locked: event.locked.toString(),
-      balance: event.balance.toString(),
-      burnchain_unlock_height: event.burnchain_unlock_height.toString(),
-      name: event.name,
-      pox_addr: event.pox_addr,
-      pox_addr_raw: event.pox_addr_raw,
-      first_cycle_locked: null,
-      first_unlocked_cycle: null,
-      delegate_to: null,
-      lock_period: null,
-      lock_amount: null,
-      start_burn_height: null,
-      unlock_burn_height: null,
-      delegator: null,
-      increase_by: null,
-      total_locked: null,
-      extend_count: null,
-      reward_cycle: null,
-      amount_ustx: null,
-    };
-    // Set event-specific columns
-    switch (event.name) {
-      case SyntheticPoxEventName.HandleUnlock: {
-        values.first_cycle_locked = event.data.first_cycle_locked.toString();
-        values.first_unlocked_cycle = event.data.first_unlocked_cycle.toString();
-        break;
-      }
-      case SyntheticPoxEventName.StackStx: {
-        values.lock_period = event.data.lock_period.toString();
-        values.lock_amount = event.data.lock_amount.toString();
-        values.start_burn_height = event.data.start_burn_height.toString();
-        values.unlock_burn_height = event.data.unlock_burn_height.toString();
-        break;
-      }
-      case SyntheticPoxEventName.StackIncrease: {
-        values.increase_by = event.data.increase_by.toString();
-        values.total_locked = event.data.total_locked.toString();
-        break;
-      }
-      case SyntheticPoxEventName.StackExtend: {
-        values.extend_count = event.data.extend_count.toString();
-        values.unlock_burn_height = event.data.unlock_burn_height.toString();
-        break;
-      }
-      case SyntheticPoxEventName.DelegateStx: {
-        values.amount_ustx = event.data.amount_ustx.toString();
-        values.delegate_to = event.data.delegate_to;
-        values.unlock_burn_height = event.data.unlock_burn_height?.toString() ?? null;
-        break;
-      }
-      case SyntheticPoxEventName.DelegateStackStx: {
-        values.lock_period = event.data.lock_period.toString();
-        values.lock_amount = event.data.lock_amount.toString();
-        values.start_burn_height = event.data.start_burn_height.toString();
-        values.unlock_burn_height = event.data.unlock_burn_height.toString();
-        values.delegator = event.data.delegator;
-        break;
-      }
-      case SyntheticPoxEventName.DelegateStackIncrease: {
-        values.increase_by = event.data.increase_by.toString();
-        values.total_locked = event.data.total_locked.toString();
-        values.delegator = event.data.delegator;
-        break;
-      }
-      case SyntheticPoxEventName.DelegateStackExtend: {
-        values.extend_count = event.data.extend_count.toString();
-        values.unlock_burn_height = event.data.unlock_burn_height.toString();
-        values.delegator = event.data.delegator;
-        break;
-      }
-      case SyntheticPoxEventName.StackAggregationCommit: {
-        values.reward_cycle = event.data.reward_cycle.toString();
-        values.amount_ustx = event.data.amount_ustx.toString();
-        break;
-      }
-      case SyntheticPoxEventName.StackAggregationCommitIndexed: {
-        values.reward_cycle = event.data.reward_cycle.toString();
-        values.amount_ustx = event.data.amount_ustx.toString();
-        break;
-      }
-      case SyntheticPoxEventName.StackAggregationIncrease: {
-        values.reward_cycle = event.data.reward_cycle.toString();
-        values.amount_ustx = event.data.amount_ustx.toString();
-        break;
-      }
-      case SyntheticPoxEventName.RevokeDelegateStx: {
-        values.amount_ustx = event.data.amount_ustx.toString();
-        values.delegate_to = event.data.delegate_to;
-        break;
-      }
-      default: {
-        throw new Error(
-          `Unexpected Pox synthetic event name: ${(event as DbPoxSyntheticEvent).name}`
-        );
-      }
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values = batch.map(event => {
+        const value: PoxSyntheticEventInsertValues = {
+          event_index: event.event_index,
+          tx_id: event.tx_id,
+          tx_index: event.tx_index,
+          block_height: event.block_height,
+          index_block_hash: tx.index_block_hash,
+          parent_index_block_hash: tx.parent_index_block_hash,
+          microblock_hash: tx.microblock_hash,
+          microblock_sequence: tx.microblock_sequence,
+          microblock_canonical: tx.microblock_canonical,
+          canonical: event.canonical,
+          stacker: event.stacker,
+          locked: event.locked.toString(),
+          balance: event.balance.toString(),
+          burnchain_unlock_height: event.burnchain_unlock_height.toString(),
+          name: event.name,
+          pox_addr: event.pox_addr,
+          pox_addr_raw: event.pox_addr_raw,
+          first_cycle_locked: null,
+          first_unlocked_cycle: null,
+          delegate_to: null,
+          lock_period: null,
+          lock_amount: null,
+          start_burn_height: null,
+          unlock_burn_height: null,
+          delegator: null,
+          increase_by: null,
+          total_locked: null,
+          extend_count: null,
+          reward_cycle: null,
+          amount_ustx: null,
+        };
+        // Set event-specific columns
+        switch (event.name) {
+          case SyntheticPoxEventName.HandleUnlock: {
+            value.first_cycle_locked = event.data.first_cycle_locked.toString();
+            value.first_unlocked_cycle = event.data.first_unlocked_cycle.toString();
+            break;
+          }
+          case SyntheticPoxEventName.StackStx: {
+            value.lock_period = event.data.lock_period.toString();
+            value.lock_amount = event.data.lock_amount.toString();
+            value.start_burn_height = event.data.start_burn_height.toString();
+            value.unlock_burn_height = event.data.unlock_burn_height.toString();
+            break;
+          }
+          case SyntheticPoxEventName.StackIncrease: {
+            value.increase_by = event.data.increase_by.toString();
+            value.total_locked = event.data.total_locked.toString();
+            break;
+          }
+          case SyntheticPoxEventName.StackExtend: {
+            value.extend_count = event.data.extend_count.toString();
+            value.unlock_burn_height = event.data.unlock_burn_height.toString();
+            break;
+          }
+          case SyntheticPoxEventName.DelegateStx: {
+            value.amount_ustx = event.data.amount_ustx.toString();
+            value.delegate_to = event.data.delegate_to;
+            value.unlock_burn_height = event.data.unlock_burn_height?.toString() ?? null;
+            break;
+          }
+          case SyntheticPoxEventName.DelegateStackStx: {
+            value.lock_period = event.data.lock_period.toString();
+            value.lock_amount = event.data.lock_amount.toString();
+            value.start_burn_height = event.data.start_burn_height.toString();
+            value.unlock_burn_height = event.data.unlock_burn_height.toString();
+            value.delegator = event.data.delegator;
+            break;
+          }
+          case SyntheticPoxEventName.DelegateStackIncrease: {
+            value.increase_by = event.data.increase_by.toString();
+            value.total_locked = event.data.total_locked.toString();
+            value.delegator = event.data.delegator;
+            break;
+          }
+          case SyntheticPoxEventName.DelegateStackExtend: {
+            value.extend_count = event.data.extend_count.toString();
+            value.unlock_burn_height = event.data.unlock_burn_height.toString();
+            value.delegator = event.data.delegator;
+            break;
+          }
+          case SyntheticPoxEventName.StackAggregationCommit: {
+            value.reward_cycle = event.data.reward_cycle.toString();
+            value.amount_ustx = event.data.amount_ustx.toString();
+            break;
+          }
+          case SyntheticPoxEventName.StackAggregationCommitIndexed: {
+            value.reward_cycle = event.data.reward_cycle.toString();
+            value.amount_ustx = event.data.amount_ustx.toString();
+            break;
+          }
+          case SyntheticPoxEventName.StackAggregationIncrease: {
+            value.reward_cycle = event.data.reward_cycle.toString();
+            value.amount_ustx = event.data.amount_ustx.toString();
+            break;
+          }
+          case SyntheticPoxEventName.RevokeDelegateStx: {
+            value.amount_ustx = event.data.amount_ustx.toString();
+            value.delegate_to = event.data.delegate_to;
+            break;
+          }
+          default: {
+            throw new Error(
+              `Unexpected Pox synthetic event name: ${(event as DbPoxSyntheticEvent).name}`
+            );
+          }
+        }
+        return value;
+      });
+      await sql`
+        INSERT INTO ${sql(poxTable)} ${sql(values)}
+      `;
     }
-    await sql`
-      INSERT INTO ${sql(poxTable)} ${sql(values)}
-    `;
   }
 
-  async updateStxLockEvent(sql: PgSqlClient, tx: DbTx, event: DbStxLockEvent) {
-    const values: StxLockEventInsertValues = {
-      event_index: event.event_index,
-      tx_id: event.tx_id,
-      tx_index: event.tx_index,
-      block_height: event.block_height,
-      index_block_hash: tx.index_block_hash,
-      parent_index_block_hash: tx.parent_index_block_hash,
-      microblock_hash: tx.microblock_hash,
-      microblock_sequence: tx.microblock_sequence,
-      microblock_canonical: tx.microblock_canonical,
-      canonical: event.canonical,
-      locked_amount: event.locked_amount.toString(),
-      unlock_height: event.unlock_height,
-      locked_address: event.locked_address,
-      contract_name: event.contract_name,
-    };
-    await sql`
-      INSERT INTO stx_lock_events ${sql(values)}
-    `;
+  async updateStxLockEvents(sql: PgSqlClient, tx: DbTx, events: DbStxLockEvent[]) {
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values: StxLockEventInsertValues[] = batch.map(event => ({
+        event_index: event.event_index,
+        tx_id: event.tx_id,
+        tx_index: event.tx_index,
+        block_height: event.block_height,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+        canonical: event.canonical,
+        locked_amount: event.locked_amount.toString(),
+        unlock_height: event.unlock_height,
+        locked_address: event.locked_address,
+        contract_name: event.contract_name,
+      }));
+      await sql`
+        INSERT INTO stx_lock_events ${sql(values)}
+      `;
    }
   }
 
-  async updateBatchStxEvents(sql: PgSqlClient, tx: DbTx, events: DbStxEvent[]) {
-    const batchSize = 500; // (matt) benchmark: 21283 per second (15 seconds)
-    for (const eventBatch of batchIterate(events, batchSize)) {
+  async updateStxEvents(sql: PgSqlClient, tx: DbTx, events: DbStxEvent[]) {
+    for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) {
       const values: StxEventInsertValues[] = eventBatch.map(event => ({
         event_index: event.event_index,
         tx_id: event.tx_id,
@@ -1034,8 +988,7 @@
       ].filter((p): p is string => !!p) // Remove undefined
     );
     // Insert stx_event data
-    const batchSize = 500;
-    for (const eventBatch of batchIterate(events, batchSize)) {
+    for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) {
       const principals: string[] = [];
       for (const event of eventBatch) {
         if (event.sender) principals.push(event.sender);
@@ -1183,93 +1136,128 @@
     `;
   }
 
-  async updateFtEvent(sql: PgSqlClient, tx: DbTx, event: DbFtEvent) {
-    const values: FtEventInsertValues = {
-      event_index: event.event_index,
-      tx_id: event.tx_id,
-      tx_index: event.tx_index,
-      block_height: event.block_height,
-      index_block_hash: tx.index_block_hash,
-      parent_index_block_hash: tx.parent_index_block_hash,
-      microblock_hash: tx.microblock_hash,
-      microblock_sequence: tx.microblock_sequence,
-      microblock_canonical: tx.microblock_canonical,
-      canonical: event.canonical,
-      asset_event_type_id: event.asset_event_type_id,
-      sender: event.sender ?? null,
-      recipient: event.recipient ?? null,
-      asset_identifier: event.asset_identifier,
-      amount: event.amount.toString(),
-    };
-    await sql`
-      INSERT INTO ft_events ${sql(values)}
-    `;
-  }
-
-  async updateNftEvent(sql: PgSqlClient, tx: DbTx, event: DbNftEvent, microblock: boolean) {
-    const custody: NftCustodyInsertValues = {
-      asset_identifier: event.asset_identifier,
-      value: event.value,
-      tx_id: event.tx_id,
-      index_block_hash: tx.index_block_hash,
-      parent_index_block_hash: tx.parent_index_block_hash,
-      microblock_hash: tx.microblock_hash,
-      microblock_sequence: tx.microblock_sequence,
-      recipient: event.recipient ?? null,
-      event_index: event.event_index,
-      tx_index: event.tx_index,
-      block_height: event.block_height,
-    };
-    const values: NftEventInsertValues = {
-      ...custody,
-      microblock_canonical: tx.microblock_canonical,
-      canonical: event.canonical,
-      sender: event.sender ?? null,
-      asset_event_type_id: event.asset_event_type_id,
-    };
-    await sql`
-      INSERT INTO nft_events ${sql(values)}
-    `;
-    if (tx.canonical && tx.microblock_canonical && event.canonical) {
-      const table = microblock ? sql`nft_custody_unanchored` : sql`nft_custody`;
+  async updateFtEvents(sql: PgSqlClient, tx: DbTx, events: DbFtEvent[]) {
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const values: FtEventInsertValues[] = batch.map(event => ({
+        event_index: event.event_index,
+        tx_id: event.tx_id,
+        tx_index: event.tx_index,
+        block_height: event.block_height,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+        canonical: event.canonical,
+        asset_event_type_id: event.asset_event_type_id,
+        sender: event.sender ?? null,
+        recipient: event.recipient ?? null,
+        asset_identifier: event.asset_identifier,
+        amount: event.amount.toString(),
+      }));
       await sql`
-        INSERT INTO ${table} ${sql(custody)}
-        ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET
-          tx_id = EXCLUDED.tx_id,
-          index_block_hash = EXCLUDED.index_block_hash,
-          parent_index_block_hash = EXCLUDED.parent_index_block_hash,
-          microblock_hash = EXCLUDED.microblock_hash,
-          microblock_sequence = EXCLUDED.microblock_sequence,
-          recipient = EXCLUDED.recipient,
-          event_index = EXCLUDED.event_index,
-          tx_index = EXCLUDED.tx_index,
-          block_height = EXCLUDED.block_height
-        WHERE
-          (
-            EXCLUDED.block_height > ${table}.block_height
-          )
-          OR (
-            EXCLUDED.block_height = ${table}.block_height
-            AND EXCLUDED.microblock_sequence > ${table}.microblock_sequence
-          )
-          OR (
-            EXCLUDED.block_height = ${table}.block_height
-            AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
-            AND EXCLUDED.tx_index > ${table}.tx_index
-          )
-          OR (
-            EXCLUDED.block_height = ${table}.block_height
-            AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
-            AND EXCLUDED.tx_index = ${table}.tx_index
-            AND EXCLUDED.event_index > ${table}.event_index
-          )
+        INSERT INTO ft_events ${sql(values)}
       `;
     }
   }
 
-  async updateBatchSmartContractEvent(sql: PgSqlClient, tx: DbTx, events: DbSmartContractEvent[]) {
-    const batchSize = 500; // (matt) benchmark: 21283 per second (15 seconds)
-    for (const eventBatch of batchIterate(events, batchSize)) {
+  async updateNftEvents(
+    sql: PgSqlClient,
+    tx: DbTx,
+    events: DbNftEvent[],
+    microblock: boolean = false
+  ) {
+    for (const batch of batchIterate(events, INSERT_BATCH_SIZE)) {
+      const custodyInsertsMap = new Map<string, NftCustodyInsertValues>();
+      const nftEventInserts: NftEventInsertValues[] = [];
+      for (const event of batch) {
+        const custodyItem: NftCustodyInsertValues = {
+          asset_identifier: event.asset_identifier,
+          value: event.value,
+          tx_id: event.tx_id,
+          index_block_hash: tx.index_block_hash,
+          parent_index_block_hash: tx.parent_index_block_hash,
+          microblock_hash: tx.microblock_hash,
+          microblock_sequence: tx.microblock_sequence,
+          recipient: event.recipient ?? null,
+          event_index: event.event_index,
+          tx_index: event.tx_index,
+          block_height: event.block_height,
+        };
+        // Avoid duplicates on NFT custody inserts, because we could run into an `ON CONFLICT DO
+        // UPDATE command cannot affect row a second time` error otherwise.
+        const custodyKey = `${event.asset_identifier}_${event.value}`;
+        const currCustody = custodyInsertsMap.get(custodyKey);
+        if (currCustody) {
+          if (
+            custodyItem.block_height > currCustody.block_height ||
+            (custodyItem.block_height == currCustody.block_height &&
+              custodyItem.microblock_sequence > currCustody.microblock_sequence) ||
+            (custodyItem.block_height == currCustody.block_height &&
+              custodyItem.microblock_sequence == currCustody.microblock_sequence &&
+              custodyItem.tx_index > currCustody.tx_index) ||
+            (custodyItem.block_height == currCustody.block_height &&
+              custodyItem.microblock_sequence == currCustody.microblock_sequence &&
+              custodyItem.tx_index == currCustody.tx_index &&
+              custodyItem.event_index > currCustody.event_index)
+          ) {
+            custodyInsertsMap.set(custodyKey, custodyItem);
+          }
+        } else {
+          custodyInsertsMap.set(custodyKey, custodyItem);
+        }
+        const valuesItem: NftEventInsertValues = {
+          ...custodyItem,
+          microblock_canonical: tx.microblock_canonical,
+          canonical: event.canonical,
+          sender: event.sender ?? null,
+          asset_event_type_id: event.asset_event_type_id,
+        };
+        nftEventInserts.push(valuesItem);
+      }
+      await sql`
+        INSERT INTO nft_events ${sql(nftEventInserts)}
+      `;
+      if (tx.canonical && tx.microblock_canonical) {
+        const table = microblock ? sql`nft_custody_unanchored` : sql`nft_custody`;
+        await sql`
+          INSERT INTO ${table} ${sql(Array.from(custodyInsertsMap.values()))}
+          ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET
+            tx_id = EXCLUDED.tx_id,
+            index_block_hash = EXCLUDED.index_block_hash,
+            parent_index_block_hash = EXCLUDED.parent_index_block_hash,
+            microblock_hash = EXCLUDED.microblock_hash,
+            microblock_sequence = EXCLUDED.microblock_sequence,
+            recipient = EXCLUDED.recipient,
+            event_index = EXCLUDED.event_index,
+            tx_index = EXCLUDED.tx_index,
+            block_height = EXCLUDED.block_height
+          WHERE
+            (
+              EXCLUDED.block_height > ${table}.block_height
+            )
+            OR (
+              EXCLUDED.block_height = ${table}.block_height
+              AND EXCLUDED.microblock_sequence > ${table}.microblock_sequence
+            )
+            OR (
+              EXCLUDED.block_height = ${table}.block_height
+              AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
+              AND EXCLUDED.tx_index > ${table}.tx_index
+            )
+            OR (
+              EXCLUDED.block_height = ${table}.block_height
+              AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
+              AND EXCLUDED.tx_index = ${table}.tx_index
+              AND EXCLUDED.event_index > ${table}.event_index
+            )
        `;
      }
    }
  }
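Annotation: the chained conditions inside `updateNftEvents` above implement a lexicographic "is newer" test over `(block_height, microblock_sequence, tx_index, event_index)`. An equivalent helper, written out flat for readability (hypothetical; not part of this change):

```typescript
// Lexicographic "is newer" test equivalent to the chained conditions above:
// the later (block_height, microblock_sequence, tx_index, event_index) tuple wins.
interface EventPosition {
  block_height: number;
  microblock_sequence: number;
  tx_index: number;
  event_index: number;
}

function isNewer(a: EventPosition, b: EventPosition): boolean {
  if (a.block_height !== b.block_height) return a.block_height > b.block_height;
  if (a.microblock_sequence !== b.microblock_sequence) {
    return a.microblock_sequence > b.microblock_sequence;
  }
  if (a.tx_index !== b.tx_index) return a.tx_index > b.tx_index;
  return a.event_index > b.event_index;
}
```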
 
+  async updateSmartContractEvents(sql: PgSqlClient, tx: DbTx, events: DbSmartContractEvent[]) {
+    for (const eventBatch of batchIterate(events, INSERT_BATCH_SIZE)) {
       const values: SmartContractEventInsertValues[] = eventBatch.map(event => ({
         event_index: event.event_index,
         tx_id: event.tx_id,
@@ -1737,202 +1725,193 @@
     }
   }
 
-  async updateSmartContract(sql: PgSqlClient, tx: DbTx, smartContract: DbSmartContract) {
-    const values: SmartContractInsertValues = {
-      tx_id: smartContract.tx_id,
-      canonical: smartContract.canonical,
-      clarity_version: smartContract.clarity_version,
-      contract_id: smartContract.contract_id,
-      block_height: smartContract.block_height,
-      index_block_hash: tx.index_block_hash,
-      source_code: smartContract.source_code,
-      abi: smartContract.abi ? JSON.parse(smartContract.abi) ?? 'null' : 'null',
-      parent_index_block_hash: tx.parent_index_block_hash,
-      microblock_hash: tx.microblock_hash,
-      microblock_sequence: tx.microblock_sequence,
-      microblock_canonical: tx.microblock_canonical,
-    };
-    await sql`
-      INSERT INTO smart_contracts ${sql(values)}
-    `;
+  async updateSmartContracts(sql: PgSqlClient, tx: DbTx, smartContracts: DbSmartContract[]) {
+    for (const batch of batchIterate(smartContracts, INSERT_BATCH_SIZE)) {
+      const values: SmartContractInsertValues[] = batch.map(smartContract => ({
+        tx_id: smartContract.tx_id,
+        canonical: smartContract.canonical,
+        clarity_version: smartContract.clarity_version,
+        contract_id: smartContract.contract_id,
+        block_height: smartContract.block_height,
+        index_block_hash: tx.index_block_hash,
+        source_code: smartContract.source_code,
+        abi: smartContract.abi ? JSON.parse(smartContract.abi) ?? 'null' : 'null',
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+      }));
+      await sql`
+        INSERT INTO smart_contracts ${sql(values)}
+      `;
+    }
   }
 
-  async updateNames(
-    sql: PgSqlClient,
-    blockData: {
-      index_block_hash: string;
-      parent_index_block_hash: string;
-      microblock_hash: string;
-      microblock_sequence: number;
-      microblock_canonical: boolean;
-    },
-    bnsName: DbBnsName
-  ) {
-    const {
-      name,
-      address,
-      registered_at,
-      expire_block,
-      zonefile,
-      zonefile_hash,
-      namespace_id,
-      tx_id,
-      tx_index,
-      event_index,
-      status,
-      canonical,
-    } = bnsName;
-    // Try to figure out the name's expiration block based on its namespace's lifetime.
-    let expireBlock = expire_block;
-    const namespaceLifetime = await sql<{ lifetime: number }[]>`
-      SELECT lifetime
-      FROM namespaces
-      WHERE namespace_id = ${namespace_id}
-      AND canonical = true AND microblock_canonical = true
-      ORDER BY namespace_id, ready_block DESC, microblock_sequence DESC, tx_index DESC
-      LIMIT 1
-    `;
-    if (namespaceLifetime.length > 0) {
-      expireBlock = registered_at + namespaceLifetime[0].lifetime;
-    }
-    // If the name was transferred, keep the expiration from the last register/renewal we had (if
-    // any).
-    if (status === 'name-transfer') {
-      const prevExpiration = await sql<{ expire_block: number }[]>`
-        SELECT expire_block
-        FROM names
-        WHERE name = ${name}
-        AND canonical = TRUE AND microblock_canonical = TRUE
-        ORDER BY registered_at DESC, microblock_sequence DESC, tx_index DESC
+  async updateNames(sql: PgSqlClient, tx: DataStoreBnsBlockTxData, names: DbBnsName[]) {
+    // TODO: Move these to CTE queries for optimization
+    for (const bnsName of names) {
+      const {
+        name,
+        address,
+        registered_at,
+        expire_block,
+        zonefile,
+        zonefile_hash,
+        namespace_id,
+        tx_id,
+        tx_index,
+        event_index,
+        status,
+        canonical,
+      } = bnsName;
+      // Try to figure out the name's expiration block based on its namespace's lifetime.
+      let expireBlock = expire_block;
+      const namespaceLifetime = await sql<{ lifetime: number }[]>`
+        SELECT lifetime
+        FROM namespaces
+        WHERE namespace_id = ${namespace_id}
+        AND canonical = true AND microblock_canonical = true
+        ORDER BY namespace_id, ready_block DESC, microblock_sequence DESC, tx_index DESC
         LIMIT 1
       `;
-      if (prevExpiration.length > 0) {
-        expireBlock = prevExpiration[0].expire_block;
+      if (namespaceLifetime.length > 0) {
+        expireBlock = registered_at + namespaceLifetime[0].lifetime;
       }
-    }
-    // If we didn't receive a zonefile, keep the last valid one.
-    let finalZonefile = zonefile;
-    let finalZonefileHash = zonefile_hash;
-    if (finalZonefileHash === '') {
-      const lastZonefile = await sql<{ zonefile: string; zonefile_hash: string }[]>`
-        SELECT z.zonefile, z.zonefile_hash
-        FROM zonefiles AS z
-        INNER JOIN names AS n USING (name, tx_id, index_block_hash)
-        WHERE z.name = ${name}
-        AND n.canonical = TRUE
-        AND n.microblock_canonical = TRUE
-        ORDER BY n.registered_at DESC, n.microblock_sequence DESC, n.tx_index DESC
-        LIMIT 1
+      // If the name was transferred, keep the expiration from the last register/renewal we had (if
+      // any).
+      if (status === 'name-transfer') {
+        const prevExpiration = await sql<{ expire_block: number }[]>`
+          SELECT expire_block
+          FROM names
+          WHERE name = ${name}
+          AND canonical = TRUE AND microblock_canonical = TRUE
+          ORDER BY registered_at DESC, microblock_sequence DESC, tx_index DESC
+          LIMIT 1
+        `;
+        if (prevExpiration.length > 0) {
+          expireBlock = prevExpiration[0].expire_block;
+        }
+      }
+      // If we didn't receive a zonefile, keep the last valid one.
+      let finalZonefile = zonefile;
+      let finalZonefileHash = zonefile_hash;
+      if (finalZonefileHash === '') {
+        const lastZonefile = await sql<{ zonefile: string; zonefile_hash: string }[]>`
+          SELECT z.zonefile, z.zonefile_hash
+          FROM zonefiles AS z
+          INNER JOIN names AS n USING (name, tx_id, index_block_hash)
+          WHERE z.name = ${name}
+          AND n.canonical = TRUE
+          AND n.microblock_canonical = TRUE
+          ORDER BY n.registered_at DESC, n.microblock_sequence DESC, n.tx_index DESC
+          LIMIT 1
+        `;
+        if (lastZonefile.length > 0) {
+          finalZonefile = lastZonefile[0].zonefile;
+          finalZonefileHash = lastZonefile[0].zonefile_hash;
+        }
+      }
+      const validZonefileHash = validateZonefileHash(finalZonefileHash);
+      const zonefileValues: BnsZonefileInsertValues = {
+        name: name,
+        zonefile: finalZonefile,
+        zonefile_hash: validZonefileHash,
+        tx_id: tx_id,
+        index_block_hash: tx.index_block_hash,
+      };
+      await sql`
+        INSERT INTO zonefiles ${sql(zonefileValues)}
+        ON CONFLICT ON CONSTRAINT unique_name_zonefile_hash_tx_id_index_block_hash DO
+        UPDATE SET zonefile = EXCLUDED.zonefile
+      `;
+      const nameValues: BnsNameInsertValues = {
+        name: name,
+        address: address,
+        registered_at: registered_at,
+        expire_block: expireBlock,
+        zonefile_hash: validZonefileHash,
+        namespace_id: namespace_id,
+        tx_index: tx_index,
+        tx_id: tx_id,
+        event_index: event_index ?? null,
+        status: status ?? null,
+        canonical: canonical,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+      };
+      await sql`
+        INSERT INTO names ${sql(nameValues)}
+        ON CONFLICT ON CONSTRAINT unique_name_tx_id_index_block_hash_microblock_hash_event_index DO
+        UPDATE SET
+          address = EXCLUDED.address,
+          registered_at = EXCLUDED.registered_at,
+          expire_block = EXCLUDED.expire_block,
+          zonefile_hash = EXCLUDED.zonefile_hash,
+          namespace_id = EXCLUDED.namespace_id,
+          tx_index = EXCLUDED.tx_index,
+          event_index = EXCLUDED.event_index,
+          status = EXCLUDED.status,
+          canonical = EXCLUDED.canonical,
+          parent_index_block_hash = EXCLUDED.parent_index_block_hash,
+          microblock_sequence = EXCLUDED.microblock_sequence,
+          microblock_canonical = EXCLUDED.microblock_canonical
       `;
-      if (lastZonefile.length > 0) {
-        finalZonefile = lastZonefile[0].zonefile;
-        finalZonefileHash = lastZonefile[0].zonefile_hash;
-      }
     }
-    const validZonefileHash = validateZonefileHash(finalZonefileHash);
-    const zonefileValues: BnsZonefileInsertValues = {
-      name: name,
-      zonefile: finalZonefile,
-      zonefile_hash: validZonefileHash,
-      tx_id: tx_id,
-      index_block_hash: blockData.index_block_hash,
-    };
-    await sql`
-      INSERT INTO zonefiles ${sql(zonefileValues)}
-      ON CONFLICT ON CONSTRAINT unique_name_zonefile_hash_tx_id_index_block_hash DO
-      UPDATE SET zonefile = EXCLUDED.zonefile
-    `;
-    const nameValues: BnsNameInsertValues = {
-      name: name,
-      address: address,
-      registered_at: registered_at,
-      expire_block: expireBlock,
-      zonefile_hash: validZonefileHash,
-      namespace_id: namespace_id,
-      tx_index: tx_index,
-      tx_id: tx_id,
-      event_index: event_index ?? null,
-      status: status ?? null,
-      canonical: canonical,
-      index_block_hash: blockData.index_block_hash,
-      parent_index_block_hash: blockData.parent_index_block_hash,
-      microblock_hash: blockData.microblock_hash,
-      microblock_sequence: blockData.microblock_sequence,
-      microblock_canonical: blockData.microblock_canonical,
-    };
-    await sql`
-      INSERT INTO names ${sql(nameValues)}
-      ON CONFLICT ON CONSTRAINT unique_name_tx_id_index_block_hash_microblock_hash_event_index DO
-      UPDATE SET
-        address = EXCLUDED.address,
-        registered_at = EXCLUDED.registered_at,
-        expire_block = EXCLUDED.expire_block,
-        zonefile_hash = EXCLUDED.zonefile_hash,
-        namespace_id = EXCLUDED.namespace_id,
-        tx_index = EXCLUDED.tx_index,
-        event_index = EXCLUDED.event_index,
-        status = EXCLUDED.status,
-        canonical = EXCLUDED.canonical,
-        parent_index_block_hash = EXCLUDED.parent_index_block_hash,
-        microblock_sequence = EXCLUDED.microblock_sequence,
-        microblock_canonical = EXCLUDED.microblock_canonical
-    `;
   }
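Annotation: the fallback inside `updateNames` above treats an empty `zonefile_hash` as "reuse the most recent canonical zonefile on record for this name". A minimal sketch under assumed row shapes:

```typescript
// Zonefile fallback sketch: an empty zonefile_hash on an incoming name update
// means "keep the last valid zonefile". Row shape is assumed for illustration.
interface ZonefileRow {
  zonefile: string;
  zonefile_hash: string;
}

function resolveZonefile(
  incoming: ZonefileRow,
  lastCanonical: ZonefileRow | undefined
): ZonefileRow {
  if (incoming.zonefile_hash === '' && lastCanonical !== undefined) {
    return lastCanonical; // reuse both the zonefile and its hash
  }
  return incoming;
}

// Example: a name-transfer event that carried no zonefile.
const kept = resolveZonefile(
  { zonefile: '', zonefile_hash: '' },
  { zonefile: '$ORIGIN muneeb.id', zonefile_hash: '0xabcd' }
);
console.log(kept.zonefile_hash); // '0xabcd'
```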
 
-  async updateNamespaces(
-    sql: PgSqlClient,
-    blockData: {
-      index_block_hash: string;
-      parent_index_block_hash: string;
-      microblock_hash: string;
-      microblock_sequence: number;
-      microblock_canonical: boolean;
-    },
-    bnsNamespace: DbBnsNamespace
+  async updateNamespaces(
+    sql: PgSqlClient,
+    tx: DataStoreBnsBlockTxData,
+    namespaces: DbBnsNamespace[]
   ) {
-    const values: BnsNamespaceInsertValues = {
-      namespace_id: bnsNamespace.namespace_id,
-      launched_at: bnsNamespace.launched_at ?? null,
-      address: bnsNamespace.address,
-      reveal_block: bnsNamespace.reveal_block,
-      ready_block: bnsNamespace.ready_block,
-      buckets: bnsNamespace.buckets,
-      base: bnsNamespace.base.toString(),
-      coeff: bnsNamespace.coeff.toString(),
-      nonalpha_discount: bnsNamespace.nonalpha_discount.toString(),
-      no_vowel_discount: bnsNamespace.no_vowel_discount.toString(),
-      lifetime: bnsNamespace.lifetime,
-      status: bnsNamespace.status ?? null,
-      tx_index: bnsNamespace.tx_index,
-      tx_id: bnsNamespace.tx_id,
-      canonical: bnsNamespace.canonical,
-      index_block_hash: blockData.index_block_hash,
-      parent_index_block_hash: blockData.parent_index_block_hash,
-      microblock_hash: blockData.microblock_hash,
-      microblock_sequence: blockData.microblock_sequence,
-      microblock_canonical: blockData.microblock_canonical,
-    };
-    await sql`
-      INSERT INTO namespaces ${sql(values)}
-      ON CONFLICT ON CONSTRAINT unique_namespace_id_tx_id_index_block_hash_microblock_hash DO
-      UPDATE SET
-        launched_at = EXCLUDED.launched_at,
-        address = EXCLUDED.address,
-        reveal_block = EXCLUDED.reveal_block,
-        ready_block = EXCLUDED.ready_block,
-        buckets = EXCLUDED.buckets,
-        base = EXCLUDED.base,
-        coeff = EXCLUDED.coeff,
-        nonalpha_discount = EXCLUDED.nonalpha_discount,
-        no_vowel_discount = EXCLUDED.no_vowel_discount,
-        lifetime = EXCLUDED.lifetime,
-        status = EXCLUDED.status,
-        tx_index = EXCLUDED.tx_index,
-        canonical = EXCLUDED.canonical,
-        parent_index_block_hash = EXCLUDED.parent_index_block_hash,
-        microblock_sequence = EXCLUDED.microblock_sequence,
-        microblock_canonical = EXCLUDED.microblock_canonical
-    `;
+    for (const batch of batchIterate(namespaces, INSERT_BATCH_SIZE)) {
+      const values: BnsNamespaceInsertValues[] = batch.map(namespace => ({
+        namespace_id: namespace.namespace_id,
+        launched_at: namespace.launched_at ?? null,
+        address: namespace.address,
+        reveal_block: namespace.reveal_block,
+        ready_block: namespace.ready_block,
+        buckets: namespace.buckets,
+        base: namespace.base.toString(),
+        coeff: namespace.coeff.toString(),
+        nonalpha_discount: namespace.nonalpha_discount.toString(),
+        no_vowel_discount: namespace.no_vowel_discount.toString(),
+        lifetime: namespace.lifetime,
+        status: namespace.status ?? null,
+        tx_index: namespace.tx_index,
+        tx_id: namespace.tx_id,
+        canonical: namespace.canonical,
+        index_block_hash: tx.index_block_hash,
+        parent_index_block_hash: tx.parent_index_block_hash,
+        microblock_hash: tx.microblock_hash,
+        microblock_sequence: tx.microblock_sequence,
+        microblock_canonical: tx.microblock_canonical,
+      }));
+      await sql`
+        INSERT INTO namespaces ${sql(values)}
+        ON CONFLICT ON CONSTRAINT unique_namespace_id_tx_id_index_block_hash_microblock_hash DO
+        UPDATE SET
+          launched_at = EXCLUDED.launched_at,
+          address = EXCLUDED.address,
+          reveal_block = EXCLUDED.reveal_block,
+          ready_block = EXCLUDED.ready_block,
+          buckets = EXCLUDED.buckets,
+          base = EXCLUDED.base,
+          coeff = EXCLUDED.coeff,
+          nonalpha_discount = EXCLUDED.nonalpha_discount,
+          no_vowel_discount = EXCLUDED.no_vowel_discount,
+          lifetime = EXCLUDED.lifetime,
+          status = EXCLUDED.status,
+          tx_index = EXCLUDED.tx_index,
+          canonical = EXCLUDED.canonical,
+          parent_index_block_hash = EXCLUDED.parent_index_block_hash,
+          microblock_sequence = EXCLUDED.microblock_sequence,
+          microblock_canonical = EXCLUDED.microblock_canonical
+      `;
+    }
   }
 
   async updateBatchTokenOfferingLocked(sql: PgSqlClient, lockedInfos: DbTokenOfferingLocked[]) {
@@ -2077,36 +2056,18 @@
         );
       }
 
-      await this.updateBatchStxEvents(sql, entry.tx, entry.stxEvents);
+      await this.updateStxEvents(sql, entry.tx, entry.stxEvents);
       await this.updatePrincipalStxTxs(sql, entry.tx, entry.stxEvents);
-      await this.updateBatchSmartContractEvent(sql, entry.tx, entry.contractLogEvents);
-      for (const pox2Event of entry.pox2Events) {
-        await this.updatePoxSyntheticEvent(sql, entry.tx, 'pox2_events', pox2Event);
-      }
-      for (const pox3Event of entry.pox3Events) {
-        await this.updatePoxSyntheticEvent(sql, entry.tx, 'pox3_events', pox3Event);
-      }
-      for (const pox4Event of entry.pox4Events) {
-        await this.updatePoxSyntheticEvent(sql, entry.tx, 'pox4_events', pox4Event);
-      }
-      for (const stxLockEvent of entry.stxLockEvents) {
-        await this.updateStxLockEvent(sql, entry.tx, stxLockEvent);
-      }
-      for (const ftEvent of entry.ftEvents) {
-        await this.updateFtEvent(sql, entry.tx, ftEvent);
-      }
-      for (const nftEvent of entry.nftEvents) {
-        await this.updateNftEvent(sql, entry.tx, nftEvent, true);
-      }
-      for (const smartContract of entry.smartContracts) {
-        await this.updateSmartContract(sql, entry.tx, smartContract);
-      }
-      for (const namespace of entry.namespaces) {
-        await this.updateNamespaces(sql, entry.tx, namespace);
-      }
-      for (const bnsName of entry.names) {
-        await this.updateNames(sql, entry.tx, bnsName);
-      }
+      await this.updateSmartContractEvents(sql, entry.tx, entry.contractLogEvents);
+      await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox2_events', entry.pox2Events);
+      await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox3_events', entry.pox3Events);
+      await this.updatePoxSyntheticEvents(sql, entry.tx, 'pox4_events', entry.pox4Events);
+      await this.updateStxLockEvents(sql, entry.tx, entry.stxLockEvents);
+      await this.updateFtEvents(sql, entry.tx, entry.ftEvents);
+      await this.updateNftEvents(sql, entry.tx, entry.nftEvents, true);
+      await this.updateSmartContracts(sql, entry.tx, entry.smartContracts);
+      await this.updateNamespaces(sql, entry.tx, entry.namespaces);
+      await this.updateNames(sql, entry.tx, entry.names);
     }
   }
 
@@ -2383,10 +2344,7 @@
   * @param txIds - List of transactions to update in the mempool
   */
  async pruneMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ removedTxs: 
string[] }> {
-    if (txIds.length === 0) {
-      // Avoid an unnecessary query.
-      return { removedTxs: [] };
-    }
+    if (txIds.length === 0) return { removedTxs: [] };
     for (const txId of txIds) {
       logger.debug(`Pruning mempool tx: ${txId}`);
     }
@@ -2433,7 +2391,7 @@ export class PgWriteStore extends PgStore {
     sql: PgSqlClient,
     indexBlockHash: string,
     canonical: boolean,
-    updatedEntities: UpdatedEntities
+    updatedEntities: ReOrgUpdatedEntities
   ): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
     const txResult = await sql`
       UPDATE txs
@@ -2615,8 +2573,8 @@ export class PgWriteStore extends PgStore {
   async restoreOrphanedChain(
     sql: PgSqlClient,
     indexBlockHash: string,
-    updatedEntities: UpdatedEntities
-  ): Promise<UpdatedEntities> {
+    updatedEntities: ReOrgUpdatedEntities
+  ): Promise<ReOrgUpdatedEntities> {
     // Restore the previously orphaned block to canonical
     const restoredBlockResult = await sql`
       UPDATE blocks
@@ -2742,46 +2700,8 @@ export class PgWriteStore extends PgStore {
     sql: PgSqlClient,
     block: DbBlock,
     chainTipHeight: number
-  ): Promise<UpdatedEntities> {
-    const updatedEntities: UpdatedEntities = {
-      markedCanonical: {
-        blocks: 0,
-        microblocks: 0,
-        minerRewards: 0,
-        txs: 0,
-        stxLockEvents: 0,
-        stxEvents: 0,
-        ftEvents: 0,
-        nftEvents: 0,
-        pox2Events: 0,
-        pox3Events: 0,
-        pox4Events: 0,
-        contractLogs: 0,
-        smartContracts: 0,
-        names: 0,
-        namespaces: 0,
-        subdomains: 0,
-      },
-      markedNonCanonical: {
-        blocks: 0,
-        microblocks: 0,
-        minerRewards: 0,
-        txs: 0,
-        stxLockEvents: 0,
-        stxEvents: 0,
-        ftEvents: 0,
-        nftEvents: 0,
-        pox2Events: 0,
-        pox3Events: 0,
-        pox4Events: 0,
-        contractLogs: 0,
-        smartContracts: 0,
-        names: 0,
-        namespaces: 0,
-        subdomains: 0,
-      },
-    };
-
+  ): Promise<ReOrgUpdatedEntities> {
+    const updatedEntities = newReOrgUpdatedEntities();
     // Check if incoming block's parent is canonical
     if (block.block_height > 1) {
       const parentResult = await sql<
@@ -2796,26 +2716,23 @@ export class PgWriteStore extends PgStore {
         WHERE block_height = ${block.block_height - 1}
         AND index_block_hash = ${block.parent_index_block_hash}
       `;
-
-      if (parentResult.length > 1) {
+      if (parentResult.length > 1)
        throw new Error(
          `DB contains multiple blocks at height ${block.block_height - 1} and index_hash ${
            block.parent_index_block_hash
          }`
        );
-      }
-      if (parentResult.length === 0) {
+      if (parentResult.length === 0)
        throw new Error(
          `DB does not contain a parent block at height ${block.block_height - 1} with index_hash ${
            block.parent_index_block_hash
          }`
        );
-      }
-
-      // This blocks builds off a previously orphaned chain. Restore canonical status for this chain.
+      // This block builds off a previously orphaned chain. Restore canonical status for this
+      // chain.
       if (!parentResult[0].canonical && block.block_height > chainTipHeight) {
         await this.restoreOrphanedChain(sql, parentResult[0].index_block_hash, updatedEntities);
-        this.logReorgResultInfo(updatedEntities);
+        logReorgResultInfo(updatedEntities);
       }
 
       // Reflect updated transaction totals in `chain_tip` table.
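
// Sketch (assumption, not part of this changeset): `newReOrgUpdatedEntities()` is called above
// but its definition is not shown in these hunks. Given the fields of `ReOrgUpdatedEntities`,
// it presumably builds a zeroed counter pair, along these lines:
//
//   function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
//     const zeroCounts = () => ({
//       blocks: 0, microblocks: 0, minerRewards: 0, txs: 0, stxLockEvents: 0, stxEvents: 0,
//       ftEvents: 0, nftEvents: 0, pox2Events: 0, pox3Events: 0, pox4Events: 0,
//       contractLogs: 0, smartContracts: 0, names: 0, namespaces: 0, subdomains: 0,
//     });
//     return { markedCanonical: zeroCounts(), markedNonCanonical: zeroCounts() };
//   }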
const txCountDelta = @@ -2829,68 +2746,6 @@ export class PgWriteStore extends PgStore { return updatedEntities; } - logReorgResultInfo(updatedEntities: UpdatedEntities) { - const updates = [ - ['blocks', updatedEntities.markedCanonical.blocks, updatedEntities.markedNonCanonical.blocks], - [ - 'microblocks', - updatedEntities.markedCanonical.microblocks, - updatedEntities.markedNonCanonical.microblocks, - ], - ['txs', updatedEntities.markedCanonical.txs, updatedEntities.markedNonCanonical.txs], - [ - 'miner-rewards', - updatedEntities.markedCanonical.minerRewards, - updatedEntities.markedNonCanonical.minerRewards, - ], - [ - 'stx-lock events', - updatedEntities.markedCanonical.stxLockEvents, - updatedEntities.markedNonCanonical.stxLockEvents, - ], - [ - 'stx-token events', - updatedEntities.markedCanonical.stxEvents, - updatedEntities.markedNonCanonical.stxEvents, - ], - [ - 'non-fungible-token events', - updatedEntities.markedCanonical.nftEvents, - updatedEntities.markedNonCanonical.nftEvents, - ], - [ - 'fungible-token events', - updatedEntities.markedCanonical.ftEvents, - updatedEntities.markedNonCanonical.ftEvents, - ], - [ - 'contract logs', - updatedEntities.markedCanonical.contractLogs, - updatedEntities.markedNonCanonical.contractLogs, - ], - [ - 'smart contracts', - updatedEntities.markedCanonical.smartContracts, - updatedEntities.markedNonCanonical.smartContracts, - ], - ['names', updatedEntities.markedCanonical.names, updatedEntities.markedNonCanonical.names], - [ - 'namespaces', - updatedEntities.markedCanonical.namespaces, - updatedEntities.markedNonCanonical.namespaces, - ], - [ - 'subdomains', - updatedEntities.markedCanonical.subdomains, - updatedEntities.markedNonCanonical.subdomains, - ], - ]; - const markedCanonical = updates.map(e => `${e[1]} ${e[0]}`).join(', '); - logger.debug(`Entities marked as canonical: ${markedCanonical}`); - const markedNonCanonical = updates.map(e => `${e[2]} ${e[0]}`).join(', '); - logger.debug(`Entities marked as non-canonical: ${markedNonCanonical}`); - } - /** * Refreshes a Postgres materialized view. 
   * @param viewName - Materialized view name
diff --git a/src/event-replay/helpers.ts b/src/event-replay/helpers.ts
index 3cec50ea..6a418f3a 100644
--- a/src/event-replay/helpers.ts
+++ b/src/event-replay/helpers.ts
@@ -1,14 +1,6 @@
 import { PgWriteStore } from '../datastore/pg-write-store';
-import * as fs from 'fs';
-import * as readline from 'readline';
-import { DataStoreBnsBlockData, DbTxTypeId } from '../datastore/common';
+import { DataStoreBnsBlockTxData, DbTxTypeId } from '../datastore/common';
 import { readLinesReversed } from './reverse-file-stream';
-import { CoreNodeBlockMessage } from '../event-stream/core-node-message';
-
-export type BnsGenesisBlock = DataStoreBnsBlockData & {
-  tx_id: string;
-  tx_index: number;
-};
 
 /**
  * Traverse a TSV file in reverse to find the last received `/new_block` node message and return
@@ -40,7 +32,7 @@ export async function findTsvBlockHeight(filePath: string): Promise<number> {
 
 export async function getBnsGenesisBlockFromBlockMessage(
   db: PgWriteStore
-): Promise<BnsGenesisBlock> {
+): Promise<DataStoreBnsBlockTxData> {
   const genesisBlock = await db.getBlock({ height: 1 });
   if (!genesisBlock.found) {
     throw new Error('Could not find genesis block');
diff --git a/src/event-replay/parquet-based/importers/attachment-new-importer.ts b/src/event-replay/parquet-based/importers/attachment-new-importer.ts
index 43a5e2a0..76e484c8 100644
--- a/src/event-replay/parquet-based/importers/attachment-new-importer.ts
+++ b/src/event-replay/parquet-based/importers/attachment-new-importer.ts
@@ -3,7 +3,6 @@
 import { Readable, Writable } from 'stream';
 import { pipeline } from 'stream/promises';
-import { batchIterate } from '../../../helpers';
 import { PgWriteStore } from '../../../datastore/pg-write-store';
 import { parseAttachment } from '../../../event-stream/event-server';
 import { logger } from '../../../logger';
@@ -11,6 +10,7 @@ import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-messa
 import { DataStoreAttachmentSubdomainData } from '../../../datastore/common';
 import { DatasetStore } from '../dataset/store';
 import { I32_MAX } from '../../../helpers';
+import { batchIterate } from '@hirosystems/api-toolkit';
 
 const batchInserters: BatchInserter[] = [];
diff --git a/src/event-replay/parquet-based/importers/new-block-importer.ts b/src/event-replay/parquet-based/importers/new-block-importer.ts
index 869d6f3d..ff7e76a0 100644
--- a/src/event-replay/parquet-based/importers/new-block-importer.ts
+++ b/src/event-replay/parquet-based/importers/new-block-importer.ts
@@ -17,9 +17,10 @@
 } from '../../../datastore/common';
 import { validateZonefileHash } from '../../../datastore/helpers';
 import { logger } from '../../../logger';
-import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
+import { getApiConfiguredChainID } from '../../../helpers';
 import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
 import { DatasetStore } from '../dataset/store';
+import { batchIterate } from '@hirosystems/api-toolkit';
 
 const chainID = getApiConfiguredChainID();
 
@@ -361,9 +362,7 @@ const populateBatchInserters = (db: PgWriteStore) => {
   const insertSmartContracts = async (dbData: DataStoreBlockUpdateData) => {
     for (const entry of dbData.txs) {
-      for (const smartContract of entry.smartContracts) {
-        await db.updateSmartContract(db.sql, entry.tx, smartContract);
-      }
+      await db.updateSmartContracts(db.sql, entry.tx, entry.smartContracts);
     }
   };
 
@@ -377,39 +376,29 @@ const populateBatchInserters = (db: PgWriteStore) => {
   const insertStxLockEvents = async (dbData:
DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - for (const stxLockEvent of entry.stxLockEvents) { - await db.updateStxLockEvent(db.sql, entry.tx, stxLockEvent); - } + await db.updateStxLockEvents(db.sql, entry.tx, entry.stxLockEvents); } }; const insertMinerRewards = async (dbData: DataStoreBlockUpdateData) => { - for (const minerReward of dbData.minerRewards) { - await db.updateMinerReward(db.sql, minerReward); - } + await db.updateMinerRewards(db.sql, dbData.minerRewards); }; const insertPox2Events = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - for (const pox2Event of entry.pox2Events) { - await db.updatePoxSyntheticEvent(db.sql, entry.tx, 'pox2_events', pox2Event); - } + await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox2_events', entry.pox2Events); } }; const insertPox3Events = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - for (const pox3Event of entry.pox3Events) { - await db.updatePoxSyntheticEvent(db.sql, entry.tx, 'pox3_events', pox3Event); - } + await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox3_events', entry.pox3Events); } }; const insertPox4Events = async (dbData: DataStoreBlockUpdateData) => { for (const entry of dbData.txs) { - for (const pox4Event of entry.pox4Events) { - await db.updatePoxSyntheticEvent(db.sql, entry.tx, 'pox4_events', pox4Event); - } + await db.updatePoxSyntheticEvents(db.sql, entry.tx, 'pox4_events', entry.pox4Events); } }; diff --git a/src/event-replay/parquet-based/importers/raw-importer.ts b/src/event-replay/parquet-based/importers/raw-importer.ts index 9fc44805..d86f9663 100644 --- a/src/event-replay/parquet-based/importers/raw-importer.ts +++ b/src/event-replay/parquet-based/importers/raw-importer.ts @@ -3,8 +3,8 @@ import { pipeline } from 'stream/promises'; import { PgWriteStore } from '../../../datastore/pg-write-store'; import { RawEventRequestInsertValues } from '../../../datastore/common'; import { logger } from '../../../logger'; -import { batchIterate } from '../../../helpers'; import { DatasetStore } from '../dataset/store'; +import { batchIterate } from '@hirosystems/api-toolkit'; const batchInserters: BatchInserter[] = []; diff --git a/src/helpers.ts b/src/helpers.ts index ac8ab821..2273814a 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -440,71 +440,6 @@ export function assertNotNullish( return true; } -/** - * Iterate over an array, yielding multiple items at a time. If the size of the given array - * is not divisible by the given batch size, then the length of the last items returned will - * be smaller than the given batch size, i.e.: - * ```typescript - * items.length % batchSize - * ``` - * @param items - The array to iterate over. - * @param batchSize - Maximum number of items to return at a time. 
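 * @example
 * A minimal usage sketch (the call shape is unchanged by the move to `@hirosystems/api-toolkit`):
 * ```typescript
 * for (const batch of batchIterate(['a', 'b', 'c', 'd', 'e'], 2)) {
 *   console.log(batch); // ['a', 'b'], then ['c', 'd'], then ['e']
 * }
 * ```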
- */
-export function* batchIterate<T>(
-  items: T[],
-  batchSize: number,
-  printBenchmark = isDevEnv
-): Generator<T[]> {
-  if (items.length === 0) {
-    return;
-  }
-  const startTime = Date.now();
-  for (let i = 0; i < items.length; ) {
-    const itemsRemaining = items.length - i;
-    const sliceSize = Math.min(batchSize, itemsRemaining);
-    yield items.slice(i, i + sliceSize);
-    i += sliceSize;
-  }
-
-  if (printBenchmark) {
-    const itemsPerSecond = Math.round((items.length / (Date.now() - startTime)) * 1000);
-    const caller = new Error().stack?.split('at ')[3].trim();
-    logger.debug(`Iterated ${itemsPerSecond} items/second at ${caller}`);
-  }
-}
-
-export async function* asyncBatchIterate<T>(
-  items: AsyncIterable<T>,
-  batchSize: number,
-  printBenchmark = isDevEnv
-): AsyncGenerator<T[]> {
-  const startTime = Date.now();
-  let itemCount = 0;
-  let itemBatch: T[] = [];
-  for await (const item of items) {
-    itemBatch.push(item);
-    itemCount++;
-    if (itemBatch.length >= batchSize) {
-      yield itemBatch;
-      itemBatch = [];
-      if (printBenchmark) {
-        const itemsPerSecond = Math.round((itemCount / (Date.now() - startTime)) * 1000);
-        const caller = new Error().stack?.split('at ')[3].trim();
-        logger.debug(`Iterated ${itemsPerSecond} items/second at ${caller}`);
-      }
-    }
-  }
-  if (itemBatch.length > 0) {
-    yield itemBatch;
-  }
-}
-
-export async function* asyncIterableToGenerator<T>(iter: AsyncIterable<T>) {
-  for await (const entry of iter) {
-    yield entry;
-  }
-}
-
 function intMax(args: bigint[]): bigint;
 function intMax(args: number[]): number;
 function intMax(args: bigint[] | number[]): any {
diff --git a/src/import-v1/index.ts b/src/import-v1/index.ts
index 327e0de4..d016578c 100644
--- a/src/import-v1/index.ts
+++ b/src/import-v1/index.ts
@@ -8,17 +8,18 @@ import * as zlib from 'zlib';
 import { bitcoinToStacksAddress } from 'stacks-encoding-native-js';
 import * as split2 from 'split2';
 import {
+  DataStoreBnsBlockTxData,
   DbBnsName,
   DbBnsNamespace,
   DbBnsSubdomain,
   DbConfigState,
   DbTokenOfferingLocked,
 } from '../datastore/common';
-import { asyncBatchIterate, asyncIterableToGenerator, I32_MAX, REPO_DIR } from '../helpers';
-import { BnsGenesisBlock, getBnsGenesisBlockFromBlockMessage } from '../event-replay/helpers';
+import { REPO_DIR } from '../helpers';
+import { getBnsGenesisBlockFromBlockMessage } from '../event-replay/helpers';
 import { PgWriteStore } from '../datastore/pg-write-store';
 import { logger } from '../logger';
-import { PgSqlClient } from '@hirosystems/api-toolkit';
+import { PgSqlClient, asyncBatchIterate, asyncIterableToGenerator } from '@hirosystems/api-toolkit';
 
 const finished = util.promisify(stream.finished);
 const pipeline = util.promisify(stream.pipeline);
@@ -78,13 +79,13 @@ class ChainProcessor extends stream.Writable {
   namespace: Map<string, DbBnsNamespace>;
   db: PgWriteStore;
   sql: PgSqlClient;
-  genesisBlock: BnsGenesisBlock;
+  genesisBlock: DataStoreBnsBlockTxData;
 
   constructor(
     sql: PgSqlClient,
     db: PgWriteStore,
     zhashes: Map<string, string>,
-    genesisBlock: BnsGenesisBlock
+    genesisBlock: DataStoreBnsBlockTxData
   ) {
     super();
     this.zhashes = zhashes;
@@ -159,7 +160,7 @@ class ChainProcessor extends stream.Writable {
         canonical: true,
         status: 'name-register',
       };
-      await this.db.updateNames(this.sql, this.genesisBlock, obj);
+      await this.db.updateNames(this.sql, this.genesisBlock, [obj]);
       this.rowCount += 1;
       if (obj.zonefile === '') {
        logger.debug(
@@ -186,7 +187,7 @@
         canonical: true,
       };
       this.namespace.set(obj.namespace_id, obj);
-      await this.db.updateNamespaces(this.sql,
this.genesisBlock, obj); + await this.db.updateNamespaces(this.sql, this.genesisBlock, [obj]); this.rowCount += 1; } } @@ -230,9 +231,9 @@ function btcToStxAddress(btcAddress: string) { } class SubdomainTransform extends stream.Transform { - genesisBlock: BnsGenesisBlock; + genesisBlock: DataStoreBnsBlockTxData; - constructor(genesisBlock: BnsGenesisBlock) { + constructor(genesisBlock: DataStoreBnsBlockTxData) { super({ objectMode: true, highWaterMark: SUBDOMAIN_BATCH_SIZE }); this.genesisBlock = genesisBlock; } @@ -304,7 +305,7 @@ async function valid(fileName: string): Promise { return true; } -async function* readSubdomains(importDir: string, genesisBlock: BnsGenesisBlock) { +async function* readSubdomains(importDir: string, genesisBlock: DataStoreBnsBlockTxData) { const metaIter = asyncIterableToGenerator( stream.pipeline( fs.createReadStream(path.join(importDir, 'subdomains.csv')), @@ -416,7 +417,7 @@ async function validateBnsImportDir(importDir: string, importFiles: string[]) { export async function importV1BnsNames( db: PgWriteStore, importDir: string, - genesisBlock: BnsGenesisBlock + genesisBlock: DataStoreBnsBlockTxData ) { const configState = await db.getConfigState(); if (configState.bns_names_onchain_imported) { @@ -444,7 +445,7 @@ export async function importV1BnsNames( export async function importV1BnsSubdomains( db: PgWriteStore, importDir: string, - genesisBlock: BnsGenesisBlock + genesisBlock: DataStoreBnsBlockTxData ) { const configState = await db.getConfigState(); if (configState.bns_subdomains_imported) { diff --git a/src/tests-bns/api.ts b/src/tests-bns/api.ts index 28438175..b379839a 100644 --- a/src/tests-bns/api.ts +++ b/src/tests-bns/api.ts @@ -129,8 +129,10 @@ describe('BNS API tests', () => { microblock_hash: '', microblock_sequence: I32_MAX, microblock_canonical: true, + tx_id: '', + tx_index: 0, }, - namespace + [namespace] ); const namespace2: DbBnsNamespace = { namespace_id: 'blockstack', @@ -157,8 +159,10 @@ describe('BNS API tests', () => { microblock_hash: '', microblock_sequence: I32_MAX, microblock_canonical: true, + tx_id: '', + tx_index: 0, }, - namespace2 + [namespace2] ); }); @@ -434,8 +438,10 @@ describe('BNS API tests', () => { microblock_hash: '', microblock_sequence: I32_MAX, microblock_canonical: true, + tx_id: '', + tx_index: 0, }, - dbName + [dbName] ); const query1 = await supertest(api.server).get(`/v1/names/invalid/zonefile/${zonefileHash}`); @@ -531,8 +537,10 @@ describe('BNS API tests', () => { microblock_hash: '', microblock_sequence: I32_MAX, microblock_canonical: true, + tx_id: '', + tx_index: 0, }, - dbName2 + [dbName2] ); const query1 = await supertest(api.server).get(`/v1/addresses/${blockchain}/${address}`); diff --git a/src/tests-bns/v1-import-tests.ts b/src/tests-bns/v1-import-tests.ts index cb67dfaf..accb52ca 100644 --- a/src/tests-bns/v1-import-tests.ts +++ b/src/tests-bns/v1-import-tests.ts @@ -4,8 +4,7 @@ import { ChainID } from '@stacks/transactions'; import { importV1BnsNames, importV1BnsSubdomains } from '../import-v1'; import * as assert from 'assert'; import { TestBlockBuilder } from '../test-utils/test-builders'; -import { DataStoreBlockUpdateData } from '../datastore/common'; -import { BnsGenesisBlock } from '../event-replay/helpers'; +import { DataStoreBlockUpdateData, DataStoreBnsBlockTxData } from '../datastore/common'; import { PgWriteStore } from '../datastore/pg-write-store'; import { migrate } from '../test-utils/test-helpers'; @@ -30,7 +29,7 @@ describe('BNS V1 import', () => { }); test('v1-import', async () 
=> { - const genesis: BnsGenesisBlock = { + const genesis: DataStoreBnsBlockTxData = { index_block_hash: block.block.index_block_hash, parent_index_block_hash: block.block.parent_index_block_hash, microblock_canonical: true, diff --git a/src/tests/datastore-tests.ts b/src/tests/datastore-tests.ts index 683651c0..34804796 100644 --- a/src/tests/datastore-tests.ts +++ b/src/tests/datastore-tests.ts @@ -19,6 +19,7 @@ import { DbBnsSubdomain, DbTokenOfferingLocked, DbTx, + DataStoreBnsBlockTxData, } from '../datastore/common'; import { getBlocksWithMetadata, parseDbEvent } from '../api/controllers/db-controller'; import * as assert from 'assert'; @@ -95,9 +96,7 @@ describe('postgres datastore', () => { createMinerReward('addrB', 0n, 30n, 40n, 7n), createMinerReward('addrB', 99999n, 92n, 93n, 0n, false), ]; - for (const reward of minerRewards) { - await db.updateMinerReward(client, reward); - } + await db.updateMinerRewards(client, minerRewards); const tx: DbTxRaw = { tx_id: '0x1234', @@ -196,9 +195,7 @@ describe('postgres datastore', () => { createStxLockEvent('addrA', 222n, 1), createStxLockEvent('addrB', 333n, 1), ]; - for (const stxLockEvent of stxLockEvents) { - await db.updateStxLockEvent(client, tx, stxLockEvent); - } + await db.updateStxLockEvents(client, tx, stxLockEvents); await db.updateTx(client, tx); await db.updateTx(client, tx2); @@ -3515,9 +3512,7 @@ describe('postgres datastore', () => { } // insert miner rewards directly - for (const minerReward of [minerReward1]) { - await db.updateMinerReward(client, minerReward); - } + await db.updateMinerRewards(client, [minerReward1]); // insert txs directly for (const tx of [tx1, tx2]) { @@ -3525,9 +3520,7 @@ describe('postgres datastore', () => { } // insert stx lock events directly - for (const event of [stxLockEvent1]) { - await db.updateStxLockEvent(client, tx1, event); - } + await db.updateStxLockEvents(client, tx1, [stxLockEvent1]); const block5: DbBlock = { block_hash: '0x55', @@ -4616,8 +4609,8 @@ describe('postgres datastore', () => { microblock_hash: '0x00', microblock_sequence: I32_MAX, microblock_canonical: true, - }, - namespace + } as DataStoreBnsBlockTxData, + [namespace] ); const { results } = await db.getNamespaceList({ includeUnanchored: false }); expect(results.length).toBe(1); @@ -4672,8 +4665,8 @@ describe('postgres datastore', () => { microblock_hash: '0x00', microblock_sequence: I32_MAX, microblock_canonical: true, - }, - name + } as DataStoreBnsBlockTxData, + [name] ); const { results } = await db.getNamespaceNamesList({ namespace: 'abc', diff --git a/src/tests/other-tests.ts b/src/tests/other-tests.ts index 217b59bc..a4a7f9a8 100644 --- a/src/tests/other-tests.ts +++ b/src/tests/other-tests.ts @@ -183,7 +183,7 @@ describe('other tests', () => { tx_fees_streamed_confirmed: 2_000_000_000_000n, tx_fees_streamed_produced: 3_000_000_000_000n, }; - await db.updateMinerReward(client, minerReward1); + await db.updateMinerRewards(client, [minerReward1]); const expectedTotalStx3 = stxMintEvent1.amount + stxMintEvent2.amount - diff --git a/src/tests/search-tests.ts b/src/tests/search-tests.ts index 69b8b62b..ea1d5d0b 100644 --- a/src/tests/search-tests.ts +++ b/src/tests/search-tests.ts @@ -770,7 +770,7 @@ describe('search tests', () => { recipient: addr5, sender: 'none', }; - await db.updateFtEvent(client, stxTx1, ftEvent1); + await db.updateFtEvents(client, stxTx1, [ftEvent1]); // test address as a ft event recipient const searchResult5 = await supertest(api.server).get(`/extended/v1/search/${addr5}`); @@ -798,7 +798,7 @@ 
describe('search tests', () => { recipient: 'none', sender: addr6, }; - await db.updateFtEvent(client, stxTx1, ftEvent2); + await db.updateFtEvents(client, stxTx1, [ftEvent2]); // test address as a ft event sender const searchResult6 = await supertest(api.server).get(`/extended/v1/search/${addr6}`); @@ -826,7 +826,7 @@ describe('search tests', () => { recipient: addr7, sender: 'none', }; - await db.updateNftEvent(client, stxTx1, nftEvent1, false); + await db.updateNftEvents(client, stxTx1, [nftEvent1], false); // test address as a nft event recipient const searchResult7 = await supertest(api.server).get(`/extended/v1/search/${addr7}`); @@ -854,7 +854,7 @@ describe('search tests', () => { recipient: 'none', sender: addr8, }; - await db.updateNftEvent(client, stxTx1, nftEvent2, false); + await db.updateNftEvents(client, stxTx1, [nftEvent2], false); // test address as a nft event sender const searchResult8 = await supertest(api.server).get(`/extended/v1/search/${addr8}`); diff --git a/src/tests/tx-tests.ts b/src/tests/tx-tests.ts index 8a7ed7b0..445196d5 100644 --- a/src/tests/tx-tests.ts +++ b/src/tests/tx-tests.ts @@ -1045,15 +1045,17 @@ describe('tx tests', () => { fungible_tokens: [], non_fungible_tokens: [], }; - await db.updateSmartContract(client, dbTx, { - tx_id: dbTx.tx_id, - canonical: true, - clarity_version: null, - contract_id: 'ST11NJTTKGVT6D1HY4NJRVQWMQM7TVAR091EJ8P2Y.hello-world', - block_height: dbBlock.block_height, - source_code: '()', - abi: JSON.stringify(contractAbi), - }); + await db.updateSmartContracts(client, dbTx, [ + { + tx_id: dbTx.tx_id, + canonical: true, + clarity_version: null, + contract_id: 'ST11NJTTKGVT6D1HY4NJRVQWMQM7TVAR091EJ8P2Y.hello-world', + block_height: dbBlock.block_height, + source_code: '()', + abi: JSON.stringify(contractAbi), + }, + ]); const txQuery = await getTxFromDataStore(db, { txId: dbTx.tx_id, includeUnanchored: false }); expect(txQuery.found).toBe(true); if (!txQuery.found) {
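
// The recurring refactor in this changeset replaces per-row singular writers
// (`updateStxLockEvent`, `updateFtEvent`, `updateNftEvent`, `updateSmartContract`, ...) with
// batched plural ones. A hedged sketch of the general shape, modeled on `updateNamespaces`
// above (the table name, row type, and batch size below are illustrative assumptions, not the
// API's actual schema):
//
//   import { batchIterate, PgSqlClient } from '@hirosystems/api-toolkit';
//
//   async function updateWidgets(sql: PgSqlClient, widgets: { id: string; value: number }[]) {
//     // One multi-row INSERT per batch instead of one round trip per row.
//     for (const batch of batchIterate(widgets, 500)) {
//       await sql`INSERT INTO widgets ${sql(batch)}`;
//     }
//   }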