fix: optimize mempool transaction reads and writes (#1781) (#1792)

* fix: change mempool_digest into a table

* fix: change digest to be last updated timestamp

* fix: build

* fix: update count on reconcile

* test: mempool reconcile
This commit is contained in:
Rafael Cárdenas
2024-01-02 11:14:30 -06:00
committed by GitHub
parent aa287f8b6f
commit 2700642ed2
6 changed files with 180 additions and 136 deletions

View File

@@ -0,0 +1,27 @@
/* eslint-disable camelcase */
exports.shorthands = undefined;
// Adds mempool stat columns to the single-row `chain_tip` table so the API can
// read the mempool size and last-updated time cheaply (O(1)) instead of
// scanning `mempool_txs` or refreshing a materialized view.
exports.up = pgm => {
// Step 1: add the columns as nullable with defaults so the ALTER succeeds on
// an already-populated table; they are tightened to NOT NULL after backfill.
pgm.addColumn('chain_tip', {
mempool_tx_count: {
type: 'int',
default: 0,
},
mempool_updated_at: {
type: 'timestamptz',
default: pgm.func('(NOW())'),
},
});
// Step 2: backfill — count only unpruned mempool txs and stamp the current
// time as the last-updated marker.
pgm.sql(`
UPDATE chain_tip SET
mempool_tx_count = (SELECT COUNT(*)::int FROM mempool_txs WHERE pruned = FALSE),
mempool_updated_at = NOW()
`);
// Step 3: every row now has a value, so enforce NOT NULL.
pgm.alterColumn('chain_tip', 'mempool_tx_count', { notNull: true });
pgm.alterColumn('chain_tip', 'mempool_updated_at', { notNull: true });
};
// Reverts the migration by dropping both mempool stat columns from `chain_tip`.
exports.down = pgm => {
pgm.dropColumn('chain_tip', ['mempool_tx_count', 'mempool_updated_at']);
};

View File

@@ -1306,12 +1306,13 @@ export class PgStore {
const unanchoredTxs: string[] = !includeUnanchored
? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id)
: [];
// If caller is not filtering by any param, get the tx count from the `chain_tip` table.
const count =
senderAddress || recipientAddress || address
? sql`(COUNT(*) OVER())::int AS count`
: sql`(SELECT mempool_tx_count FROM chain_tip) AS count`;
const resultQuery = await sql<(MempoolTxQueryResult & { count: number })[]>`
SELECT ${unsafeCols(sql, [
...MEMPOOL_TX_COLUMNS,
abiColumn('mempool_txs'),
'(COUNT(*) OVER())::INTEGER AS count',
])}
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}, ${count}
FROM mempool_txs
WHERE ${
address
@@ -1355,7 +1356,9 @@ export class PgStore {
* @returns `FoundOrNot` object with a possible `digest` string.
*/
async getMempoolTxDigest(): Promise<FoundOrNot<{ digest: string }>> {
const result = await this.sql<{ digest: string }[]>`SELECT digest FROM mempool_digest`;
const result = await this.sql<{ digest: string }[]>`
SELECT date_part('epoch', mempool_updated_at)::text AS digest FROM chain_tip
`;
if (result.length === 0) {
return { found: false } as const;
}

View File

@@ -705,8 +705,6 @@ export class PgWriteStore extends PgStore {
`;
});
await this.refreshMaterializedView('mempool_digest');
if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
@@ -736,6 +734,7 @@ export class PgWriteStore extends PgStore {
// NOTE: this is essentially a work-around for whatever bug is causing the underlying problem.
async reconcileMempoolStatus(sql: PgSqlClient): Promise<void> {
const txsResult = await sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true
FROM txs
@@ -750,6 +749,13 @@ export class PgWriteStore extends PgStore {
DbTxStatus.AbortByPostCondition,
])}
RETURNING mempool_txs.tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
if (txsResult.length > 0) {
const txs = txsResult.map(tx => tx.tx_id);
@@ -1656,12 +1662,14 @@ export class PgWriteStore extends PgStore {
return result.count;
}
async insertDbMempoolTx(
tx: DbMempoolTxRaw,
async insertDbMempoolTxs(
txs: DbMempoolTxRaw[],
chainTip: DbChainTip,
sql: PgSqlClient
): Promise<boolean> {
const values: MempoolTxInsertValues = {
): Promise<string[]> {
const txIds: string[] = [];
for (const batch of batchIterate(txs, 500)) {
const values: MempoolTxInsertValues[] = batch.map(tx => ({
pruned: tx.pruned,
tx_id: tx.tx_id,
raw_tx: tx.raw_tx,
@@ -1691,54 +1699,58 @@ export class PgWriteStore extends PgStore {
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
};
const result = await sql`
}));
const result = await sql<{ tx_id: string }[]>`
WITH inserted AS (
INSERT INTO mempool_txs ${sql(values)}
ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM inserted),
mempool_updated_at = NOW()
)
SELECT tx_id FROM inserted
`;
if (result.count !== 1) {
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
logger.warn(errMsg);
return false;
} else {
return true;
txIds.push(...result.map(r => r.tx_id));
}
return txIds;
}
async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTxRaw[] }): Promise<void> {
const updatedTxIds: string[] = [];
await this.sqlWriteTransaction(async sql => {
const chainTip = await this.getChainTip();
for (const tx of txs) {
const inserted = await this.insertDbMempoolTx(tx, chainTip, sql);
if (inserted) {
updatedTxIds.push(tx.tx_id);
}
}
updatedTxIds.push(...(await this.insertDbMempoolTxs(txs, chainTip, sql)));
if (!this.isEventReplay) {
await this.reconcileMempoolStatus(sql);
const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
}
});
await this.refreshMaterializedView('mempool_digest');
for (const txId of updatedTxIds) {
await this.notifier?.sendTx({ txId: txId });
await this.notifier?.sendTx({ txId });
}
}
async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> {
const updateResults = await this.sql<MempoolTxQueryResult[]>`
const updateResults = await this.sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true, status = ${status}
WHERE tx_id IN ${this.sql(txIds)}
RETURNING ${this.sql(MEMPOOL_TX_COLUMNS)}
SET pruned = TRUE, status = ${status}
WHERE tx_id IN ${this.sql(txIds)} AND pruned = FALSE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id });
for (const txId of updateResults.map(r => r.tx_id)) {
await this.notifier?.sendTx({ txId });
}
}
@@ -2326,19 +2338,24 @@ export class PgWriteStore extends PgStore {
* @param txIds - List of transactions to update in the mempool
*/
async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> {
if (txIds.length === 0) {
// Avoid an unnecessary query.
return { restoredTxs: [] };
}
if (txIds.length === 0) return { restoredTxs: [] };
for (const txId of txIds) {
logger.debug(`Restoring mempool tx: ${txId}`);
}
const updatedRows = await sql<{ tx_id: string }[]>`
WITH restored AS (
UPDATE mempool_txs
SET pruned = false
WHERE tx_id IN ${sql(txIds)}
SET pruned = FALSE
WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM restored),
mempool_updated_at = NOW()
)
SELECT tx_id FROM restored
`;
const updatedTxs = updatedRows.map(r => r.tx_id);
@@ -2393,13 +2410,20 @@ export class PgWriteStore extends PgStore {
logger.debug(`Pruning mempool tx: ${txId}`);
}
const updateResults = await sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true
WHERE tx_id IN ${sql(txIds)}
WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const removedTxs = updateResults.map(r => r.tx_id);
return { removedTxs: removedTxs };
return { removedTxs: updateResults.map(r => r.tx_id) };
}
/**
@@ -2408,27 +2432,26 @@ export class PgWriteStore extends PgStore {
* @returns List of deleted `tx_id`s
*/
async deleteGarbageCollectedMempoolTxs(sql: PgSqlClient): Promise<{ deletedTxs: string[] }> {
// Get threshold block.
const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
const cutoffResults = await sql<{ block_height: number }[]>`
SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
FROM blocks
WHERE canonical = TRUE
`;
if (cutoffResults.length != 1) {
return { deletedTxs: [] };
}
const cutoffBlockHeight = cutoffResults[0].block_height;
// Delete every mempool tx that came before that block.
const blockThreshold = parseInt(
process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
);
// TODO: Use DELETE instead of UPDATE once we implement a non-archival API replay mode.
const deletedTxResults = await sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
WHERE pruned = FALSE
AND receipt_block_height <= (SELECT block_height - ${blockThreshold} FROM chain_tip)
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const deletedTxs = deletedTxResults.map(r => r.tx_id);
return { deletedTxs: deletedTxs };
return { deletedTxs: deletedTxResults.map(r => r.tx_id) };
}
async markEntitiesCanonical(

View File

@@ -167,7 +167,6 @@ export async function importEventsFromTsv(
responses.push(response);
}
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();

View File

@@ -373,7 +373,7 @@ describe('cache-control tests', () => {
const request1 = await supertest(api.server).get('/extended/v1/tx/mempool');
expect(request1.status).toBe(200);
expect(request1.type).toBe('application/json');
expect(request1.headers['etag']).toEqual('"0"');
const etag0 = request1.headers['etag'];
// Add mempool txs.
const mempoolTx1 = testMempoolTx({ tx_id: '0x1101' });
@@ -386,6 +386,7 @@ describe('cache-control tests', () => {
expect(request2.type).toBe('application/json');
expect(request2.headers['etag']).toBeTruthy();
const etag1 = request2.headers['etag'];
expect(etag1).not.toEqual(etag0);
// Cache works with valid ETag.
const request3 = await supertest(api.server)
@@ -416,13 +417,12 @@ describe('cache-control tests', () => {
.build();
await db.update(block2);
// Cache is now a miss and ETag is zero because mempool is empty.
// Cache is now a miss.
const request5 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag2);
expect(request5.status).toBe(200);
expect(request5.type).toBe('application/json');
expect(request5.headers['etag']).toEqual('"0"');
const etag3 = request5.headers['etag'];
// Restore a tx back into the mempool by making its anchor block non-canonical.
@@ -449,7 +449,7 @@ describe('cache-control tests', () => {
.set('If-None-Match', etag3);
expect(request6.status).toBe(200);
expect(request6.type).toBe('application/json');
expect(request6.headers['etag']).toEqual(etag2);
expect(request6.headers['etag']).not.toEqual(etag3);
const etag4 = request6.headers['etag'];
// Garbage collect all txs.
@@ -463,25 +463,13 @@ describe('cache-control tests', () => {
.build();
await db.update(block4);
// ETag zero once again.
// ETag changes once again.
const request7 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag4);
expect(request7.status).toBe(200);
expect(request7.type).toBe('application/json');
expect(request7.headers['etag']).toEqual('"0"');
// Simulate an incompatible pg version (without `bit_xor`).
await client.begin(async sql => {
await sql`DROP MATERIALIZED VIEW mempool_digest`;
await sql`CREATE MATERIALIZED VIEW mempool_digest AS (SELECT NULL AS digest)`;
});
// ETag is undefined as if mempool cache did not exist.
const request8 = await supertest(api.server).get('/extended/v1/tx/mempool');
expect(request8.status).toBe(200);
expect(request8.type).toBe('application/json');
expect(request8.headers['etag']).toBeUndefined();
expect(request7.headers['etag']).not.toEqual(etag4);
});
test('transaction cache control', async () => {

View File

@@ -1489,7 +1489,7 @@ describe('mempool tests', () => {
// directly inserting the mempool-tx and mined-tx, bypassing the normal update functions.
await db.updateBlock(db.sql, dbBlock1);
const chainTip = await db.getChainTip();
await db.insertDbMempoolTx(mempoolTx, chainTip, db.sql);
await db.insertDbMempoolTxs([mempoolTx], chainTip, db.sql);
await db.updateTx(db.sql, dbTx1);
// Verify tx shows up in mempool (non-pruned)
@@ -1497,6 +1497,8 @@ describe('mempool tests', () => {
`/extended/v1/address/${mempoolTx.sender_address}/mempool`
);
expect(mempoolResult1.body.results[0].tx_id).toBe(txId);
const mempoolCount1 = await supertest(api.server).get(`/extended/v1/tx/mempool`);
expect(mempoolCount1.body.total).toBe(1);
const mempoolResult2 = await supertest(api.server).get(
`/extended/v1/tx/mempool?sender_address=${senderAddress}`
);
@@ -1519,6 +1521,8 @@ describe('mempool tests', () => {
`/extended/v1/address/${mempoolTx.sender_address}/mempool`
);
expect(mempoolResult3.body.results).toHaveLength(0);
const mempoolCount2 = await supertest(api.server).get(`/extended/v1/tx/mempool`);
expect(mempoolCount2.body.total).toBe(0);
const mempoolResult4 = await supertest(api.server).get(
`/extended/v1/tx/mempool?sender_address=${senderAddress}`
);