fix: move nft custody view into a table (#1741)

* fix: move to table

* fix: old nft events query

* fix: nft-custody-table migration

* fix: no longer rename old views, just remove them

---------

Co-authored-by: Matt <zone117x@gmail.com>
This commit is contained in:
Rafael Cárdenas
2023-10-25 10:48:03 -06:00
committed by GitHub
parent cc4d0e5eea
commit fb0d0eaa93
6 changed files with 349 additions and 53 deletions

View File

@@ -0,0 +1,215 @@
/* eslint-disable camelcase */
exports.shorthands = undefined;
exports.up = pgm => {
pgm.dropMaterializedView('nft_custody');
pgm.createTable('nft_custody', {
asset_identifier: {
type: 'string',
notNull: true,
},
value: {
type: 'bytea',
notNull: true,
},
recipient: {
type: 'text',
},
block_height: {
type: 'integer',
notNull: true,
},
index_block_hash: {
type: 'bytea',
notNull: true,
},
parent_index_block_hash: {
type: 'bytea',
notNull: true,
},
microblock_hash: {
type: 'bytea',
notNull: true,
},
microblock_sequence: {
type: 'integer',
notNull: true,
},
tx_id: {
type: 'bytea',
notNull: true,
},
tx_index: {
type: 'smallint',
notNull: true,
},
event_index: {
type: 'integer',
notNull: true,
},
});
pgm.createConstraint('nft_custody', 'nft_custody_unique', 'UNIQUE(asset_identifier, value)');
pgm.createIndex('nft_custody', ['recipient', 'asset_identifier']);
pgm.createIndex('nft_custody', 'value');
pgm.createIndex('nft_custody', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },
{ name: 'tx_index', sort: 'DESC' },
{ name: 'event_index', sort: 'DESC' }
]);
pgm.sql(`
INSERT INTO nft_custody (asset_identifier, value, recipient, tx_id, block_height, index_block_hash, parent_index_block_hash, microblock_hash, microblock_sequence, tx_index, event_index) (
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height,
nft.index_block_hash, nft.parent_index_block_hash, nft.microblock_hash, nft.microblock_sequence, nft.tx_index, nft.event_index
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
)
`);
pgm.dropMaterializedView('nft_custody_unanchored');
pgm.createTable('nft_custody_unanchored', {
asset_identifier: {
type: 'string',
notNull: true,
},
value: {
type: 'bytea',
notNull: true,
},
recipient: {
type: 'text',
},
block_height: {
type: 'integer',
notNull: true,
},
index_block_hash: {
type: 'bytea',
notNull: true,
},
parent_index_block_hash: {
type: 'bytea',
notNull: true,
},
microblock_hash: {
type: 'bytea',
notNull: true,
},
microblock_sequence: {
type: 'integer',
notNull: true,
},
tx_id: {
type: 'bytea',
notNull: true,
},
tx_index: {
type: 'smallint',
notNull: true,
},
event_index: {
type: 'integer',
notNull: true,
},
});
pgm.createConstraint('nft_custody_unanchored', 'nft_custody_unanchored_unique', 'UNIQUE(asset_identifier, value)');
pgm.createIndex('nft_custody_unanchored', ['recipient', 'asset_identifier']);
pgm.createIndex('nft_custody_unanchored', 'value');
pgm.createIndex('nft_custody_unanchored', [
{ name: 'block_height', sort: 'DESC' },
{ name: 'microblock_sequence', sort: 'DESC' },
{ name: 'tx_index', sort: 'DESC' },
{ name: 'event_index', sort: 'DESC' }
]);
pgm.sql(`
INSERT INTO nft_custody_unanchored (asset_identifier, value, recipient, tx_id, block_height, index_block_hash, parent_index_block_hash, microblock_hash, microblock_sequence, tx_index, event_index) (
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height,
nft.index_block_hash, nft.parent_index_block_hash, nft.microblock_hash, nft.microblock_sequence, nft.tx_index, nft.event_index
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
)
`);
};
exports.down = pgm => {
pgm.dropTable('nft_custody');
pgm.createMaterializedView('nft_custody', { data: true }, `
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
`);
pgm.createIndex('nft_custody', ['recipient', 'asset_identifier']);
pgm.createIndex('nft_custody', ['asset_identifier', 'value'], { unique: true });
pgm.createIndex('nft_custody', 'value');
pgm.dropTable('nft_custody_unanchored');
pgm.createMaterializedView('nft_custody_unanchored', { data: true }, `
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient, tx_id, nft.block_height
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
`);
pgm.createIndex('nft_custody_unanchored', ['recipient', 'asset_identifier']);
pgm.createIndex('nft_custody_unanchored', ['asset_identifier', 'value'], { unique: true });
pgm.createIndex('nft_custody_unanchored', 'value');
};

