From aaafb5ae2feb35354339267c5ba4b53e13079250 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Fri, 12 Nov 2021 08:40:58 -0600 Subject: [PATCH] feat: add nft_custody pg materialized view to speed up nft event lookup * feat: add nft_custody materialized view * fix: nft events unit test * fix: perform full query if user wants unanchored results * feat: only refresh materialized views when not in event replay --- src/datastore/postgres-store.ts | 41 ++++++- src/index.ts | 3 +- src/migrations/1636130197558_nft_custody.ts | 25 ++++ src/tests/api-tests.ts | 124 ++++++++++++++------ 4 files changed, 150 insertions(+), 43 deletions(-) create mode 100644 src/migrations/1636130197558_nft_custody.ts diff --git a/src/datastore/postgres-store.ts b/src/datastore/postgres-store.ts index 11398540..7e9b75a7 100644 --- a/src/datastore/postgres-store.ts +++ b/src/datastore/postgres-store.ts @@ -582,11 +582,17 @@ export class PgDataStore implements DataStore { readonly pool: Pool; readonly notifier?: PgNotifier; - private constructor(pool: Pool, notifier: PgNotifier | undefined = undefined) { + readonly eventReplay: boolean; + private constructor( + pool: Pool, + notifier: PgNotifier | undefined = undefined, + eventReplay: boolean = false + ) { // eslint-disable-next-line constructor-super super(); this.pool = pool; this.notifier = notifier; + this.eventReplay = eventReplay; } /** @@ -1186,6 +1192,7 @@ export class PgDataStore const blocksUpdated = await this.updateBlock(client, data.block); if (blocksUpdated !== 0) { + let newNftEvents = false; for (const minerRewards of data.minerRewards) { await this.updateMinerReward(client, minerRewards); } @@ -1200,6 +1207,7 @@ export class PgDataStore await this.updateFtEvent(client, entry.tx, ftEvent); } for (const nftEvent of entry.nftEvents) { + newNftEvents = true; await this.updateNftEvent(client, entry.tx, nftEvent); } for (const smartContract of entry.smartContracts) { @@ -1212,6 +1220,9 @@ export class PgDataStore await this.updateNamespaces(client, entry.tx, namespace); } } + if (newNftEvents && !this.eventReplay) { + await client.query(`REFRESH MATERIALIZED VIEW nft_custody`); + } const tokenContractDeployments = data.txs .filter(entry => entry.tx.type_id === DbTxTypeId.SmartContract) @@ -2368,7 +2379,11 @@ export class PgDataStore logger.verbose(`Entities marked as non-canonical: ${markedNonCanonical}`); } - static async connect(skipMigrations = false, withNotifier = true): Promise { + static async connect( + skipMigrations = false, + withNotifier = true, + eventReplay = false + ): Promise { const clientConfig = getPgClientConfig(); const initTimer = stopwatch(); @@ -2424,10 +2439,10 @@ export class PgDataStore try { poolClient = await pool.connect(); if (!withNotifier) { - return new PgDataStore(pool); + return new PgDataStore(pool, undefined, eventReplay); } const notifier = new PgNotifier(clientConfig); - const store = new PgDataStore(pool, notifier); + const store = new PgDataStore(pool, notifier, eventReplay); await store.connectPgNotifier(); return store; } catch (error) { @@ -5757,6 +5772,7 @@ export class PgDataStore includeUnanchored: args.includeUnanchored, }); const result = await client.query( + // Join against `nft_custody` materialized view only if we're looking for canonical results. ` WITH address_transfers AS ( SELECT asset_identifier, value, sender, recipient, block_height, microblock_sequence, tx_index, event_index, tx_id @@ -5772,7 +5788,9 @@ export class PgDataStore ORDER BY asset_identifier, value, block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC ) SELECT sender, recipient, asset_identifier, value, block_height, tx_id, COUNT(*) OVER() AS count - FROM address_transfers INNER JOIN last_nft_transfers USING (asset_identifier, value, recipient) + FROM address_transfers + INNER JOIN ${args.includeUnanchored ? 'last_nft_transfers' : 'nft_custody'} + USING (asset_identifier, value, recipient) ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC LIMIT $2 OFFSET $3 `, @@ -6771,6 +6789,19 @@ export class PgDataStore }); } + /** + * Called when a full event import is complete. + */ + async finishEventReplay() { + if (!this.eventReplay) { + return; + } + await this.queryTx(async client => { + // Refresh postgres materialized views. + await client.query(`REFRESH MATERIALIZED VIEW nft_custody`); + }); + } + async close(): Promise { await this.notifier?.close(); await this.pool.end(); diff --git a/src/index.ts b/src/index.ts index 4a97faa2..07870017 100644 --- a/src/index.ts +++ b/src/index.ts @@ -285,7 +285,7 @@ async function handleProgramArgs() { // or the `--force` option can be used. await cycleMigrations({ dangerousAllowDataLoss: true }); - const db = await PgDataStore.connect(true, false); + const db = await PgDataStore.connect(true, false, true); const eventServer = await startEventServer({ datastore: db, chainId: getConfiguredChainID(), @@ -315,6 +315,7 @@ async function handleProgramArgs() { }); } } + await db.finishEventReplay(); console.log(`Event import and playback successful.`); await eventServer.closeAsync(); await db.close(); diff --git a/src/migrations/1636130197558_nft_custody.ts b/src/migrations/1636130197558_nft_custody.ts new file mode 100644 index 00000000..19d51b78 --- /dev/null +++ b/src/migrations/1636130197558_nft_custody.ts @@ -0,0 +1,25 @@ +/* eslint-disable @typescript-eslint/camelcase */ +import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate'; + +export const shorthands: ColumnDefinitions | undefined = undefined; + +export async function up(pgm: MigrationBuilder): Promise { + pgm.createMaterializedView('nft_custody', {}, ` + SELECT + DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient + FROM + nft_events + WHERE + canonical = true AND microblock_canonical = true + ORDER BY + asset_identifier DESC, + value DESC, + block_height DESC, + microblock_sequence DESC, + tx_index DESC, + event_index DESC + `); + + pgm.createIndex('nft_custody', ['asset_identifier', 'value']); + pgm.createIndex('nft_custody', 'recipient'); +} diff --git a/src/tests/api-tests.ts b/src/tests/api-tests.ts index eb1bd2e4..403fe1e5 100644 --- a/src/tests/api-tests.ts +++ b/src/tests/api-tests.ts @@ -4600,7 +4600,11 @@ describe('api tests', () => { const searchResult = await supertest(api.server).get(`/extended/v1/tx/0x1234/raw`); expect(searchResult.status).toBe(404); }); + test('Success: nft events for address', async () => { + const addr1 = 'ST3J8EVYHVKH6XXPD61EE8XEHW4Y2K83861225AB1'; + const addr2 = 'ST1HB64MAJ1MBV4CQ80GF01DZS4T1DSMX20ADCRA4'; + const dbBlock: DbBlock = { block_hash: '0xff', index_block_hash: '0x1234', @@ -4620,11 +4624,6 @@ describe('api tests', () => { execution_cost_write_count: 0, execution_cost_write_length: 0, }; - await db.updateBlock(client, dbBlock); - - const addr1 = 'ST3J8EVYHVKH6XXPD61EE8XEHW4Y2K83861225AB1'; - const addr2 = 'ST1HB64MAJ1MBV4CQ80GF01DZS4T1DSMX20ADCRA4'; - const stxTx: DbTx = { tx_id: '0x1111000000000000000000000000000000000000000000000000000000000000', tx_index: 0, @@ -4661,27 +4660,44 @@ describe('api tests', () => { execution_cost_write_count: 0, execution_cost_write_length: 0, }; - await db.updateTx(client, stxTx); - - const nftEvent1: DbNftEvent = { - canonical: true, - event_type: DbEventTypeId.NonFungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Transfer, - event_index: 0, - tx_id: '0x1111000000000000000000000000000000000000000000000000000000000000', - tx_index: 1, - block_height: dbBlock.block_height, - asset_identifier: 'some-asset', - value: serializeCV(intCV(0)), - recipient: addr1, - sender: 'none', - }; + const nftEvents: DbNftEvent[] = []; for (let i = 0; i < 10; i++) { - await db.updateNftEvent(client, stxTx, nftEvent1); + nftEvents.push({ + canonical: true, + event_type: DbEventTypeId.NonFungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Transfer, + event_index: 0, + tx_id: stxTx.tx_id, + tx_index: 1, + block_height: dbBlock.block_height, + asset_identifier: 'some-asset', + value: serializeCV(intCV(0)), + recipient: addr1, + sender: 'none', + }); } + + await db.update({ + block: dbBlock, + microblocks: [], + minerRewards: [], + txs: [ + { + tx: stxTx, + stxLockEvents: [], + stxEvents: [], + ftEvents: [], + nftEvents: nftEvents, + contractLogEvents: [], + smartContracts: [], + names: [], + namespaces: [], + }, + ], + }); + const limit = 2; const offset = 0; - // test nft for given addresses const result = await supertest(api.server).get( `/extended/v1/address/${addr1}/nft_events?limit=${limit}&offset=${offset}` @@ -4697,17 +4713,36 @@ describe('api tests', () => { expect(result.body.nft_events[0].block_height).toBe(1); expect(result.body.nft_events[0].value.repr).toBe('0'); + const dbBlock2: DbBlock = { + block_hash: '0xffff', + index_block_hash: '0x123466', + parent_index_block_hash: '0x1234', + parent_block_hash: '0xff', + parent_microblock_hash: '', + parent_microblock_sequence: 0, + block_height: 2, + burn_block_time: 1594649995, + burn_block_hash: '0x123456', + burn_block_height: 124, + miner_txid: '0x4321', + canonical: true, + execution_cost_read_count: 0, + execution_cost_read_length: 0, + execution_cost_runtime: 0, + execution_cost_write_count: 0, + execution_cost_write_length: 0, + }; const stxTx1: DbTx = { - tx_id: '0x1111100000000000000000000000000000000000000000000000000000000000', + tx_id: '0x1111100000000000000000000000000000000000000000000000000000000001', tx_index: 0, anchor_mode: 3, nonce: 0, raw_tx: Buffer.alloc(0), - index_block_hash: dbBlock.index_block_hash, - block_hash: dbBlock.block_hash, - block_height: dbBlock.block_height, - burn_block_time: dbBlock.burn_block_time, - parent_burn_block_time: 1626122935, + index_block_hash: dbBlock2.index_block_hash, + block_hash: dbBlock2.block_hash, + block_height: dbBlock2.block_height, + burn_block_time: dbBlock2.burn_block_time, + parent_burn_block_time: 1626124935, type_id: DbTxTypeId.TokenTransfer, token_transfer_amount: 1n, token_transfer_memo: Buffer.from('hi'), @@ -4718,8 +4753,8 @@ describe('api tests', () => { microblock_canonical: true, microblock_sequence: I32_MAX, microblock_hash: '', - parent_index_block_hash: dbBlock.parent_index_block_hash, - parent_block_hash: dbBlock.parent_block_hash, + parent_index_block_hash: dbBlock2.parent_index_block_hash, + parent_block_hash: dbBlock2.parent_block_hash, post_conditions: Buffer.from([0x01, 0xf5]), fee_rate: 1234n, sponsored: false, @@ -4733,22 +4768,37 @@ describe('api tests', () => { execution_cost_write_count: 0, execution_cost_write_length: 0, }; - await db.updateTx(client, stxTx1); - const nftEvent2: DbNftEvent = { canonical: true, event_type: DbEventTypeId.NonFungibleTokenAsset, asset_event_type_id: DbAssetEventTypeId.Transfer, event_index: 1, - tx_id: '0x1111100000000000000000000000000000000000000000000000000000000000', + tx_id: stxTx1.tx_id, tx_index: 2, - block_height: dbBlock.block_height, + block_height: dbBlock2.block_height, asset_identifier: 'some-asset', value: serializeCV(intCV(0)), recipient: addr2, sender: 'none', }; - await db.updateNftEvent(client, stxTx, nftEvent2); + await db.update({ + block: dbBlock2, + microblocks: [], + minerRewards: [], + txs: [ + { + tx: stxTx1, + stxLockEvents: [], + stxEvents: [], + ftEvents: [], + nftEvents: [nftEvent2], + contractLogEvents: [], + smartContracts: [], + names: [], + namespaces: [], + }, + ], + }); const result1 = await supertest(api.server).get(`/extended/v1/address/${addr2}/nft_events`); expect(result1.status).toBe(200); @@ -4757,9 +4807,9 @@ describe('api tests', () => { expect(result1.body.nft_events.length).toEqual(1); expect(result1.body.nft_events[0].recipient).toBe(addr2); expect(result1.body.nft_events[0].tx_id).toBe( - '0x1111100000000000000000000000000000000000000000000000000000000000' + '0x1111100000000000000000000000000000000000000000000000000000000001' ); - expect(result1.body.nft_events[0].block_height).toBe(1); + expect(result1.body.nft_events[0].block_height).toBe(2); expect(result.body.nft_events[0].value.repr).toBe('0'); //check ownership for addr