Mirror of https://github.com/alexgo-io/stacks-blockchain-api.git, synced 2026-01-12 22:43:34 +08:00
fix: refresh materialized views in their own pg connection (#1356)

* feat: refresh views in their own conns
* test: proper mempool cache testing
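At its core the change swaps one ordering: `REFRESH MATERIALIZED VIEW` used to run on the transaction's own client, so the COMMIT had to wait for every refresh and the transaction held its locks for the whole duration. Now the write transaction commits first and the refresh runs on its own connection from the pool. A minimal sketch of the before/after shape, assuming the `postgres` tagged-template client the store already uses (table and view names are only illustrative):

    import postgres from 'postgres';

    const sql = postgres('postgres://localhost/stacks_blockchain_api');

    // Before: refresh inside the write transaction. The COMMIT waits on the
    // refresh, and the transaction's locks are held while the view rebuilds.
    async function updateBefore(txId: string) {
      await sql.begin(async tx => {
        await tx`INSERT INTO txs (tx_id) VALUES (${txId})`;
        await tx`REFRESH MATERIALIZED VIEW chain_tip`;
      });
    }

    // After: commit first, then refresh on a separate pooled connection.
    // The write transaction stays short; the view catches up right after.
    async function updateAfter(txId: string) {
      await sql.begin(async tx => {
        await tx`INSERT INTO txs (tx_id) VALUES (${txId})`;
      });
      await sql`REFRESH MATERIALIZED VIEW chain_tip`;
    }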
@@ -188,6 +188,8 @@ export class PgWriteStore extends PgStore {
   async update(data: DataStoreBlockUpdateData): Promise<void> {
     const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = [];
     let garbageCollectedMempoolTxs: string[] = [];
+    let batchedTxData: DataStoreTxEventData[] = [];
+
     await this.sql.begin(async sql => {
       const chainTip = await this.getChainTip(sql, false);
       await this.handleReorg(sql, data.block, chainTip.blockHeight);
@@ -257,7 +259,7 @@ export class PgWriteStore extends PgStore {
       data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
       data.block.execution_cost_write_length = totalCost.execution_cost_write_length;
 
-      let batchedTxData: DataStoreTxEventData[] = data.txs;
+      batchedTxData = data.txs;
 
       // Find microblocks that weren't already inserted via the unconfirmed microblock event.
       // This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
@@ -358,8 +360,6 @@ export class PgWriteStore extends PgStore {
           await this.updateNames(sql, entry.tx, bnsName);
         }
       }
-      await this.refreshNftCustody(sql, batchedTxData);
-      await this.refreshMaterializedView(sql, 'chain_tip');
       const mempoolGarbageResults = await this.deleteGarbageCollectedMempoolTxs(sql);
       if (mempoolGarbageResults.deletedTxs.length > 0) {
         logger.verbose(
@@ -399,6 +399,10 @@ export class PgWriteStore extends PgStore {
       }
     });
 
+    await this.refreshNftCustody(batchedTxData);
+    await this.refreshMaterializedView('chain_tip');
+    await this.refreshMaterializedView('mempool_digest');
+
     // Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
     // event replay of the v1 blockchain.
     if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) {
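Since the refreshes now run after `sql.begin` resolves, anything they depend on has to outlive the transaction callback, which is why `batchedTxData` (and, below, `txData` and `dbMicroblocks`) is declared before the transaction and assigned inside it. The pattern in isolation, a sketch with illustrative names:

    // Hoist results out of the transaction callback so post-commit steps
    // (view refreshes, notifier events) can reuse them without re-querying.
    let committedTxIds: string[] = [];
    await sql.begin(async tx => {
      const rows = await tx<{ tx_id: string }[]>`SELECT tx_id FROM txs LIMIT 10`;
      committedTxIds = rows.map(r => r.tx_id);
    });
    // begin() only resolves after COMMIT succeeds, so this reads final state.
    for (const txId of committedTxIds) {
      console.log(`committed: ${txId}`);
    }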
@@ -529,6 +533,9 @@ export class PgWriteStore extends PgStore {
   }
 
   async updateMicroblocksInternal(data: DataStoreMicroblockUpdateData): Promise<void> {
+    const txData: DataStoreTxEventData[] = [];
+    let dbMicroblocks: DbMicroblock[] = [];
+
     await this.sql.begin(async sql => {
       // Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
       // current known canonical chain tip. We assume this holds true so incoming microblock data is always
@@ -550,7 +557,7 @@ export class PgWriteStore extends PgStore {
 
       // The block height is just one after the current chain tip height
      const blockHeight = chainTip.blockHeight + 1;
-      const dbMicroblocks = data.microblocks.map(mb => {
+      dbMicroblocks = data.microblocks.map(mb => {
        const dbMicroBlock: DbMicroblock = {
          canonical: true,
          microblock_canonical: true,
@@ -570,8 +577,6 @@ export class PgWriteStore extends PgStore {
         return dbMicroBlock;
       });
 
-      const txs: DataStoreTxEventData[] = [];
-
       for (const entry of data.txs) {
         // Note: the properties block_hash and burn_block_time are empty here because the anchor block with that data doesn't yet exist.
         const dbTx: DbTx = {
@@ -582,7 +587,7 @@ export class PgWriteStore extends PgStore {
 
         // Set all the `block_height` properties for the related tx objects, since it wasn't known
         // when creating the objects using only the stacks-node message payload.
-        txs.push({
+        txData.push({
           tx: dbTx,
           stxEvents: entry.stxEvents.map(e => ({ ...e, block_height: blockHeight })),
           contractLogEvents: entry.contractLogEvents.map(e => ({
@@ -598,7 +603,7 @@ export class PgWriteStore extends PgStore {
         });
       }
 
-      await this.insertMicroblockData(sql, dbMicroblocks, txs);
+      await this.insertMicroblockData(sql, dbMicroblocks, txData);
 
       // Find any microblocks that have been orphaned by this latest microblock chain tip.
       // This function also checks that each microblock parent hash points to an existing microblock in the db.
@@ -646,24 +651,25 @@ export class PgWriteStore extends PgStore {
         );
       }
 
-      await this.refreshNftCustody(sql, txs, true);
-      await this.refreshMaterializedView(sql, 'chain_tip');
-
       if (!this.isEventReplay) {
         const mempoolStats = await this.getMempoolStatsInternal({ sql });
         this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
       }
-
-      if (this.notifier) {
-        for (const microblock of dbMicroblocks) {
-          await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
-        }
-        for (const tx of txs) {
-          await this.notifier.sendTx({ txId: tx.tx.tx_id });
-        }
-        await this.emitAddressTxUpdates(txs);
-      }
     });
+
+    await this.refreshNftCustody(txData, true);
+    await this.refreshMaterializedView('chain_tip');
+    await this.refreshMaterializedView('mempool_digest');
+
+    if (this.notifier) {
+      for (const microblock of dbMicroblocks) {
+        await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
+      }
+      for (const tx of txData) {
+        await this.notifier.sendTx({ txId: tx.tx.tx_id });
+      }
+      await this.emitAddressTxUpdates(txData);
+    }
   }
 
   async updateStxLockEvent(sql: PgSqlClient, tx: DbTx, event: DbStxLockEvent) {
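The microblock path gets the same treatment: the NFT custody refresh, the view refreshes, and the notifier sends all move from inside the transaction to after it. One side effect worth spelling out (my reading of the intent, not a documented guarantee): a subscriber that re-queries the API when it receives `sendTx` can no longer observe uncommitted rows, because the event is only emitted once `begin()` has resolved, i.e. after COMMIT. The resulting order, as a sketch (`insertTxRows` stands in for the real insert helpers):

    // Write → commit → refresh views → notify listeners.
    await sql.begin(async tx => {
      await insertTxRows(tx, txData); // hypothetical helper
    });
    await sql`REFRESH MATERIALIZED VIEW chain_tip`;
    for (const entry of txData) {
      await notifier?.sendTx({ txId: entry.tx.tx_id });
    }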
@@ -1313,13 +1319,12 @@ export class PgWriteStore extends PgStore {
           updatedTxs.push(tx);
         }
       }
-      await this.refreshMaterializedView(sql, 'mempool_digest');
-
       if (!this.isEventReplay) {
         const mempoolStats = await this.getMempoolStatsInternal({ sql });
         this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
       }
     });
+    await this.refreshMaterializedView('mempool_digest');
     for (const tx of updatedTxs) {
       await this.notifier?.sendTx({ txId: tx.tx_id });
     }
@@ -1335,8 +1340,8 @@ export class PgWriteStore extends PgStore {
         RETURNING ${sql(MEMPOOL_TX_COLUMNS)}
       `;
       updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
-      await this.refreshMaterializedView(sql, 'mempool_digest');
     });
+    await this.refreshMaterializedView('mempool_digest');
     for (const tx of updatedTxs) {
       await this.notifier?.sendTx({ txId: tx.tx_id });
     }
@@ -1964,7 +1969,6 @@ export class PgWriteStore extends PgStore {
         WHERE tx_id IN ${sql(txIds)}
         RETURNING tx_id
       `;
-      await this.refreshMaterializedView(sql, 'mempool_digest');
       const restoredTxs = updateResults.map(r => r.tx_id);
       return { restoredTxs: restoredTxs };
     }
@@ -1988,7 +1992,6 @@ export class PgWriteStore extends PgStore {
         WHERE tx_id IN ${sql(txIds)}
         RETURNING tx_id
       `;
-      await this.refreshMaterializedView(sql, 'mempool_digest');
       const removedTxs = updateResults.map(r => r.tx_id);
       return { removedTxs: removedTxs };
     }
@@ -2002,7 +2005,9 @@ export class PgWriteStore extends PgStore {
     // Get threshold block.
     const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
     const cutoffResults = await sql<{ block_height: number }[]>`
-      SELECT (block_height - ${blockThreshold}) AS block_height FROM chain_tip
+      SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
+      FROM blocks
+      WHERE canonical = TRUE
     `;
     if (cutoffResults.length != 1) {
       return { deletedTxs: [] };
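The rewritten cutoff query above is a correctness fix that falls out of the new refresh timing: `chain_tip` is a materialized view, and since it is no longer refreshed inside the write transaction it can lag the `blocks` table at the moment garbage collection runs. Computing the tip straight from `blocks` avoids the stale read. A sketch of the window the rewrite closes (heights are illustrative):

    // Inside the write transaction, before the post-commit refresh has run:
    const viewTip = await tx<{ block_height: number }[]>`
      SELECT block_height FROM chain_tip
    `; // e.g. 100 — still reflects the previous refresh
    const tableTip = await tx<{ block_height: number }[]>`
      SELECT MAX(block_height) AS block_height FROM blocks WHERE canonical = TRUE
    `; // e.g. 101 — sees rows written earlier in this same transaction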
@@ -2016,7 +2021,6 @@ export class PgWriteStore extends PgStore {
       WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
       RETURNING tx_id
     `;
-    await this.refreshMaterializedView(sql, 'mempool_digest');
     const deletedTxs = deletedTxResults.map(r => r.tx_id);
     for (const txId of deletedTxs) {
       await this.notifier?.sendTx({ txId: txId });
@@ -2437,11 +2441,12 @@ export class PgWriteStore extends PgStore {
 
   /**
    * Refreshes a Postgres materialized view.
-   * @param sql - Pg Client
    * @param viewName - Materialized view name
+   * @param sql - Pg scoped client. Will use the default client if none specified
    * @param skipDuringEventReplay - If we should skip refreshing during event replay
    */
-  async refreshMaterializedView(sql: PgSqlClient, viewName: string, skipDuringEventReplay = true) {
+  async refreshMaterializedView(viewName: string, sql?: PgSqlClient, skipDuringEventReplay = true) {
+    sql = sql ?? this.sql;
     if (this.isEventReplay && skipDuringEventReplay) {
       return;
     }
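The reordered signature makes the common case, a post-commit refresh on the store's default client, the shortest call, while transactional callers can still pass their scoped client. Usage under the new shape, as a sketch (`store` stands in for a `PgWriteStore` instance):

    // Post-commit refresh on the default pooled connection:
    await store.refreshMaterializedView('mempool_digest');

    // Inside an existing transaction, reuse its scoped client:
    await store.sql.begin(async tx => {
      await store.refreshMaterializedView('nft_custody', tx);
    });

    // Refresh even during event replay by disabling the skip flag:
    await store.refreshMaterializedView('chain_tip', undefined, false);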
@@ -2454,34 +2459,32 @@ export class PgWriteStore extends PgStore {
    * @param txs - Transaction event data
    * @param unanchored - If this refresh is requested from a block or microblock
    */
-  async refreshNftCustody(
-    sql: PgSqlClient,
-    txs: DataStoreTxEventData[],
-    unanchored: boolean = false
-  ) {
-    const newNftEventCount = txs
-      .map(tx => tx.nftEvents.length)
-      .reduce((prev, cur) => prev + cur, 0);
-    if (newNftEventCount > 0) {
-      // Always refresh unanchored view since even if we're in a new anchored block we should update the
-      // unanchored state to the current one.
-      await this.refreshMaterializedView(sql, 'nft_custody_unanchored');
-      if (!unanchored) {
-        await this.refreshMaterializedView(sql, 'nft_custody');
+  async refreshNftCustody(txs: DataStoreTxEventData[], unanchored: boolean = false) {
+    await this.sql.begin(async sql => {
+      const newNftEventCount = txs
+        .map(tx => tx.nftEvents.length)
+        .reduce((prev, cur) => prev + cur, 0);
+      if (newNftEventCount > 0) {
+        // Always refresh unanchored view since even if we're in a new anchored block we should update the
+        // unanchored state to the current one.
+        await this.refreshMaterializedView('nft_custody_unanchored', sql);
+        if (!unanchored) {
+          await this.refreshMaterializedView('nft_custody', sql);
+        }
+      } else if (!unanchored) {
+        // Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
+        // update the anchored view to reflect any changes made by previous microblocks.
+        const result = await sql<{ outdated: boolean }[]>`
+          WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
+          unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
+          SELECT unanchored > anchored AS outdated
+          FROM anchored_height CROSS JOIN unanchored_height
+        `;
+        if (result.length > 0 && result[0].outdated) {
+          await this.refreshMaterializedView('nft_custody', sql);
+        }
       }
-    } else if (!unanchored) {
-      // Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
-      // update the anchored view to reflect any changes made by previous microblocks.
-      const result = await sql<{ outdated: boolean }[]>`
-        WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
-        unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
-        SELECT unanchored > anchored AS outdated
-        FROM anchored_height CROSS JOIN unanchored_height
-      `;
-      if (result.length > 0 && result[0].outdated) {
-        await this.refreshMaterializedView(sql, 'nft_custody');
-      }
-    }
+    });
   }
 
   /**
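The `else if (!unanchored)` branch above encodes a subtle invariant: microblocks refresh only `nft_custody_unanchored`, so when an anchor block arrives with no NFT events of its own, the anchored `nft_custody` view can still be behind. Comparing the two views' `MAX(block_height)` detects exactly that case. Worked through with illustrative heights:

    // State the check might observe after microblocks advanced only the
    // unanchored view:
    const anchored = 100;   // MAX(block_height) FROM nft_custody
    const unanchored = 101; // MAX(block_height) FROM nft_custody_unanchored
    const outdated = unanchored > anchored; // true → refresh nft_custody too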
@@ -2492,10 +2495,10 @@ export class PgWriteStore extends PgStore {
       return;
     }
     await this.sql.begin(async sql => {
-      await this.refreshMaterializedView(sql, 'nft_custody', false);
-      await this.refreshMaterializedView(sql, 'nft_custody_unanchored', false);
-      await this.refreshMaterializedView(sql, 'chain_tip', false);
-      await this.refreshMaterializedView(sql, 'mempool_digest', false);
+      await this.refreshMaterializedView('nft_custody', sql, false);
+      await this.refreshMaterializedView('nft_custody_unanchored', sql, false);
+      await this.refreshMaterializedView('chain_tip', sql, false);
+      await this.refreshMaterializedView('mempool_digest', sql, false);
     });
   }
 }
@@ -342,7 +342,7 @@ describe('cache-control tests', () => {
       block_height: 1,
       index_block_hash: '0x01',
     })
-      .addTx()
+      .addTx({ tx_id: '0x0001' })
       .build();
     await db.update(block1);
 
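The rewritten assertions below drive the mempool through the public write path only: `TestBlockBuilder` assembles block event payloads, `db.update()` ingests them (now refreshing `mempool_digest` after commit), and `supertest` checks how the mempool endpoint's ETag reacts. The explicit `tx_id` added above lets later blocks confirm or orphan specific transactions. The conditional-request contract the assertions lean on, assuming the endpoint follows the standard ETag semantics the rest of this suite exercises:

    // A request with a stale If-None-Match is a cache miss: 200 plus the new
    // tag. A matching one is a hit: 304 with no body.
    const first = await supertest(api.server).get('/extended/v1/tx/mempool');
    const etag = first.headers['etag'];
    const hit = await supertest(api.server)
      .get('/extended/v1/tx/mempool')
      .set('If-None-Match', etag);
    expect(hit.status).toBe(304);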
@@ -383,41 +383,67 @@ describe('cache-control tests', () => {
     expect(request4.headers['etag'] !== etag1).toEqual(true);
     const etag2 = request4.headers['etag'];
 
-    // Restore dropped tx.
-    await db.restoreMempoolTxs(client, ['0x1101']);
-
-    // Cache with new ETag now a miss, new ETag is the same as the original.
-    const request5 = await supertest(api.server)
-      .get('/extended/v1/tx/mempool')
-      .set('If-None-Match', etag2);
-    expect(request5.status).toBe(200);
-    expect(request5.type).toBe('application/json');
-    expect(request5.headers['etag']).toEqual(etag1);
-
-    // Prune same tx.
-    await db.pruneMempoolTxs(client, ['0x1101']);
-
-    // ETag is now the same as when dropped.
-    const request6 = await supertest(api.server)
-      .get('/extended/v1/tx/mempool')
-      .set('If-None-Match', etag1);
-    expect(request6.status).toBe(200);
-    expect(request6.type).toBe('application/json');
-    expect(request6.headers['etag']).toEqual(etag2);
-
-    // Garbage collect all txs.
-    process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
+    // Prune the other tx from the mempool by confirming it into a block.
+    const block2 = new TestBlockBuilder({
+      block_height: 2,
+      index_block_hash: '0x02',
+      parent_index_block_hash: '0x01',
+    })
+      .addTx()
+      .addTx({ tx_id: '0x1102' })
+      .build();
+    await db.update(block2);
+
+    // Cache is now a miss and ETag is zero because mempool is empty.
+    const request5 = await supertest(api.server)
+      .get('/extended/v1/tx/mempool')
+      .set('If-None-Match', etag2);
+    expect(request5.status).toBe(200);
+    expect(request5.type).toBe('application/json');
+    expect(request5.headers['etag']).toEqual('"0"');
+    const etag3 = request5.headers['etag'];
+
+    // Restore a tx back into the mempool by making its anchor block non-canonical.
+    const block2b = new TestBlockBuilder({
+      block_height: 2,
+      index_block_hash: '0x02bb',
+      parent_index_block_hash: '0x01',
+    })
+      .addTx({ tx_id: '0x0002' })
+      .build();
+    await db.update(block2b);
+    const block3 = new TestBlockBuilder({
+      block_height: 3,
+      index_block_hash: '0x03',
+      parent_index_block_hash: '0x02bb',
+    })
+      .addTx({ tx_id: '0x0003' })
+      .build();
+    await db.update(block3);
+
+    // Cache is now a miss and ETag is non-zero because mempool is not empty.
+    const request6 = await supertest(api.server)
+      .get('/extended/v1/tx/mempool')
+      .set('If-None-Match', etag3);
+    expect(request6.status).toBe(200);
+    expect(request6.type).toBe('application/json');
+    expect(request6.headers['etag']).toEqual(etag2);
+    const etag4 = request6.headers['etag'];
+
+    // Garbage collect all txs.
+    process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';
+    const block4 = new TestBlockBuilder({
+      block_height: 4,
+      index_block_hash: '0x04',
+      parent_index_block_hash: '0x03',
+    })
+      .addTx({ tx_id: '0x0004' })
+      .build();
+    await db.update(block4);
 
     // ETag zero once again.
-    const request7 = await supertest(api.server).get('/extended/v1/tx/mempool');
+    const request7 = await supertest(api.server)
+      .get('/extended/v1/tx/mempool')
+      .set('If-None-Match', etag4);
     expect(request7.status).toBe(200);
     expect(request7.type).toBe('application/json');
     expect(request7.headers['etag']).toEqual('"0"');