fix: refresh materialized views in their own pg connection (#1356)

* feat: refresh views in their own conns

* test: proper mempool cache testing
This commit is contained in:
Rafael Cárdenas
2022-10-13 12:38:53 -05:00
committed by GitHub
parent f249ae70e7
commit 9433d3c9c2
2 changed files with 117 additions and 88 deletions

View File

@@ -188,6 +188,8 @@ export class PgWriteStore extends PgStore {
async update(data: DataStoreBlockUpdateData): Promise<void> { async update(data: DataStoreBlockUpdateData): Promise<void> {
const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = []; const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = [];
let garbageCollectedMempoolTxs: string[] = []; let garbageCollectedMempoolTxs: string[] = [];
let batchedTxData: DataStoreTxEventData[] = [];
await this.sql.begin(async sql => { await this.sql.begin(async sql => {
const chainTip = await this.getChainTip(sql, false); const chainTip = await this.getChainTip(sql, false);
await this.handleReorg(sql, data.block, chainTip.blockHeight); await this.handleReorg(sql, data.block, chainTip.blockHeight);
@@ -257,7 +259,7 @@ export class PgWriteStore extends PgStore {
data.block.execution_cost_write_count = totalCost.execution_cost_write_count; data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
data.block.execution_cost_write_length = totalCost.execution_cost_write_length; data.block.execution_cost_write_length = totalCost.execution_cost_write_length;
let batchedTxData: DataStoreTxEventData[] = data.txs; batchedTxData = data.txs;
// Find microblocks that weren't already inserted via the unconfirmed microblock event. // Find microblocks that weren't already inserted via the unconfirmed microblock event.
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time. // This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
@@ -358,8 +360,6 @@ export class PgWriteStore extends PgStore {
await this.updateNames(sql, entry.tx, bnsName); await this.updateNames(sql, entry.tx, bnsName);
} }
} }
await this.refreshNftCustody(sql, batchedTxData);
await this.refreshMaterializedView(sql, 'chain_tip');
const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql); const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
if (mempoolGarbageResults.deletedTxs.length > 0) { if (mempoolGarbageResults.deletedTxs.length > 0) {
logger.verbose( logger.verbose(
@@ -399,6 +399,10 @@ export class PgWriteStore extends PgStore {
} }
}); });
await this.refreshNftCustody(batchedTxData);
await this.refreshMaterializedView('chain_tip');
await this.refreshMaterializedView('mempool_digest');
// Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the // Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
// event replay of the v1 blockchain. // event replay of the v1 blockchain.
if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) { if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) {
@@ -529,6 +533,9 @@ export class PgWriteStore extends PgStore {
} }
async updateMicroblocksInternal(data: DataStoreMicroblockUpdateData): Promise<void> { async updateMicroblocksInternal(data: DataStoreMicroblockUpdateData): Promise<void> {
const txData: DataStoreTxEventData[] = [];
let dbMicroblocks: DbMicroblock[] = [];
await this.sql.begin(async sql => { await this.sql.begin(async sql => {
// Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's // Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
// current known canonical chain tip. We assume this holds true so incoming microblock data is always // current known canonical chain tip. We assume this holds true so incoming microblock data is always
@@ -550,7 +557,7 @@ export class PgWriteStore extends PgStore {
// The block height is just one after the current chain tip height // The block height is just one after the current chain tip height
const blockHeight = chainTip.blockHeight + 1; const blockHeight = chainTip.blockHeight + 1;
const dbMicroblocks = data.microblocks.map(mb => { dbMicroblocks = data.microblocks.map(mb => {
const dbMicroBlock: DbMicroblock = { const dbMicroBlock: DbMicroblock = {
canonical: true, canonical: true,
microblock_canonical: true, microblock_canonical: true,
@@ -570,8 +577,6 @@ export class PgWriteStore extends PgStore {
return dbMicroBlock; return dbMicroBlock;
}); });
const txs: DataStoreTxEventData[] = [];
for (const entry of data.txs) { for (const entry of data.txs) {
// Note: the properties block_hash and burn_block_time are empty here because the anchor block with that data doesn't yet exist. // Note: the properties block_hash and burn_block_time are empty here because the anchor block with that data doesn't yet exist.
const dbTx: DbTx = { const dbTx: DbTx = {
@@ -582,7 +587,7 @@ export class PgWriteStore extends PgStore {
// Set all the `block_height` properties for the related tx objects, since it wasn't known // Set all the `block_height` properties for the related tx objects, since it wasn't known
// when creating the objects using only the stacks-node message payload. // when creating the objects using only the stacks-node message payload.
txs.push({ txData.push({
tx: dbTx, tx: dbTx,
stxEvents: entry.stxEvents.map(e => ({ ...e, block_height: blockHeight })), stxEvents: entry.stxEvents.map(e => ({ ...e, block_height: blockHeight })),
contractLogEvents: entry.contractLogEvents.map(e => ({ contractLogEvents: entry.contractLogEvents.map(e => ({
@@ -598,7 +603,7 @@ export class PgWriteStore extends PgStore {
}); });
} }
await this.insertMicroblockData(sql, dbMicroblocks, txs); await this.insertMicroblockData(sql, dbMicroblocks, txData);
// Find any microblocks that have been orphaned by this latest microblock chain tip. // Find any microblocks that have been orphaned by this latest microblock chain tip.
// This function also checks that each microblock parent hash points to an existing microblock in the db. // This function also checks that each microblock parent hash points to an existing microblock in the db.
@@ -646,24 +651,25 @@ export class PgWriteStore extends PgStore {
); );
} }
await this.refreshNftCustody(sql, txs, true);
await this.refreshMaterializedView(sql, 'chain_tip');
if (!this.isEventReplay) { if (!this.isEventReplay) {
const mempoolStats = await this.getMempoolStatsInternal({ sql }); const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats); this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
} }
if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
}
for (const tx of txs) {
await this.notifier.sendTx({ txId: tx.tx.tx_id });
}
await this.emitAddressTxUpdates(txs);
}
}); });
await this.refreshNftCustody(txData, true);
await this.refreshMaterializedView('chain_tip');
await this.refreshMaterializedView('mempool_digest');
if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
}
for (const tx of txData) {
await this.notifier.sendTx({ txId: tx.tx.tx_id });
}
await this.emitAddressTxUpdates(txData);
}
} }
async updateStxLockEvent(sql: PgSqlClient, tx: DbTx, event: DbStxLockEvent) { async updateStxLockEvent(sql: PgSqlClient, tx: DbTx, event: DbStxLockEvent) {
@@ -1313,13 +1319,12 @@ export class PgWriteStore extends PgStore {
updatedTxs.push(tx); updatedTxs.push(tx);
} }
} }
await this.refreshMaterializedView(sql, 'mempool_digest');
if (!this.isEventReplay) { if (!this.isEventReplay) {
const mempoolStats = await this.getMempoolStatsInternal({ sql }); const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats); this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
} }
}); });
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) { for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id }); await this.notifier?.sendTx({ txId: tx.tx_id });
} }
@@ -1335,8 +1340,8 @@ export class PgWriteStore extends PgStore {
RETURNING ${sql(MEMPOOL_TX_COLUMNS)} RETURNING ${sql(MEMPOOL_TX_COLUMNS)}
`; `;
updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r)); updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
await this.refreshMaterializedView(sql, 'mempool_digest');
}); });
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) { for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id }); await this.notifier?.sendTx({ txId: tx.tx_id });
} }
@@ -1964,7 +1969,6 @@ export class PgWriteStore extends PgStore {
WHERE tx_id IN ${sql(txIds)} WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id RETURNING tx_id
`; `;
await this.refreshMaterializedView(sql, 'mempool_digest');
const restoredTxs = updateResults.map(r => r.tx_id); const restoredTxs = updateResults.map(r => r.tx_id);
return { restoredTxs: restoredTxs }; return { restoredTxs: restoredTxs };
} }
@@ -1988,7 +1992,6 @@ export class PgWriteStore extends PgStore {
WHERE tx_id IN ${sql(txIds)} WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id RETURNING tx_id
`; `;
await this.refreshMaterializedView(sql, 'mempool_digest');
const removedTxs = updateResults.map(r => r.tx_id); const removedTxs = updateResults.map(r => r.tx_id);
return { removedTxs: removedTxs }; return { removedTxs: removedTxs };
} }
@@ -2002,7 +2005,9 @@ export class PgWriteStore extends PgStore {
// Get threshold block. // Get threshold block.
const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256; const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
const cutoffResults = await sql<{ block_height: number }[]>` const cutoffResults = await sql<{ block_height: number }[]>`
SELECT (block_height - ${blockThreshold}) AS block_height FROM chain_tip SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
FROM blocks
WHERE canonical = TRUE
`; `;
if (cutoffResults.length != 1) { if (cutoffResults.length != 1) {
return { deletedTxs: [] }; return { deletedTxs: [] };
@@ -2016,7 +2021,6 @@ export class PgWriteStore extends PgStore {
WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight} WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
RETURNING tx_id RETURNING tx_id
`; `;
await this.refreshMaterializedView(sql, 'mempool_digest');
const deletedTxs = deletedTxResults.map(r => r.tx_id); const deletedTxs = deletedTxResults.map(r => r.tx_id);
for (const txId of deletedTxs) { for (const txId of deletedTxs) {
await this.notifier?.sendTx({ txId: txId }); await this.notifier?.sendTx({ txId: txId });
@@ -2437,11 +2441,12 @@ export class PgWriteStore extends PgStore {
/** /**
* Refreshes a Postgres materialized view. * Refreshes a Postgres materialized view.
* @param sql - Pg Client
* @param viewName - Materialized view name * @param viewName - Materialized view name
* @param sql - Pg scoped client. Will use the default client if none specified
* @param skipDuringEventReplay - If we should skip refreshing during event replay * @param skipDuringEventReplay - If we should skip refreshing during event replay
*/ */
async refreshMaterializedView(sql: PgSqlClient, viewName: string, skipDuringEventReplay = true) { async refreshMaterializedView(viewName: string, sql?: PgSqlClient, skipDuringEventReplay = true) {
sql = sql ?? this.sql;
if (this.isEventReplay && skipDuringEventReplay) { if (this.isEventReplay && skipDuringEventReplay) {
return; return;
} }
@@ -2454,34 +2459,32 @@ export class PgWriteStore extends PgStore {
* @param txs - Transaction event data * @param txs - Transaction event data
* @param unanchored - If this refresh is requested from a block or microblock * @param unanchored - If this refresh is requested from a block or microblock
*/ */
async refreshNftCustody( async refreshNftCustody(txs: DataStoreTxEventData[], unanchored: boolean = false) {
sql: PgSqlClient, await this.sql.begin(async sql => {
txs: DataStoreTxEventData[], const newNftEventCount = txs
unanchored: boolean = false .map(tx => tx.nftEvents.length)
) { .reduce((prev, cur) => prev + cur, 0);
const newNftEventCount = txs if (newNftEventCount > 0) {
.map(tx => tx.nftEvents.length) // Always refresh unanchored view since even if we're in a new anchored block we should update the
.reduce((prev, cur) => prev + cur, 0); // unanchored state to the current one.
if (newNftEventCount > 0) { await this.refreshMaterializedView('nft_custody_unanchored', sql);
// Always refresh unanchored view since even if we're in a new anchored block we should update the if (!unanchored) {
// unanchored state to the current one. await this.refreshMaterializedView('nft_custody', sql);
await this.refreshMaterializedView(sql, 'nft_custody_unanchored'); }
if (!unanchored) { } else if (!unanchored) {
await this.refreshMaterializedView(sql, 'nft_custody'); // Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
// update the anchored view to reflect any changes made by previous microblocks.
const result = await sql<{ outdated: boolean }[]>`
WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
SELECT unanchored > anchored AS outdated
FROM anchored_height CROSS JOIN unanchored_height
`;
if (result.length > 0 && result[0].outdated) {
await this.refreshMaterializedView('nft_custody', sql);
}
} }
} else if (!unanchored) { });
// Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
// update the anchored view to reflect any changes made by previous microblocks.
const result = await sql<{ outdated: boolean }[]>`
WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
SELECT unanchored > anchored AS outdated
FROM anchored_height CROSS JOIN unanchored_height
`;
if (result.length > 0 && result[0].outdated) {
await this.refreshMaterializedView(sql, 'nft_custody');
}
}
} }
/** /**
@@ -2492,10 +2495,10 @@ export class PgWriteStore extends PgStore {
return; return;
} }
await this.sql.begin(async sql => { await this.sql.begin(async sql => {
await this.refreshMaterializedView(sql, 'nft_custody', false); await this.refreshMaterializedView('nft_custody', sql, false);
await this.refreshMaterializedView(sql, 'nft_custody_unanchored', false); await this.refreshMaterializedView('nft_custody_unanchored', sql, false);
await this.refreshMaterializedView(sql, 'chain_tip', false); await this.refreshMaterializedView('chain_tip', sql, false);
await this.refreshMaterializedView(sql, 'mempool_digest', false); await this.refreshMaterializedView('mempool_digest', sql, false);
}); });
} }
} }

View File

@@ -342,7 +342,7 @@ describe('cache-control tests', () => {
block_height: 1, block_height: 1,
index_block_hash: '0x01', index_block_hash: '0x01',
}) })
.addTx() .addTx({ tx_id: '0x0001' })
.build(); .build();
await db.update(block1); await db.update(block1);
@@ -383,41 +383,67 @@ describe('cache-control tests', () => {
expect(request4.headers['etag'] !== etag1).toEqual(true); expect(request4.headers['etag'] !== etag1).toEqual(true);
const etag2 = request4.headers['etag']; const etag2 = request4.headers['etag'];
// Restore dropped tx. // Prune the other tx from the mempool by confirming it into a block.
await db.restoreMempoolTxs(client, ['0x1101']);
// Cache with new ETag now a miss, new ETag is the same as the original.
const request5 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag2);
expect(request5.status).toBe(200);
expect(request5.type).toBe('application/json');
expect(request5.headers['etag']).toEqual(etag1);
// Prune same tx.
await db.pruneMempoolTxs(client, ['0x1101']);
// ETag is now the same as when dropped.
const request6 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag1);
expect(request6.status).toBe(200);
expect(request6.type).toBe('application/json');
expect(request6.headers['etag']).toEqual(etag2);
// Garbage collect all txs.
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
const block2 = new TestBlockBuilder({ const block2 = new TestBlockBuilder({
block_height: 2, block_height: 2,
index_block_hash: '0x02', index_block_hash: '0x02',
parent_index_block_hash: '0x01', parent_index_block_hash: '0x01',
}) })
.addTx() .addTx({ tx_id: '0x1102' })
.build(); .build();
await db.update(block2); await db.update(block2);
// Cache is now a miss and ETag is zero because mempool is empty.
const request5 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag2);
expect(request5.status).toBe(200);
expect(request5.type).toBe('application/json');
expect(request5.headers['etag']).toEqual('"0"');
const etag3 = request5.headers['etag'];
// Restore a tx back into the mempool by making its anchor block non-canonical.
const block2b = new TestBlockBuilder({
block_height: 2,
index_block_hash: '0x02bb',
parent_index_block_hash: '0x01',
})
.addTx({ tx_id: '0x0002' })
.build();
await db.update(block2b);
const block3 = new TestBlockBuilder({
block_height: 3,
index_block_hash: '0x03',
parent_index_block_hash: '0x02bb',
})
.addTx({ tx_id: '0x0003' })
.build();
await db.update(block3);
// Cache is now a miss and ETag is non-zero because mempool is not empty.
const request6 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag3);
expect(request6.status).toBe(200);
expect(request6.type).toBe('application/json');
expect(request6.headers['etag']).toEqual(etag2);
const etag4 = request6.headers['etag'];
// Garbage collect all txs.
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
const block4 = new TestBlockBuilder({
block_height: 4,
index_block_hash: '0x04',
parent_index_block_hash: '0x03',
})
.addTx({ tx_id: '0x0004' })
.build();
await db.update(block4);
// ETag zero once again. // ETag zero once again.
const request7 = await supertest(api.server).get('/extended/v1/tx/mempool'); const request7 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag4);
expect(request7.status).toBe(200); expect(request7.status).toBe(200);
expect(request7.type).toBe('application/json'); expect(request7.type).toBe('application/json');
expect(request7.headers['etag']).toEqual('"0"'); expect(request7.headers['etag']).toEqual('"0"');