View File

@@ -1357,6 +1357,20 @@ export interface NftEventInsertValues {
value: PgBytea;
}
export interface NftCustodyInsertValues {
event_index: number;
tx_id: PgBytea;
tx_index: number;
block_height: number;
index_block_hash: PgBytea;
parent_index_block_hash: PgBytea;
microblock_hash: PgBytea;
microblock_sequence: number;
recipient: string | null;
asset_identifier: string;
value: PgBytea;
}
export interface FtEventInsertValues {
event_index: number;
tx_id: PgBytea;

View File

@@ -3318,6 +3318,7 @@ export class PgStore {
FROM ${nftCustody} AS nft
WHERE nft.recipient = ${args.principal}
${assetIdFilter}
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
LIMIT ${args.limit}
OFFSET ${args.offset}
)
@@ -3519,11 +3520,11 @@ export class PgStore {
AND block_height <= ${args.blockHeight}
ORDER BY asset_identifier, value, block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
)
SELECT sender, recipient, asset_identifier, value, event_index, asset_event_type_id, address_transfers.block_height, address_transfers.tx_id, (COUNT(*) OVER())::INTEGER AS count
FROM address_transfers
SELECT sender, recipient, asset_identifier, value, at.event_index, asset_event_type_id, at.block_height, at.tx_id, (COUNT(*) OVER())::INTEGER AS count
FROM address_transfers AS at
INNER JOIN ${args.includeUnanchored ? this.sql`last_nft_transfers` : this.sql`nft_custody`}
USING (asset_identifier, value, recipient)
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
ORDER BY at.block_height DESC, at.microblock_sequence DESC, at.tx_index DESC, event_index DESC
LIMIT ${args.limit} OFFSET ${args.offset}
`;

View File

@@ -9,7 +9,6 @@ import {
DbSmartContractEvent,
DbSmartContract,
DataStoreBlockUpdateData,
DbMempoolTx,
DbStxLockEvent,
DbMinerReward,
DbBurnchainReward,
@@ -63,6 +62,7 @@ import {
DbMempoolTxRaw,
DbChainTip,
DbPox3Event,
NftCustodyInsertValues,
} from './common';
import { ClarityAbi } from '@stacks/transactions';
import {
@@ -407,7 +407,7 @@ export class PgWriteStore extends PgStore {
await this.updateFtEvent(sql, entry.tx, ftEvent);
}
for (const nftEvent of entry.nftEvents) {
await this.updateNftEvent(sql, entry.tx, nftEvent);
await this.updateNftEvent(sql, entry.tx, nftEvent, false);
}
deployedSmartContracts.push(...entry.smartContracts);
for (const smartContract of entry.smartContracts) {
@@ -466,7 +466,6 @@ export class PgWriteStore extends PgStore {
const ibdHeight = getIbdBlockHeight();
this.isIbdBlockHeightReached = ibdHeight ? data.block.block_height > ibdHeight : true;
await this.refreshNftCustody(batchedTxData);
await this.refreshMaterializedView('chain_tip');
await this.refreshMaterializedView('mempool_digest');
@@ -746,7 +745,6 @@ export class PgWriteStore extends PgStore {
}
});
await this.refreshNftCustody(txData, true);
await this.refreshMaterializedView('chain_tip');
await this.refreshMaterializedView('mempool_digest');
@@ -1313,27 +1311,65 @@ export class PgWriteStore extends PgStore {
`;
}
async updateNftEvent(sql: PgSqlClient, tx: DbTx, event: DbNftEvent) {
const values: NftEventInsertValues = {
async updateNftEvent(sql: PgSqlClient, tx: DbTx, event: DbNftEvent, microblock: boolean) {
const custody: NftCustodyInsertValues = {
asset_identifier: event.asset_identifier,
value: event.value,
tx_id: event.tx_id,
index_block_hash: tx.index_block_hash,
parent_index_block_hash: tx.parent_index_block_hash,
microblock_hash: tx.microblock_hash,
microblock_sequence: tx.microblock_sequence,
microblock_canonical: tx.microblock_canonical,
sender: event.sender ?? null,
recipient: event.recipient ?? null,
event_index: event.event_index,
tx_index: event.tx_index,
block_height: event.block_height,
};
const values: NftEventInsertValues = {
...custody,
microblock_canonical: tx.microblock_canonical,
canonical: event.canonical,
sender: event.sender ?? null,
asset_event_type_id: event.asset_event_type_id,
asset_identifier: event.asset_identifier,
value: event.value,
};
await sql`
INSERT INTO nft_events ${sql(values)}
`;
if (tx.canonical && tx.microblock_canonical && event.canonical) {
const table = microblock ? sql`nft_custody_unanchored` : sql`nft_custody`;
await sql`
INSERT INTO ${table} ${sql(custody)}
ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET
tx_id = EXCLUDED.tx_id,
index_block_hash = EXCLUDED.index_block_hash,
parent_index_block_hash = EXCLUDED.parent_index_block_hash,
microblock_hash = EXCLUDED.microblock_hash,
microblock_sequence = EXCLUDED.microblock_sequence,
recipient = EXCLUDED.recipient,
event_index = EXCLUDED.event_index,
tx_index = EXCLUDED.tx_index,
block_height = EXCLUDED.block_height
WHERE
(
EXCLUDED.block_height > ${table}.block_height
)
OR (
EXCLUDED.block_height = ${table}.block_height
AND EXCLUDED.microblock_sequence > ${table}.microblock_sequence
)
OR (
EXCLUDED.block_height = ${table}.block_height
AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
AND EXCLUDED.tx_index > ${table}.tx_index
)
OR (
EXCLUDED.block_height = ${table}.block_height
AND EXCLUDED.microblock_sequence = ${table}.microblock_sequence
AND EXCLUDED.tx_index = ${table}.tx_index
AND EXCLUDED.event_index > ${table}.event_index
)
`;
}
}
async updateBatchSmartContractEvent(sql: PgSqlClient, tx: DbTx, events: DbSmartContractEvent[]) {
@@ -2206,7 +2242,7 @@ export class PgWriteStore extends PgStore {
await this.updateFtEvent(sql, entry.tx, ftEvent);
}
for (const nftEvent of entry.nftEvents) {
await this.updateNftEvent(sql, entry.tx, nftEvent);
await this.updateNftEvent(sql, entry.tx, nftEvent, true);
}
for (const smartContract of entry.smartContracts) {
await this.updateSmartContract(sql, entry.tx, smartContract);
@@ -2288,11 +2324,74 @@ export class PgWriteStore extends PgStore {
AND (index_block_hash = ${args.indexBlockHash} OR index_block_hash = '\\x'::bytea)
AND tx_id IN ${sql(txIds)}
`;
await this.updateNftCustodyFromReOrg(sql, {
index_block_hash: args.indexBlockHash,
microblocks: args.microblocks,
});
}
return { updatedTxs: updatedMbTxs };
}
/**
* Refreshes NFT custody data for events within a block or series of microblocks.
* @param sql - SQL client
* @param args - Block and microblock hashes
*/
async updateNftCustodyFromReOrg(
sql: PgSqlClient,
args: {
index_block_hash: string;
microblocks: string[];
}
): Promise<void> {
for (const table of [sql`nft_custody`, sql`nft_custody_unanchored`]) {
await sql`
INSERT INTO ${table}
(asset_identifier, value, tx_id, index_block_hash, parent_index_block_hash, microblock_hash,
microblock_sequence, recipient, event_index, tx_index, block_height)
(
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, tx_id, txs.index_block_hash,
txs.parent_index_block_hash, txs.microblock_hash, txs.microblock_sequence, recipient,
nft.event_index, txs.tx_index, txs.block_height
FROM
nft_events AS nft
INNER JOIN
txs USING (tx_id)
WHERE
txs.canonical = true
AND txs.microblock_canonical = true
AND nft.canonical = true
AND nft.microblock_canonical = true
AND nft.index_block_hash = ${args.index_block_hash}
${
args.microblocks.length > 0
? sql`AND nft.microblock_hash IN ${sql(args.microblocks)}`
: sql``
}
ORDER BY
asset_identifier,
value,
txs.block_height DESC,
txs.microblock_sequence DESC,
txs.tx_index DESC,
nft.event_index DESC
)
ON CONFLICT ON CONSTRAINT ${table}_unique DO UPDATE SET
tx_id = EXCLUDED.tx_id,
index_block_hash = EXCLUDED.index_block_hash,
parent_index_block_hash = EXCLUDED.parent_index_block_hash,
microblock_hash = EXCLUDED.microblock_hash,
microblock_sequence = EXCLUDED.microblock_sequence,
recipient = EXCLUDED.recipient,
event_index = EXCLUDED.event_index,
tx_index = EXCLUDED.tx_index,
block_height = EXCLUDED.block_height
`;
}
}
/**
* Fetches from the `microblocks` table with a given `parent_index_block_hash` and a known
* latest unanchored microblock tip. Microblocks that are chained to the given tip are
@@ -2554,6 +2653,10 @@ export class PgWriteStore extends PgStore {
} else {
updatedEntities.markedNonCanonical.nftEvents += nftResult.count;
}
await this.updateNftCustodyFromReOrg(sql, {
index_block_hash: indexBlockHash,
microblocks: [],
});
// todo: do we still need pox2 marking here?
const pox2Result = await sql`
@@ -2922,40 +3025,6 @@ export class PgWriteStore extends PgStore {
await sql`REFRESH MATERIALIZED VIEW ${isProdEnv ? sql`CONCURRENTLY` : sql``} ${sql(viewName)}`;
}
/**
* Refreshes the `nft_custody` and `nft_custody_unanchored` materialized views if necessary.
* @param sql - DB client
* @param txs - Transaction event data
* @param unanchored - If this refresh is requested from a block or microblock
*/
async refreshNftCustody(txs: DataStoreTxEventData[], unanchored: boolean = false) {
await this.sqlWriteTransaction(async sql => {
const newNftEventCount = txs
.map(tx => tx.nftEvents.length)
.reduce((prev, cur) => prev + cur, 0);
if (newNftEventCount > 0) {
// Always refresh unanchored view since even if we're in a new anchored block we should update the
// unanchored state to the current one.
await this.refreshMaterializedView('nft_custody_unanchored', sql);
if (!unanchored) {
await this.refreshMaterializedView('nft_custody', sql);
}
} else if (!unanchored) {
// Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
// update the anchored view to reflect any changes made by previous microblocks.
const result = await sql<{ outdated: boolean }[]>`
WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
SELECT unanchored > anchored AS outdated
FROM anchored_height CROSS JOIN unanchored_height
`;
if (result.length > 0 && result[0].outdated) {
await this.refreshMaterializedView('nft_custody', sql);
}
}
});
}
/**
* Called when a full event import is complete.
*/
@@ -2964,8 +3033,6 @@ export class PgWriteStore extends PgStore {
return;
}
await this.sqlWriteTransaction(async sql => {
await this.refreshMaterializedView('nft_custody', sql, false);
await this.refreshMaterializedView('nft_custody_unanchored', sql, false);
await this.refreshMaterializedView('chain_tip', sql, false);
await this.refreshMaterializedView('mempool_digest', sql, false);
});

View File

@@ -820,7 +820,7 @@ describe('search tests', () => {
recipient: addr7,
sender: 'none',
};
await db.updateNftEvent(client, stxTx1, nftEvent1);
await db.updateNftEvent(client, stxTx1, nftEvent1, false);
// test address as a nft event recipient
const searchResult7 = await supertest(api.server).get(`/extended/v1/search/${addr7}`);
@@ -848,7 +848,7 @@ describe('search tests', () => {
recipient: 'none',
sender: addr8,
};
await db.updateNftEvent(client, stxTx1, nftEvent2);
await db.updateNftEvent(client, stxTx1, nftEvent2, false);
// test address as a nft event sender
const searchResult8 = await supertest(api.server).get(`/extended/v1/search/${addr8}`);

View File

@@ -3,7 +3,6 @@ import { ChainID } from '@stacks/transactions';
import { ApiServer, startApiServer } from '../api/init';
import { TestBlockBuilder, TestMicroblockStreamBuilder } from '../test-utils/test-builders';
import { DbAssetEventTypeId } from '../datastore/common';
import { hexToBuffer } from '../helpers';
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';