From fb0d0eaa93a0614c54cfa28464fe5df25ac9c7dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Wed, 25 Oct 2023 10:48:03 -0600 Subject: [PATCH] fix: move nft custody view into a table (#1741) * fix: move to table * fix: old nft events query * fix: nft-custody-table migration * fix: no longer rename old views, just remove them --------- Co-authored-by: Matt --- .../1696872367486_nft-custody-tables.js | 215 ++++++++++++++++++ src/datastore/common.ts | 14 ++ src/datastore/pg-store.ts | 7 +- src/datastore/pg-write-store.ts | 161 +++++++++---- src/tests/search-tests.ts | 4 +- src/tests/token-tests.ts | 1 - 6 files changed, 349 insertions(+), 53 deletions(-) create mode 100644 migrations/1696872367486_nft-custody-tables.js diff --git a/migrations/1696872367486_nft-custody-tables.js b/migrations/1696872367486_nft-custody-tables.js new file mode 100644 index 00000000..f74a748f --- /dev/null +++ b/migrations/1696872367486_nft-custody-tables.js @@ -0,0 +1,215 @@ +/* eslint-disable camelcase */ + +exports.shorthands = undefined; + +exports.up = pgm => { + pgm.dropMaterializedView('nft_custody'); + pgm.createTable('nft_custody', { + asset_identifier: { + type: 'string', + notNull: true, + }, + value: { + type: 'bytea', + notNull: true, + }, + recipient: { + type: 'text', + }, + block_height: { + type: 'integer', + notNull: true, + }, + index_block_hash: { + type: 'bytea', + notNull: true, + }, + parent_index_block_hash: { + type: 'bytea', + notNull: true, + }, + microblock_hash: { + type: 'bytea', + notNull: true, + }, + microblock_sequence: { + type: 'integer', + notNull: true, + }, + tx_id: { + type: 'bytea', + notNull: true, + }, + tx_index: { + type: 'smallint', + notNull: true, + }, + event_index: { + type: 'integer', + notNull: true, + }, + }); + pgm.createConstraint('nft_custody', 'nft_custody_unique', 'UNIQUE(asset_identifier, value)'); + pgm.createIndex('nft_custody', ['recipient', 'asset_identifier']); + pgm.createIndex('nft_custody', 'value'); + pgm.createIndex('nft_custody', [ + { name: 'block_height', sort: 'DESC' }, + { name: 'microblock_sequence', sort: 'DESC' }, + { name: 'tx_index', sort: 'DESC' }, + { name: 'event_index', sort: 'DESC' } + ]); + pgm.sql(` + INSERT INTO nft_custody (asset_identifier, value, recipient, tx_id, block_height, index_block_hash, parent_index_block_hash, microblock_hash, microblock_sequence, tx_index, event_index) ( + SELECT + DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height, + nft.index_block_hash, nft.parent_index_block_hash, nft.microblock_hash, nft.microblock_sequence, nft.tx_index, nft.event_index + FROM + nft_events AS nft + INNER JOIN + txs USING (tx_id) + WHERE + txs.canonical = true + AND txs.microblock_canonical = true + AND nft.canonical = true + AND nft.microblock_canonical = true + ORDER BY + asset_identifier, + value, + txs.block_height DESC, + txs.microblock_sequence DESC, + txs.tx_index DESC, + nft.event_index DESC + ) + `); + + pgm.dropMaterializedView('nft_custody_unanchored'); + pgm.createTable('nft_custody_unanchored', { + asset_identifier: { + type: 'string', + notNull: true, + }, + value: { + type: 'bytea', + notNull: true, + }, + recipient: { + type: 'text', + }, + block_height: { + type: 'integer', + notNull: true, + }, + index_block_hash: { + type: 'bytea', + notNull: true, + }, + parent_index_block_hash: { + type: 'bytea', + notNull: true, + }, + microblock_hash: { + type: 'bytea', + notNull: true, + }, + microblock_sequence: { + type: 'integer', + notNull: true, + }, + tx_id: { + type: 'bytea', + notNull: true, + }, + tx_index: { + type: 'smallint', + notNull: true, + }, + event_index: { + type: 'integer', + notNull: true, + }, + }); + pgm.createConstraint('nft_custody_unanchored', 'nft_custody_unanchored_unique', 'UNIQUE(asset_identifier, value)'); + pgm.createIndex('nft_custody_unanchored', ['recipient', 'asset_identifier']); + pgm.createIndex('nft_custody_unanchored', 'value'); + pgm.createIndex('nft_custody_unanchored', [ + { name: 'block_height', sort: 'DESC' }, + { name: 'microblock_sequence', sort: 'DESC' }, + { name: 'tx_index', sort: 'DESC' }, + { name: 'event_index', sort: 'DESC' } + ]); + pgm.sql(` + INSERT INTO nft_custody_unanchored (asset_identifier, value, recipient, tx_id, block_height, index_block_hash, parent_index_block_hash, microblock_hash, microblock_sequence, tx_index, event_index) ( + SELECT + DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height, + nft.index_block_hash, nft.parent_index_block_hash, nft.microblock_hash, nft.microblock_sequence, nft.tx_index, nft.event_index + FROM + nft_events AS nft + INNER JOIN + txs USING (tx_id) + WHERE + txs.canonical = true + AND txs.microblock_canonical = true + AND nft.canonical = true + AND nft.microblock_canonical = true + ORDER BY + asset_identifier, + value, + txs.block_height DESC, + txs.microblock_sequence DESC, + txs.tx_index DESC, + nft.event_index DESC + ) + `); +}; + +exports.down = pgm => { + pgm.dropTable('nft_custody'); + pgm.createMaterializedView('nft_custody', { data: true }, ` + SELECT + DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height + FROM + nft_events AS nft + INNER JOIN + txs USING (tx_id) + WHERE + txs.canonical = true + AND txs.microblock_canonical = true + AND nft.canonical = true + AND nft.microblock_canonical = true + ORDER BY + asset_identifier, + value, + txs.block_height DESC, + txs.microblock_sequence DESC, + txs.tx_index DESC, + nft.event_index DESC + `); + pgm.createIndex('nft_custody', ['recipient', 'asset_identifier']); + pgm.createIndex('nft_custody', ['asset_identifier', 'value'], { unique: true }); + pgm.createIndex('nft_custody', 'value'); + + pgm.dropTable('nft_custody_unanchored'); + pgm.createMaterializedView('nft_custody_unanchored', { data: true }, ` + SELECT + DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height + FROM + nft_events AS nft + INNER JOIN + txs USING (tx_id) + WHERE + txs.canonical = true + AND txs.microblock_canonical = true + AND nft.canonical = true + AND nft.microblock_canonical = true + ORDER BY + asset_identifier, + value, + txs.block_height DESC, + txs.microblock_sequence DESC, + txs.tx_index DESC, + nft.event_index DESC + `); + pgm.createIndex('nft_custody_unanchored', ['recipient', 'asset_identifier']); + pgm.createIndex('nft_custody_unanchored', ['asset_identifier', 'value'], { unique: true }); + pgm.createIndex('nft_custody_unanchored', 'value'); +}; diff --git a/src/datastore/common.ts b/src/datastore/common.ts index 7a0a2062..57e6caff 100644 --- a/src/datastore/common.ts +++ b/src/datastore/common.ts @@ -1357,6 +1357,20 @@ export interface NftEventInsertValues { value: PgBytea; } +export interface NftCustodyInsertValues { + event_index: number; + tx_id: PgBytea; + tx_index: number; + block_height: number; + index_block_hash: PgBytea; + parent_index_block_hash: PgBytea; + microblock_hash: PgBytea; + microblock_sequence: number; + recipient: string | null; + asset_identifier: string; + value: PgBytea; +} + export interface FtEventInsertValues { event_index: number; tx_id: PgBytea; diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 29ab0139..b3513238 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -3318,6 +3318,7 @@ export class PgStore { FROM ${nftCustody} AS nft WHERE nft.recipient = ${args.principal} ${assetIdFilter} + ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC LIMIT ${args.limit} OFFSET ${args.offset} ) @@ -3519,11 +3520,11 @@ export class PgStore { AND block_height <= ${args.blockHeight} ORDER BY asset_identifier, value, block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC ) - SELECT sender, recipient, asset_identifier, value, event_index, asset_event_type_id, address_transfers.block_height, address_transfers.tx_id, (COUNT(*) OVER())::INTEGER AS count - FROM address_transfers + SELECT sender, recipient, asset_identifier, value, at.event_index, asset_event_type_id, at.block_height, at.tx_id, (COUNT(*) OVER())::INTEGER AS count + FROM address_transfers AS at INNER JOIN ${args.includeUnanchored ? this.sql`last_nft_transfers` : this.sql`nft_custody`} USING (asset_identifier, value, recipient) - ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC + ORDER BY at.block_height DESC, at.microblock_sequence DESC, at.tx_index DESC, event_index DESC LIMIT ${args.limit} OFFSET ${args.offset} `; diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 07b16140..29bb435d 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -9,7 +9,6 @@ import { DbSmartContractEvent, DbSmartContract, DataStoreBlockUpdateData, - DbMempoolTx, DbStxLockEvent, DbMinerReward, DbBurnchainReward, @@ -63,6 +62,7 @@ import { DbMempoolTxRaw, DbChainTip, DbPox3Event, + NftCustodyInsertValues, } from './common'; import { ClarityAbi } from '@stacks/transactions'; import { @@ -407,7 +407,7 @@ export class PgWriteStore extends PgStore { await this.updateFtEvent(sql, entry.tx, ftEvent); } for (const nftEvent of entry.nftEvents) { - await this.updateNftEvent(sql, entry.tx, nftEvent); + await this.updateNftEvent(sql, entry.tx, nftEvent, false); } deployedSmartContracts.push(...entry.smartContracts); for (const smartContract of entry.smartContracts) { @@ -466,7 +466,6 @@ export class PgWriteStore extends PgStore { const ibdHeight = getIbdBlockHeight(); this.isIbdBlockHeightReached = ibdHeight ? data.block.block_height > ibdHeight : true; - await this.refreshNftCustody(batchedTxData); await this.refreshMaterializedView('chain_tip'); await this.refreshMaterializedView('mempool_digest'); @@ -746,7 +745,6 @@ export class PgWriteStore extends PgStore { } }); - await this.refreshNftCustody(txData, true); await this.refreshMaterializedView('chain_tip'); await this.refreshMaterializedView('mempool_digest'); @@ -1313,27 +1311,65 @@ export class PgWriteStore extends PgStore { `; } - async updateNftEvent(sql: PgSqlClient, tx: DbTx, event: DbNftEvent) { - const values: NftEventInsertValues = { + async updateNftEvent(sql: PgSqlClient, tx: DbTx, event: DbNftEvent, microblock: boolean) { + const custody: NftCustodyInsertValues = { + asset_identifier: event.asset_identifier, + value: event.value, tx_id: event.tx_id, index_block_hash: tx.index_block_hash, parent_index_block_hash: tx.parent_index_block_hash, microblock_hash: tx.microblock_hash, microblock_sequence: tx.microblock_sequence, - microblock_canonical: tx.microblock_canonical, - sender: event.sender ?? null, recipient: event.recipient ?? null, event_index: event.event_index, tx_index: event.tx_index, block_height: event.block_height, + }; + const values: NftEventInsertValues = { + ...custody, + microblock_canonical: tx.microblock_canonical, canonical: event.canonical, + sender: event.sender ?? null, asset_event_type_id: event.asset_event_type_id, - asset_identifier: event.asset_identifier, - value: event.value, }; await sql` INSERT INTO nft_events ${sql(values)} `; + if (tx.canonical && tx.microblock_canonical && event.canonical) { + const table = microblock ? sql`nft_custody_unanchored` : sql`nft_custody`; + await sql` + INSERT INTO ${table} ${sql(custody)} + ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET + tx_id = EXCLUDED.tx_id, + index_block_hash = EXCLUDED.index_block_hash, + parent_index_block_hash = EXCLUDED.parent_index_block_hash, + microblock_hash = EXCLUDED.microblock_hash, + microblock_sequence = EXCLUDED.microblock_sequence, + recipient = EXCLUDED.recipient, + event_index = EXCLUDED.event_index, + tx_index = EXCLUDED.tx_index, + block_height = EXCLUDED.block_height + WHERE + ( + EXCLUDED.block_height > ${table}.block_height + ) + OR ( + EXCLUDED.block_height = ${table}.block_height + AND EXCLUDED.microblock_sequence > ${table}.microblock_sequence + ) + OR ( + EXCLUDED.block_height = ${table}.block_height + AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence + AND EXCLUDED.tx_index > ${table}.tx_index + ) + OR ( + EXCLUDED.block_height = ${table}.block_height + AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence + AND EXCLUDED.tx_index = ${table}.tx_index + AND EXCLUDED.event_index > ${table}.event_index + ) + `; + } } async updateBatchSmartContractEvent(sql: PgSqlClient, tx: DbTx, events: DbSmartContractEvent[]) { @@ -2206,7 +2242,7 @@ export class PgWriteStore extends PgStore { await this.updateFtEvent(sql, entry.tx, ftEvent); } for (const nftEvent of entry.nftEvents) { - await this.updateNftEvent(sql, entry.tx, nftEvent); + await this.updateNftEvent(sql, entry.tx, nftEvent, true); } for (const smartContract of entry.smartContracts) { await this.updateSmartContract(sql, entry.tx, smartContract); @@ -2288,11 +2324,74 @@ export class PgWriteStore extends PgStore { AND (index_block_hash = ${args.indexBlockHash} OR index_block_hash = '\\x'::bytea) AND tx_id IN ${sql(txIds)} `; + await this.updateNftCustodyFromReOrg(sql, { + index_block_hash: args.indexBlockHash, + microblocks: args.microblocks, + }); } return { updatedTxs: updatedMbTxs }; } + /** + * Refreshes NFT custody data for events within a block or series of microblocks. + * @param sql - SQL client + * @param args - Block and microblock hashes + */ + async updateNftCustodyFromReOrg( + sql: PgSqlClient, + args: { + index_block_hash: string; + microblocks: string[]; + } + ): Promise { + for (const table of [sql`nft_custody`, sql`nft_custody_unanchored`]) { + await sql` + INSERT INTO ${table} + (asset_identifier, value, tx_id, index_block_hash, parent_index_block_hash, microblock_hash, + microblock_sequence, recipient, event_index, tx_index, block_height) + ( + SELECT + DISTINCT ON(asset_identifier, value) asset_identifier, value, tx_id, txs.index_block_hash, + txs.parent_index_block_hash, txs.microblock_hash, txs.microblock_sequence, recipient, + nft.event_index, txs.tx_index, txs.block_height + FROM + nft_events AS nft + INNER JOIN + txs USING (tx_id) + WHERE + txs.canonical = true + AND txs.microblock_canonical = true + AND nft.canonical = true + AND nft.microblock_canonical = true + AND nft.index_block_hash = ${args.index_block_hash} + ${ + args.microblocks.length > 0 + ? sql`AND nft.microblock_hash IN ${sql(args.microblocks)}` + : sql`` + } + ORDER BY + asset_identifier, + value, + txs.block_height DESC, + txs.microblock_sequence DESC, + txs.tx_index DESC, + nft.event_index DESC + ) + ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET + tx_id = EXCLUDED.tx_id, + index_block_hash = EXCLUDED.index_block_hash, + parent_index_block_hash = EXCLUDED.parent_index_block_hash, + microblock_hash = EXCLUDED.microblock_hash, + microblock_sequence = EXCLUDED.microblock_sequence, + recipient = EXCLUDED.recipient, + event_index = EXCLUDED.event_index, + tx_index = EXCLUDED.tx_index, + block_height = EXCLUDED.block_height + `; + } + } + /** * Fetches from the `microblocks` table with a given `parent_index_block_hash` and a known * latest unanchored microblock tip. Microblocks that are chained to the given tip are @@ -2554,6 +2653,10 @@ export class PgWriteStore extends PgStore { } else { updatedEntities.markedNonCanonical.nftEvents += nftResult.count; } + await this.updateNftCustodyFromReOrg(sql, { + index_block_hash: indexBlockHash, + microblocks: [], + }); // todo: do we still need pox2 marking here? const pox2Result = await sql` @@ -2922,40 +3025,6 @@ export class PgWriteStore extends PgStore { await sql`REFRESH MATERIALIZED VIEW ${isProdEnv ? sql`CONCURRENTLY` : sql``} ${sql(viewName)}`; } - /** - * Refreshes the `nft_custody` and `nft_custody_unanchored` materialized views if necessary. - * @param sql - DB client - * @param txs - Transaction event data - * @param unanchored - If this refresh is requested from a block or microblock - */ - async refreshNftCustody(txs: DataStoreTxEventData[], unanchored: boolean = false) { - await this.sqlWriteTransaction(async sql => { - const newNftEventCount = txs - .map(tx => tx.nftEvents.length) - .reduce((prev, cur) => prev + cur, 0); - if (newNftEventCount > 0) { - // Always refresh unanchored view since even if we're in a new anchored block we should update the - // unanchored state to the current one. - await this.refreshMaterializedView('nft_custody_unanchored', sql); - if (!unanchored) { - await this.refreshMaterializedView('nft_custody', sql); - } - } else if (!unanchored) { - // Even if we didn't receive new NFT events in a new anchor block, we should check if we need to - // update the anchored view to reflect any changes made by previous microblocks. - const result = await sql<{ outdated: boolean }[]>` - WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody), - unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored) - SELECT unanchored > anchored AS outdated - FROM anchored_height CROSS JOIN unanchored_height - `; - if (result.length > 0 && result[0].outdated) { - await this.refreshMaterializedView('nft_custody', sql); - } - } - }); - } - /** * Called when a full event import is complete. */ @@ -2964,8 +3033,6 @@ export class PgWriteStore extends PgStore { return; } await this.sqlWriteTransaction(async sql => { - await this.refreshMaterializedView('nft_custody', sql, false); - await this.refreshMaterializedView('nft_custody_unanchored', sql, false); await this.refreshMaterializedView('chain_tip', sql, false); await this.refreshMaterializedView('mempool_digest', sql, false); }); diff --git a/src/tests/search-tests.ts b/src/tests/search-tests.ts index 32b0ee3b..c994399e 100644 --- a/src/tests/search-tests.ts +++ b/src/tests/search-tests.ts @@ -820,7 +820,7 @@ describe('search tests', () => { recipient: addr7, sender: 'none', }; - await db.updateNftEvent(client, stxTx1, nftEvent1); + await db.updateNftEvent(client, stxTx1, nftEvent1, false); // test address as a nft event recipient const searchResult7 = await supertest(api.server).get(`/extended/v1/search/${addr7}`); @@ -848,7 +848,7 @@ describe('search tests', () => { recipient: 'none', sender: addr8, }; - await db.updateNftEvent(client, stxTx1, nftEvent2); + await db.updateNftEvent(client, stxTx1, nftEvent2, false); // test address as a nft event sender const searchResult8 = await supertest(api.server).get(`/extended/v1/search/${addr8}`); diff --git a/src/tests/token-tests.ts b/src/tests/token-tests.ts index d93ee302..12d6d5ae 100644 --- a/src/tests/token-tests.ts +++ b/src/tests/token-tests.ts @@ -3,7 +3,6 @@ import { ChainID } from '@stacks/transactions'; import { ApiServer, startApiServer } from '../api/init'; import { TestBlockBuilder, TestMicroblockStreamBuilder } from '../test-utils/test-builders'; import { DbAssetEventTypeId } from '../datastore/common'; -import { hexToBuffer } from '../helpers'; import { PgWriteStore } from '../datastore/pg-write-store'; import { cycleMigrations, runMigrations } from '../datastore/migrations';