feat: add nft_custody pg materialized view to speed up nft event lookup

* feat: add nft_custody materialized view

* fix: nft events unit test

* fix: perform full query if user wants unanchored results

* feat: only refresh materialized views when not in event replay
This commit is contained in:
Rafael Cárdenas
2021-11-12 08:40:58 -06:00
committed by GitHub
parent 7a1138452a
commit aaafb5ae2f
4 changed files with 150 additions and 43 deletions

View File

@@ -582,11 +582,17 @@ export class PgDataStore
implements DataStore {
readonly pool: Pool;
readonly notifier?: PgNotifier;
private constructor(pool: Pool, notifier: PgNotifier | undefined = undefined) {
readonly eventReplay: boolean;
private constructor(
pool: Pool,
notifier: PgNotifier | undefined = undefined,
eventReplay: boolean = false
) {
// eslint-disable-next-line constructor-super
super();
this.pool = pool;
this.notifier = notifier;
this.eventReplay = eventReplay;
}
/**
@@ -1186,6 +1192,7 @@ export class PgDataStore
const blocksUpdated = await this.updateBlock(client, data.block);
if (blocksUpdated !== 0) {
let newNftEvents = false;
for (const minerRewards of data.minerRewards) {
await this.updateMinerReward(client, minerRewards);
}
@@ -1200,6 +1207,7 @@ export class PgDataStore
await this.updateFtEvent(client, entry.tx, ftEvent);
}
for (const nftEvent of entry.nftEvents) {
newNftEvents = true;
await this.updateNftEvent(client, entry.tx, nftEvent);
}
for (const smartContract of entry.smartContracts) {
@@ -1212,6 +1220,9 @@ export class PgDataStore
await this.updateNamespaces(client, entry.tx, namespace);
}
}
if (newNftEvents && !this.eventReplay) {
await client.query(`REFRESH MATERIALIZED VIEW nft_custody`);
}
const tokenContractDeployments = data.txs
.filter(entry => entry.tx.type_id === DbTxTypeId.SmartContract)
@@ -2368,7 +2379,11 @@ export class PgDataStore
logger.verbose(`Entities marked as non-canonical: ${markedNonCanonical}`);
}
static async connect(skipMigrations = false, withNotifier = true): Promise<PgDataStore> {
static async connect(
skipMigrations = false,
withNotifier = true,
eventReplay = false
): Promise<PgDataStore> {
const clientConfig = getPgClientConfig();
const initTimer = stopwatch();
@@ -2424,10 +2439,10 @@ export class PgDataStore
try {
poolClient = await pool.connect();
if (!withNotifier) {
return new PgDataStore(pool);
return new PgDataStore(pool, undefined, eventReplay);
}
const notifier = new PgNotifier(clientConfig);
const store = new PgDataStore(pool, notifier);
const store = new PgDataStore(pool, notifier, eventReplay);
await store.connectPgNotifier();
return store;
} catch (error) {
@@ -5757,6 +5772,7 @@ export class PgDataStore
includeUnanchored: args.includeUnanchored,
});
const result = await client.query<AddressNftEventIdentifier & { count: string }>(
// Join against `nft_custody` materialized view only if we're looking for canonical results.
`
WITH address_transfers AS (
SELECT asset_identifier, value, sender, recipient, block_height, microblock_sequence, tx_index, event_index, tx_id
@@ -5772,7 +5788,9 @@ export class PgDataStore
ORDER BY asset_identifier, value, block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
)
SELECT sender, recipient, asset_identifier, value, block_height, tx_id, COUNT(*) OVER() AS count
FROM address_transfers INNER JOIN last_nft_transfers USING (asset_identifier, value, recipient)
FROM address_transfers
INNER JOIN ${args.includeUnanchored ? 'last_nft_transfers' : 'nft_custody'}
USING (asset_identifier, value, recipient)
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
LIMIT $2 OFFSET $3
`,
@@ -6771,6 +6789,19 @@ export class PgDataStore
});
}
/**
* Called when a full event import is complete.
*/
async finishEventReplay() {
if (!this.eventReplay) {
return;
}
await this.queryTx(async client => {
// Refresh postgres materialized views.
await client.query(`REFRESH MATERIALIZED VIEW nft_custody`);
});
}
async close(): Promise<void> {
await this.notifier?.close();
await this.pool.end();

View File

@@ -285,7 +285,7 @@ async function handleProgramArgs() {
// or the `--force` option can be used.
await cycleMigrations({ dangerousAllowDataLoss: true });
const db = await PgDataStore.connect(true, false);
const db = await PgDataStore.connect(true, false, true);
const eventServer = await startEventServer({
datastore: db,
chainId: getConfiguredChainID(),
@@ -315,6 +315,7 @@ async function handleProgramArgs() {
});
}
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();

View File

@@ -0,0 +1,25 @@
/* eslint-disable @typescript-eslint/camelcase */
import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';
export const shorthands: ColumnDefinitions | undefined = undefined;
export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createMaterializedView('nft_custody', {}, `
SELECT
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient
FROM
nft_events
WHERE
canonical = true AND microblock_canonical = true
ORDER BY
asset_identifier DESC,
value DESC,
block_height DESC,
microblock_sequence DESC,
tx_index DESC,
event_index DESC
`);
pgm.createIndex('nft_custody', ['asset_identifier', 'value']);
pgm.createIndex('nft_custody', 'recipient');
}

View File

@@ -4600,7 +4600,11 @@ describe('api tests', () => {
const searchResult = await supertest(api.server).get(`/extended/v1/tx/0x1234/raw`);
expect(searchResult.status).toBe(404);
});
test('Success: nft events for address', async () => {
const addr1 = 'ST3J8EVYHVKH6XXPD61EE8XEHW4Y2K83861225AB1';
const addr2 = 'ST1HB64MAJ1MBV4CQ80GF01DZS4T1DSMX20ADCRA4';
const dbBlock: DbBlock = {
block_hash: '0xff',
index_block_hash: '0x1234',
@@ -4620,11 +4624,6 @@ describe('api tests', () => {
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
await db.updateBlock(client, dbBlock);
const addr1 = 'ST3J8EVYHVKH6XXPD61EE8XEHW4Y2K83861225AB1';
const addr2 = 'ST1HB64MAJ1MBV4CQ80GF01DZS4T1DSMX20ADCRA4';
const stxTx: DbTx = {
tx_id: '0x1111000000000000000000000000000000000000000000000000000000000000',
tx_index: 0,
@@ -4661,27 +4660,44 @@ describe('api tests', () => {
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
await db.updateTx(client, stxTx);
const nftEvent1: DbNftEvent = {
canonical: true,
event_type: DbEventTypeId.NonFungibleTokenAsset,
asset_event_type_id: DbAssetEventTypeId.Transfer,
event_index: 0,
tx_id: '0x1111000000000000000000000000000000000000000000000000000000000000',
tx_index: 1,
block_height: dbBlock.block_height,
asset_identifier: 'some-asset',
value: serializeCV(intCV(0)),
recipient: addr1,
sender: 'none',
};
const nftEvents: DbNftEvent[] = [];
for (let i = 0; i < 10; i++) {
await db.updateNftEvent(client, stxTx, nftEvent1);
nftEvents.push({
canonical: true,
event_type: DbEventTypeId.NonFungibleTokenAsset,
asset_event_type_id: DbAssetEventTypeId.Transfer,
event_index: 0,
tx_id: stxTx.tx_id,
tx_index: 1,
block_height: dbBlock.block_height,
asset_identifier: 'some-asset',
value: serializeCV(intCV(0)),
recipient: addr1,
sender: 'none',
});
}
await db.update({
block: dbBlock,
microblocks: [],
minerRewards: [],
txs: [
{
tx: stxTx,
stxLockEvents: [],
stxEvents: [],
ftEvents: [],
nftEvents: nftEvents,
contractLogEvents: [],
smartContracts: [],
names: [],
namespaces: [],
},
],
});
const limit = 2;
const offset = 0;
// test nft for given addresses
const result = await supertest(api.server).get(
`/extended/v1/address/${addr1}/nft_events?limit=${limit}&offset=${offset}`
@@ -4697,17 +4713,36 @@ describe('api tests', () => {
expect(result.body.nft_events[0].block_height).toBe(1);
expect(result.body.nft_events[0].value.repr).toBe('0');
const dbBlock2: DbBlock = {
block_hash: '0xffff',
index_block_hash: '0x123466',
parent_index_block_hash: '0x1234',
parent_block_hash: '0xff',
parent_microblock_hash: '',
parent_microblock_sequence: 0,
block_height: 2,
burn_block_time: 1594649995,
burn_block_hash: '0x123456',
burn_block_height: 124,
miner_txid: '0x4321',
canonical: true,
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
const stxTx1: DbTx = {
tx_id: '0x1111100000000000000000000000000000000000000000000000000000000000',
tx_id: '0x1111100000000000000000000000000000000000000000000000000000000001',
tx_index: 0,
anchor_mode: 3,
nonce: 0,
raw_tx: Buffer.alloc(0),
index_block_hash: dbBlock.index_block_hash,
block_hash: dbBlock.block_hash,
block_height: dbBlock.block_height,
burn_block_time: dbBlock.burn_block_time,
parent_burn_block_time: 1626122935,
index_block_hash: dbBlock2.index_block_hash,
block_hash: dbBlock2.block_hash,
block_height: dbBlock2.block_height,
burn_block_time: dbBlock2.burn_block_time,
parent_burn_block_time: 1626124935,
type_id: DbTxTypeId.TokenTransfer,
token_transfer_amount: 1n,
token_transfer_memo: Buffer.from('hi'),
@@ -4718,8 +4753,8 @@ describe('api tests', () => {
microblock_canonical: true,
microblock_sequence: I32_MAX,
microblock_hash: '',
parent_index_block_hash: dbBlock.parent_index_block_hash,
parent_block_hash: dbBlock.parent_block_hash,
parent_index_block_hash: dbBlock2.parent_index_block_hash,
parent_block_hash: dbBlock2.parent_block_hash,
post_conditions: Buffer.from([0x01, 0xf5]),
fee_rate: 1234n,
sponsored: false,
@@ -4733,22 +4768,37 @@ describe('api tests', () => {
execution_cost_write_count: 0,
execution_cost_write_length: 0,
};
await db.updateTx(client, stxTx1);
const nftEvent2: DbNftEvent = {
canonical: true,
event_type: DbEventTypeId.NonFungibleTokenAsset,
asset_event_type_id: DbAssetEventTypeId.Transfer,
event_index: 1,
tx_id: '0x1111100000000000000000000000000000000000000000000000000000000000',
tx_id: stxTx1.tx_id,
tx_index: 2,
block_height: dbBlock.block_height,
block_height: dbBlock2.block_height,
asset_identifier: 'some-asset',
value: serializeCV(intCV(0)),
recipient: addr2,
sender: 'none',
};
await db.updateNftEvent(client, stxTx, nftEvent2);
await db.update({
block: dbBlock2,
microblocks: [],
minerRewards: [],
txs: [
{
tx: stxTx1,
stxLockEvents: [],
stxEvents: [],
ftEvents: [],
nftEvents: [nftEvent2],
contractLogEvents: [],
smartContracts: [],
names: [],
namespaces: [],
},
],
});
const result1 = await supertest(api.server).get(`/extended/v1/address/${addr2}/nft_events`);
expect(result1.status).toBe(200);
@@ -4757,9 +4807,9 @@ describe('api tests', () => {
expect(result1.body.nft_events.length).toEqual(1);
expect(result1.body.nft_events[0].recipient).toBe(addr2);
expect(result1.body.nft_events[0].tx_id).toBe(
'0x1111100000000000000000000000000000000000000000000000000000000000'
'0x1111100000000000000000000000000000000000000000000000000000000001'
);
expect(result1.body.nft_events[0].block_height).toBe(1);
expect(result1.body.nft_events[0].block_height).toBe(2);
expect(result.body.nft_events[0].value.repr).toBe('0');
//check ownership for addr