fix: refresh materialized views in their own pg connection (#1356)

* feat: refresh views in their own conns

* test: proper mempool cache testing
This commit is contained in:
Rafael Cárdenas
2022-10-13 12:38:53 -05:00
committed by GitHub
parent f249ae70e7
commit 9433d3c9c2
2 changed files with 117 additions and 88 deletions

View File

@@ -188,6 +188,8 @@ export class PgWriteStore extends PgStore {
async update(data: DataStoreBlockUpdateData): Promise<void> {
const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = [];
let garbageCollectedMempoolTxs: string[] = [];
let batchedTxData: DataStoreTxEventData[] = [];
await this.sql.begin(async sql => {
const chainTip = await this.getChainTip(sql, false);
await this.handleReorg(sql, data.block, chainTip.blockHeight);
@@ -257,7 +259,7 @@ export class PgWriteStore extends PgStore {
data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
data.block.execution_cost_write_length = totalCost.execution_cost_write_length;
let batchedTxData: DataStoreTxEventData[] = data.txs;
batchedTxData = data.txs;
// Find microblocks that weren't already inserted via the unconfirmed microblock event.
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
@@ -358,8 +360,6 @@ export class PgWriteStore extends PgStore {
await this.updateNames(sql, entry.tx, bnsName);
}
}
await this.refreshNftCustody(sql, batchedTxData);
await this.refreshMaterializedView(sql, 'chain_tip');
const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
if (mempoolGarbageResults.deletedTxs.length > 0) {
logger.verbose(
@@ -399,6 +399,10 @@ export class PgWriteStore extends PgStore {
}
});
await this.refreshNftCustody(batchedTxData);
await this.refreshMaterializedView('chain_tip');
await this.refreshMaterializedView('mempool_digest');
// Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
// event replay of the v1 blockchain.
if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) {
@@ -529,6 +533,9 @@ export class PgWriteStore extends PgStore {
}
async updateMicroblocksInternal(data: DataStoreMicroblockUpdateData): Promise<void> {
const txData: DataStoreTxEventData[] = [];
let dbMicroblocks: DbMicroblock[] = [];
await this.sql.begin(async sql => {
// Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
// current known canonical chain tip. We assume this holds true so incoming microblock data is always
@@ -550,7 +557,7 @@ export class PgWriteStore extends PgStore {
// The block height is just one after the current chain tip height
const blockHeight = chainTip.blockHeight + 1;
const dbMicroblocks = data.microblocks.map(mb => {
dbMicroblocks = data.microblocks.map(mb => {
const dbMicroBlock: DbMicroblock = {
canonical: true,
microblock_canonical: true,
@@ -570,8 +577,6 @@ export class PgWriteStore extends PgStore {
return dbMicroBlock;
});
const txs: DataStoreTxEventData[] = [];
for (const entry of data.txs) {
// Note: the properties block_hash and burn_block_time are empty here because the anchor block with that data doesn't yet exist.
const dbTx: DbTx = {
@@ -582,7 +587,7 @@ export class PgWriteStore extends PgStore {
// Set all the `block_height` properties for the related tx objects, since it wasn't known
// when creating the objects using only the stacks-node message payload.
txs.push({
txData.push({
tx: dbTx,
stxEvents: entry.stxEvents.map(e => ({ ...e, block_height: blockHeight })),
contractLogEvents: entry.contractLogEvents.map(e => ({
@@ -598,7 +603,7 @@ export class PgWriteStore extends PgStore {
});
}
await this.insertMicroblockData(sql, dbMicroblocks, txs);
await this.insertMicroblockData(sql, dbMicroblocks, txData);
// Find any microblocks that have been orphaned by this latest microblock chain tip.
// This function also checks that each microblock parent hash points to an existing microblock in the db.
@@ -646,24 +651,25 @@ export class PgWriteStore extends PgStore {
);
}
await this.refreshNftCustody(sql, txs, true);
await this.refreshMaterializedView(sql, 'chain_tip');
if (!this.isEventReplay) {
const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
}
if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
}
for (const tx of txs) {
await this.notifier.sendTx({ txId: tx.tx.tx_id });
}
await this.emitAddressTxUpdates(txs);
}
});
await this.refreshNftCustody(txData, true);
await this.refreshMaterializedView('chain_tip');
await this.refreshMaterializedView('mempool_digest');
if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
}
for (const tx of txData) {
await this.notifier.sendTx({ txId: tx.tx.tx_id });
}
await this.emitAddressTxUpdates(txData);
}
}
async updateStxLockEvent(sql: PgSqlClient, tx: DbTx, event: DbStxLockEvent) {
@@ -1313,13 +1319,12 @@ export class PgWriteStore extends PgStore {
updatedTxs.push(tx);
}
}
await this.refreshMaterializedView(sql, 'mempool_digest');
if (!this.isEventReplay) {
const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
}
});
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id });
}
@@ -1335,8 +1340,8 @@ export class PgWriteStore extends PgStore {
RETURNING ${sql(MEMPOOL_TX_COLUMNS)}
`;
updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
await this.refreshMaterializedView(sql, 'mempool_digest');
});
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id });
}
@@ -1964,7 +1969,6 @@ export class PgWriteStore extends PgStore {
WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id
`;
await this.refreshMaterializedView(sql, 'mempool_digest');
const restoredTxs = updateResults.map(r => r.tx_id);
return { restoredTxs: restoredTxs };
}
@@ -1988,7 +1992,6 @@ export class PgWriteStore extends PgStore {
WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id
`;
await this.refreshMaterializedView(sql, 'mempool_digest');
const removedTxs = updateResults.map(r => r.tx_id);
return { removedTxs: removedTxs };
}
@@ -2002,7 +2005,9 @@ export class PgWriteStore extends PgStore {
// Get threshold block.
const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
const cutoffResults = await sql<{ block_height: number }[]>`
SELECT (block_height - ${blockThreshold}) AS block_height FROM chain_tip
SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
FROM blocks
WHERE canonical = TRUE
`;
if (cutoffResults.length != 1) {
return { deletedTxs: [] };
@@ -2016,7 +2021,6 @@ export class PgWriteStore extends PgStore {
WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
RETURNING tx_id
`;
await this.refreshMaterializedView(sql, 'mempool_digest');
const deletedTxs = deletedTxResults.map(r => r.tx_id);
for (const txId of deletedTxs) {
await this.notifier?.sendTx({ txId: txId });
@@ -2437,11 +2441,12 @@ export class PgWriteStore extends PgStore {
/**
* Refreshes a Postgres materialized view.
* @param sql - Pg Client
* @param viewName - Materialized view name
* @param sql - Pg scoped client. Will use the default client if none specified
* @param skipDuringEventReplay - If we should skip refreshing during event replay
*/
async refreshMaterializedView(sql: PgSqlClient, viewName: string, skipDuringEventReplay = true) {
async refreshMaterializedView(viewName: string, sql?: PgSqlClient, skipDuringEventReplay = true) {
sql = sql ?? this.sql;
if (this.isEventReplay && skipDuringEventReplay) {
return;
}
@@ -2454,34 +2459,32 @@ export class PgWriteStore extends PgStore {
* @param txs - Transaction event data
* @param unanchored - If this refresh is requested from a block or microblock
*/
async refreshNftCustody(
sql: PgSqlClient,
txs: DataStoreTxEventData[],
unanchored: boolean = false
) {
const newNftEventCount = txs
.map(tx => tx.nftEvents.length)
.reduce((prev, cur) => prev + cur, 0);
if (newNftEventCount > 0) {
// Always refresh unanchored view since even if we're in a new anchored block we should update the
// unanchored state to the current one.
await this.refreshMaterializedView(sql, 'nft_custody_unanchored');
if (!unanchored) {
await this.refreshMaterializedView(sql, 'nft_custody');
async refreshNftCustody(txs: DataStoreTxEventData[], unanchored: boolean = false) {
await this.sql.begin(async sql => {
const newNftEventCount = txs
.map(tx => tx.nftEvents.length)
.reduce((prev, cur) => prev + cur, 0);
if (newNftEventCount > 0) {
// Always refresh unanchored view since even if we're in a new anchored block we should update the
// unanchored state to the current one.
await this.refreshMaterializedView('nft_custody_unanchored', sql);
if (!unanchored) {
await this.refreshMaterializedView('nft_custody', sql);
}
} else if (!unanchored) {
// Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
// update the anchored view to reflect any changes made by previous microblocks.
const result = await sql<{ outdated: boolean }[]>`
WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
SELECT unanchored > anchored AS outdated
FROM anchored_height CROSS JOIN unanchored_height
`;
if (result.length > 0 && result[0].outdated) {
await this.refreshMaterializedView('nft_custody', sql);
}
}
} else if (!unanchored) {
// Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
// update the anchored view to reflect any changes made by previous microblocks.
const result = await sql<{ outdated: boolean }[]>`
WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
SELECT unanchored > anchored AS outdated
FROM anchored_height CROSS JOIN unanchored_height
`;
if (result.length > 0 && result[0].outdated) {
await this.refreshMaterializedView(sql, 'nft_custody');
}
}
});
}
/**
@@ -2492,10 +2495,10 @@ export class PgWriteStore extends PgStore {
return;
}
await this.sql.begin(async sql => {
await this.refreshMaterializedView(sql, 'nft_custody', false);
await this.refreshMaterializedView(sql, 'nft_custody_unanchored', false);
await this.refreshMaterializedView(sql, 'chain_tip', false);
await this.refreshMaterializedView(sql, 'mempool_digest', false);
await this.refreshMaterializedView('nft_custody', sql, false);
await this.refreshMaterializedView('nft_custody_unanchored', sql, false);
await this.refreshMaterializedView('chain_tip', sql, false);
await this.refreshMaterializedView('mempool_digest', sql, false);
});
}
}

View File

@@ -342,7 +342,7 @@ describe('cache-control tests', () => {
block_height: 1,
index_block_hash: '0x01',
})
.addTx()
.addTx({ tx_id: '0x0001' })
.build();
await db.update(block1);
@@ -383,41 +383,67 @@ describe('cache-control tests', () => {
expect(request4.headers['etag'] !== etag1).toEqual(true);
const etag2 = request4.headers['etag'];
// Restore dropped tx.
await db.restoreMempoolTxs(client, ['0x1101']);
// Cache with new ETag now a miss, new ETag is the same as the original.
const request5 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag2);
expect(request5.status).toBe(200);
expect(request5.type).toBe('application/json');
expect(request5.headers['etag']).toEqual(etag1);
// Prune same tx.
await db.pruneMempoolTxs(client, ['0x1101']);
// ETag is now the same as when dropped.
const request6 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag1);
expect(request6.status).toBe(200);
expect(request6.type).toBe('application/json');
expect(request6.headers['etag']).toEqual(etag2);
// Garbage collect all txs.
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
// Prune the other tx from the mempool by confirming it into a block.
const block2 = new TestBlockBuilder({
block_height: 2,
index_block_hash: '0x02',
parent_index_block_hash: '0x01',
})
.addTx()
.addTx({ tx_id: '0x1102' })
.build();
await db.update(block2);
// Cache is now a miss and ETag is zero because mempool is empty.
const request5 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag2);
expect(request5.status).toBe(200);
expect(request5.type).toBe('application/json');
expect(request5.headers['etag']).toEqual('"0"');
const etag3 = request5.headers['etag'];
// Restore a tx back into the mempool by making its anchor block non-canonical.
const block2b = new TestBlockBuilder({
block_height: 2,
index_block_hash: '0x02bb',
parent_index_block_hash: '0x01',
})
.addTx({ tx_id: '0x0002' })
.build();
await db.update(block2b);
const block3 = new TestBlockBuilder({
block_height: 3,
index_block_hash: '0x03',
parent_index_block_hash: '0x02bb',
})
.addTx({ tx_id: '0x0003' })
.build();
await db.update(block3);
// Cache is now a miss and ETag is non-zero because mempool is not empty.
const request6 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag3);
expect(request6.status).toBe(200);
expect(request6.type).toBe('application/json');
expect(request6.headers['etag']).toEqual(etag2);
const etag4 = request6.headers['etag'];
// Garbage collect all txs.
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
const block4 = new TestBlockBuilder({
block_height: 4,
index_block_hash: '0x04',
parent_index_block_hash: '0x03',
})
.addTx({ tx_id: '0x0004' })
.build();
await db.update(block4);
// ETag zero once again.
const request7 = await supertest(api.server).get('/extended/v1/tx/mempool');
const request7 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag4);
expect(request7.status).toBe(200);
expect(request7.type).toBe('application/json');
expect(request7.headers['etag']).toEqual('"0"');