fix: optimize mempool transaction reads and writes (#1781)

* fix: change mempool_digest into a table

* fix: change digest to be last updated timestamp

* fix: build

* fix: update count on reconcile

* test: mempool reconcile
Rafael Cárdenas
2024-01-02 09:59:07 -06:00
committed by GitHub
parent e8cccdd46d
commit 3a02f5741f
7 changed files with 191 additions and 174 deletions
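
The core change: the `mempool_digest` materialized view, previously refreshed after every mempool write, is replaced by two columns on the single-row `chain_tip` table that are kept current inline by the same statements that mutate `mempool_txs`. A minimal sketch of the idea, assuming a one-row `chain_tip` table:

-- Every mempool write bumps the timestamp in the same statement, so its
-- epoch value doubles as a cheap cache digest (ETag) with no view refresh.
UPDATE chain_tip SET
  mempool_tx_count = mempool_tx_count + 1,
  mempool_updated_at = NOW();
SELECT date_part('epoch', mempool_updated_at)::text AS digest FROM chain_tip;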

View File

@@ -0,0 +1,27 @@
/* eslint-disable camelcase */
exports.shorthands = undefined;
exports.up = pgm => {
pgm.addColumn('chain_tip', {
mempool_tx_count: {
type: 'int',
default: 0,
},
mempool_updated_at: {
type: 'timestamptz',
default: pgm.func('(NOW())'),
},
});
pgm.sql(`
UPDATE chain_tip SET
mempool_tx_count = (SELECT COUNT(*)::int FROM mempool_txs WHERE pruned = FALSE),
mempool_updated_at = NOW()
`);
pgm.alterColumn('chain_tip', 'mempool_tx_count', { notNull: true });
pgm.alterColumn('chain_tip', 'mempool_updated_at', { notNull: true });
};
exports.down = pgm => {
pgm.dropColumn('chain_tip', ['mempool_tx_count', 'mempool_updated_at']);
};
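
A quick sanity check after running the migration (hypothetical psql session), since the backfilled counter should match a live count of un-pruned rows:

SELECT mempool_tx_count,
       (SELECT COUNT(*)::int FROM mempool_txs WHERE pruned = FALSE) AS live_count
FROM chain_tip;
-- Both columns should be equal immediately after the backfill.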

View File

@@ -1474,12 +1474,13 @@ export class PgStore extends BasePgStore {
const unanchoredTxs: string[] = !includeUnanchored
? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id)
: [];
// If the caller is not filtering by any address param, read the tx count from the
// `chain_tip` table instead of a COUNT(*) OVER() window.
const count =
senderAddress || recipientAddress || address
? sql`(COUNT(*) OVER())::int AS count`
: sql`(SELECT mempool_tx_count FROM chain_tip) AS count`;
const resultQuery = await sql<(MempoolTxQueryResult & { count: number })[]>`
SELECT ${unsafeCols(sql, [
...MEMPOOL_TX_COLUMNS,
abiColumn('mempool_txs'),
'(COUNT(*) OVER())::INTEGER AS count',
])}
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}, ${count}
FROM mempool_txs
WHERE ${
address
@@ -1523,7 +1524,9 @@ export class PgStore extends BasePgStore {
* @returns `FoundOrNot` object with a possible `digest` string.
*/
async getMempoolTxDigest(): Promise<FoundOrNot<{ digest: string }>> {
const result = await this.sql<{ digest: string }[]>`SELECT digest FROM mempool_digest`;
const result = await this.sql<{ digest: string }[]>`
SELECT date_part('epoch', mempool_updated_at)::text AS digest FROM chain_tip
`;
if (result.length === 0) {
return { found: false } as const;
}
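
Downstream, the epoch string plugs into the existing ETag flow unchanged. A sketch of the consumer side, with the `FoundOrNot` result shape assumed from the surrounding code:

// Hypothetical usage of the getter above:
const digest = await db.getMempoolTxDigest();
// e.g. { found: true, result: { digest: '1704204000.123456' } }
// Any mempool write sets mempool_updated_at = NOW(), so the digest (and the
// ETag derived from it) changes on every mutation.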

View File

@@ -293,8 +293,6 @@ export class PgWriteStore extends PgStore {
tx_count = (SELECT tx_count FROM new_tx_count),
tx_count_unanchored = (SELECT tx_count FROM new_tx_count)
`;
await this.refreshMaterializedView('mempool_digest');
});
// Do we have an IBD height defined in ENV? If so, check if this block update reached it.
const ibdHeight = getIbdBlockHeight();
@@ -686,8 +684,6 @@ export class PgWriteStore extends PgStore {
`;
});
await this.refreshMaterializedView('mempool_digest');
if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
@@ -717,20 +713,28 @@ export class PgWriteStore extends PgStore {
// NOTE: this is essentially a work-around for an unidentified bug that leaves
// already-mined txs marked as un-pruned in the mempool.
async reconcileMempoolStatus(sql: PgSqlClient): Promise<void> {
const txsResult = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = true
FROM txs
WHERE
mempool_txs.tx_id = txs.tx_id AND
mempool_txs.pruned = false AND
txs.canonical = true AND
txs.microblock_canonical = true AND
txs.status IN ${sql([
DbTxStatus.Success,
DbTxStatus.AbortByResponse,
DbTxStatus.AbortByPostCondition,
])}
RETURNING mempool_txs.tx_id
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true
FROM txs
WHERE
mempool_txs.tx_id = txs.tx_id AND
mempool_txs.pruned = false AND
txs.canonical = true AND
txs.microblock_canonical = true AND
txs.status IN ${sql([
DbTxStatus.Success,
DbTxStatus.AbortByResponse,
DbTxStatus.AbortByPostCondition,
])}
RETURNING mempool_txs.tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
if (txsResult.length > 0) {
const txs = txsResult.map(tx => tx.tx_id);
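
The rewritten query uses a data-modifying CTE so that pruning and the counter decrement commit atomically in one statement; the same pattern repeats in the insert, drop, restore, prune, and garbage-collection paths below. A minimal standalone sketch of the pattern, with a hypothetical tx id:

WITH pruned AS (
  UPDATE mempool_txs SET pruned = TRUE
  WHERE pruned = FALSE AND tx_id = '0x00'
  RETURNING tx_id
)
UPDATE chain_tip SET
  mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
  mempool_updated_at = NOW();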
@@ -1630,99 +1634,106 @@ export class PgWriteStore extends PgStore {
return result.count;
}
async insertDbMempoolTx(
tx: DbMempoolTxRaw,
async insertDbMempoolTxs(
txs: DbMempoolTxRaw[],
chainTip: DbChainTip,
sql: PgSqlClient
): Promise<boolean> {
const values: MempoolTxInsertValues = {
pruned: tx.pruned,
tx_id: tx.tx_id,
raw_tx: tx.raw_tx,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
receipt_time: tx.receipt_time,
receipt_block_height: chainTip.block_height,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
coinbase_vrf_proof: tx.coinbase_vrf_proof ?? null,
tenure_change_tenure_consensus_hash: tx.tenure_change_tenure_consensus_hash ?? null,
tenure_change_prev_tenure_consensus_hash: tx.tenure_change_prev_tenure_consensus_hash ?? null,
tenure_change_burn_view_consensus_hash: tx.tenure_change_burn_view_consensus_hash ?? null,
tenure_change_previous_tenure_end: tx.tenure_change_previous_tenure_end ?? null,
tenure_change_previous_tenure_blocks: tx.tenure_change_previous_tenure_blocks ?? null,
tenure_change_cause: tx.tenure_change_cause ?? null,
tenure_change_pubkey_hash: tx.tenure_change_pubkey_hash ?? null,
tenure_change_signature: tx.tenure_change_signature ?? null,
tenure_change_signers: tx.tenure_change_signers ?? null,
};
const result = await sql`
INSERT INTO mempool_txs ${sql(values)}
ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
`;
if (result.count !== 1) {
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
logger.warn(errMsg);
return false;
} else {
return true;
): Promise<string[]> {
const txIds: string[] = [];
for (const batch of batchIterate(txs, INSERT_BATCH_SIZE)) {
const values: MempoolTxInsertValues[] = batch.map(tx => ({
pruned: tx.pruned,
tx_id: tx.tx_id,
raw_tx: tx.raw_tx,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
receipt_time: tx.receipt_time,
receipt_block_height: chainTip.block_height,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
coinbase_vrf_proof: tx.coinbase_vrf_proof ?? null,
tenure_change_tenure_consensus_hash: tx.tenure_change_tenure_consensus_hash ?? null,
tenure_change_prev_tenure_consensus_hash:
tx.tenure_change_prev_tenure_consensus_hash ?? null,
tenure_change_burn_view_consensus_hash: tx.tenure_change_burn_view_consensus_hash ?? null,
tenure_change_previous_tenure_end: tx.tenure_change_previous_tenure_end ?? null,
tenure_change_previous_tenure_blocks: tx.tenure_change_previous_tenure_blocks ?? null,
tenure_change_cause: tx.tenure_change_cause ?? null,
tenure_change_pubkey_hash: tx.tenure_change_pubkey_hash ?? null,
tenure_change_signature: tx.tenure_change_signature ?? null,
tenure_change_signers: tx.tenure_change_signers ?? null,
}));
const result = await sql<{ tx_id: string }[]>`
WITH inserted AS (
INSERT INTO mempool_txs ${sql(values)}
ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM inserted),
mempool_updated_at = NOW()
)
SELECT tx_id FROM inserted
`;
txIds.push(...result.map(r => r.tx_id));
}
return txIds;
}
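
The batching relies on `batchIterate` with `INSERT_BATCH_SIZE`, presumably to keep each multi-row INSERT within Postgres bind-parameter limits. A sketch of what such a helper does (the real implementation lives elsewhere in this repo):

// Hypothetical illustration: yield fixed-size slices of an array.
function* batchIterate<T>(items: T[], batchSize: number): Generator<T[]> {
  for (let i = 0; i < items.length; i += batchSize) {
    yield items.slice(i, i + batchSize);
  }
}
// batchIterate([1, 2, 3, 4, 5], 2) yields [1, 2], [3, 4], then [5].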
async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTxRaw[] }): Promise<void> {
const updatedTxIds: string[] = [];
await this.sqlWriteTransaction(async sql => {
const chainTip = await this.getChainTip();
for (const tx of txs) {
const inserted = await this.insertDbMempoolTx(tx, chainTip, sql);
if (inserted) {
updatedTxIds.push(tx.tx_id);
}
}
updatedTxIds.push(...(await this.insertDbMempoolTxs(txs, chainTip, sql)));
if (!this.isEventReplay) {
await this.reconcileMempoolStatus(sql);
const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
}
});
await this.refreshMaterializedView('mempool_digest');
for (const txId of updatedTxIds) {
await this.notifier?.sendTx({ txId: txId });
await this.notifier?.sendTx({ txId });
}
}
async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> {
const updateResults = await this.sql<MempoolTxQueryResult[]>`
UPDATE mempool_txs
SET pruned = true, status = ${status}
WHERE tx_id IN ${this.sql(txIds)}
RETURNING ${this.sql(MEMPOOL_TX_COLUMNS)}
const updateResults = await this.sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = TRUE, status = ${status}
WHERE tx_id IN ${this.sql(txIds)} AND pruned = FALSE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id });
for (const txId of updateResults.map(r => r.tx_id)) {
await this.notifier?.sendTx({ txId });
}
}
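
Note the added `AND pruned = FALSE` guard: without it, dropping an already-pruned tx would decrement `mempool_tx_count` a second time and skew the cached total. A small illustration with a hypothetical tx id:

-- '0xaa' is already pruned, so the CTE matches nothing, COUNT(*) is 0,
-- and chain_tip.mempool_tx_count is left unchanged.
WITH pruned AS (
  UPDATE mempool_txs SET pruned = TRUE
  WHERE tx_id IN ('0xaa') AND pruned = FALSE
  RETURNING tx_id
)
SELECT COUNT(*) FROM pruned;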
@@ -2286,19 +2297,24 @@ export class PgWriteStore extends PgStore {
* @param txIds - List of transactions to update in the mempool
*/
async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> {
if (txIds.length === 0) {
// Avoid an unnecessary query.
return { restoredTxs: [] };
}
if (txIds.length === 0) return { restoredTxs: [] };
for (const txId of txIds) {
logger.debug(`Restoring mempool tx: ${txId}`);
}
const updatedRows = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = false
WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id
WITH restored AS (
UPDATE mempool_txs
SET pruned = FALSE
WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM restored),
mempool_updated_at = NOW()
)
SELECT tx_id FROM restored
`;
const updatedTxs = updatedRows.map(r => r.tx_id);
@@ -2350,13 +2366,20 @@ export class PgWriteStore extends PgStore {
logger.debug(`Pruning mempool tx: ${txId}`);
}
const updateResults = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = true
WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true
WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const removedTxs = updateResults.map(r => r.tx_id);
return { removedTxs: removedTxs };
return { removedTxs: updateResults.map(r => r.tx_id) };
}
/**
@@ -2365,27 +2388,26 @@ export class PgWriteStore extends PgStore {
* @returns List of deleted `tx_id`s
*/
async deleteGarbageCollectedMempoolTxs(sql: PgSqlClient): Promise<{ deletedTxs: string[] }> {
// Get threshold block.
const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
const cutoffResults = await sql<{ block_height: number }[]>`
SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
FROM blocks
WHERE canonical = TRUE
`;
if (cutoffResults.length != 1) {
return { deletedTxs: [] };
}
const cutoffBlockHeight = cutoffResults[0].block_height;
// Delete every mempool tx that came before that block.
const blockThreshold = parseInt(
process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
);
// TODO: Use DELETE instead of UPDATE once we implement a non-archival API replay mode.
const deletedTxResults = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
RETURNING tx_id
WITH pruned AS (
UPDATE mempool_txs
SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
WHERE pruned = FALSE
AND receipt_block_height <= (SELECT block_height - ${blockThreshold} FROM chain_tip)
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const deletedTxs = deletedTxResults.map(r => r.tx_id);
return { deletedTxs: deletedTxs };
return { deletedTxs: deletedTxResults.map(r => r.tx_id) };
}
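
The garbage-collection cutoff is now computed inside the query against `chain_tip` instead of a separate round-trip over `blocks`. A worked example, assuming the default threshold:

-- With STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD=256 and
-- chain_tip.block_height = 1000, the subquery yields 1000 - 256 = 744, so any
-- un-pruned mempool tx received at block height 744 or below is pruned.
SELECT block_height - 256 AS cutoff FROM chain_tip;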
async markEntitiesCanonical(
@@ -2747,28 +2769,6 @@ export class PgWriteStore extends PgStore {
return updatedEntities;
}
/**
* Refreshes a Postgres materialized view.
* @param viewName - Materialized view name
* @param sql - Pg scoped client. Will use the default client if none specified
* @param skipDuringEventReplay - If we should skip refreshing during event replay
*/
async refreshMaterializedView(viewName: string, sql?: PgSqlClient, skipDuringEventReplay = true) {
sql = sql ?? this.sql;
if ((this.isEventReplay && skipDuringEventReplay) || !this.isIbdBlockHeightReached) {
return;
}
await sql`REFRESH MATERIALIZED VIEW ${isProdEnv ? sql`CONCURRENTLY` : sql``} ${sql(viewName)}`;
}
/**
* Called when a full event import is complete.
*/
async finishEventReplay() {
if (!this.isEventReplay) return;
await this.refreshMaterializedView('mempool_digest', this.sql, false);
}
/**
* batch operations (mainly for event-replay)
*/

View File

@@ -171,7 +171,6 @@ export async function importEventsFromTsv(
responses.push(response);
}
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();

View File

@@ -264,10 +264,6 @@ export class ReplayController {
// Remainder events to be replayed with regular HTTP POSTs
await this.ingestRemainderEvents();
// Refreshing materialized views
logger.info({ component: 'event-replay' }, `Refreshing materialized views`);
await this.db.finishEventReplay();
// Close DB
logger.info({ component: 'event-replay' }, 'Closing DB connection');
await this.db.close();

View File

@@ -380,7 +380,7 @@ describe('cache-control tests', () => {
const request1 = await supertest(api.server).get('/extended/v1/tx/mempool');
expect(request1.status).toBe(200);
expect(request1.type).toBe('application/json');
expect(request1.headers['etag']).toEqual('"0"');
const etag0 = request1.headers['etag'];
// Add mempool txs.
const mempoolTx1 = testMempoolTx({ tx_id: '0x1101' });
@@ -393,6 +393,7 @@ describe('cache-control tests', () => {
expect(request2.type).toBe('application/json');
expect(request2.headers['etag']).toBeTruthy();
const etag1 = request2.headers['etag'];
expect(etag1).not.toEqual(etag0);
// Cache works with valid ETag.
const request3 = await supertest(api.server)
@@ -423,13 +424,12 @@ describe('cache-control tests', () => {
.build();
await db.update(block2);
// Cache is now a miss and ETag is zero because mempool is empty.
// Cache is now a miss.
const request5 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag2);
expect(request5.status).toBe(200);
expect(request5.type).toBe('application/json');
expect(request5.headers['etag']).toEqual('"0"');
const etag3 = request5.headers['etag'];
// Restore a tx back into the mempool by making its anchor block non-canonical.
@@ -456,7 +456,7 @@ describe('cache-control tests', () => {
.set('If-None-Match', etag3);
expect(request6.status).toBe(200);
expect(request6.type).toBe('application/json');
expect(request6.headers['etag']).toEqual(etag2);
expect(request6.headers['etag']).not.toEqual(etag3);
const etag4 = request6.headers['etag'];
// Garbage collect all txs.
@@ -470,25 +470,13 @@ describe('cache-control tests', () => {
.build();
await db.update(block4);
// ETag zero once again.
// ETag changes once again.
const request7 = await supertest(api.server)
.get('/extended/v1/tx/mempool')
.set('If-None-Match', etag4);
expect(request7.status).toBe(200);
expect(request7.type).toBe('application/json');
expect(request7.headers['etag']).toEqual('"0"');
// Simulate an incompatible pg version (without `bit_xor`).
await client.begin(async sql => {
await sql`DROP MATERIALIZED VIEW mempool_digest`;
await sql`CREATE MATERIALIZED VIEW mempool_digest AS (SELECT NULL AS digest)`;
});
// ETag is undefined as if mempool cache did not exist.
const request8 = await supertest(api.server).get('/extended/v1/tx/mempool');
expect(request8.status).toBe(200);
expect(request8.type).toBe('application/json');
expect(request8.headers['etag']).toBeUndefined();
expect(request7.headers['etag']).not.toEqual(etag4);
});
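
These assertions no longer pin exact digest values (the old materialized view reported `"0"` for an empty mempool); with the timestamp digest they only assert that the ETag changes across mutations. A sketch of the round-trip being exercised, with the 304 behavior assumed from the existing cache tests:

// Hypothetical round-trip:
const first = await supertest(api.server).get('/extended/v1/tx/mempool');
const etag = first.headers['etag']; // e.g. '"1704204000.123456"'
const cached = await supertest(api.server)
  .get('/extended/v1/tx/mempool')
  .set('If-None-Match', etag);
// Until the mempool changes again, this should be a 304 cache hit.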
test('transaction cache control', async () => {

View File

@@ -1584,7 +1584,7 @@ describe('mempool tests', () => {
// directly inserting the mempool-tx and mined-tx, bypassing the normal update functions.
await db.updateBlock(db.sql, dbBlock1);
const chainTip = await db.getChainTip();
await db.insertDbMempoolTx(mempoolTx, chainTip, db.sql);
await db.insertDbMempoolTxs([mempoolTx], chainTip, db.sql);
await db.updateTx(db.sql, dbTx1);
// Verify tx shows up in mempool (non-pruned)
@@ -1592,6 +1592,8 @@ describe('mempool tests', () => {
`/extended/v1/address/${mempoolTx.sender_address}/mempool`
);
expect(mempoolResult1.body.results[0].tx_id).toBe(txId);
const mempoolCount1 = await supertest(api.server).get(`/extended/v1/tx/mempool`);
expect(mempoolCount1.body.total).toBe(1);
const mempoolResult2 = await supertest(api.server).get(
`/extended/v1/tx/mempool?sender_address=${senderAddress}`
);
@@ -1614,6 +1616,8 @@ describe('mempool tests', () => {
`/extended/v1/address/${mempoolTx.sender_address}/mempool`
);
expect(mempoolResult3.body.results).toHaveLength(0);
const mempoolCount2 = await supertest(api.server).get(`/extended/v1/tx/mempool`);
expect(mempoolCount2.body.total).toBe(0);
const mempoolResult4 = await supertest(api.server).get(
`/extended/v1/tx/mempool?sender_address=${senderAddress}`
);