From 2896e40632bb0b0306e874051115cb75ad894d11 Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Thu, 20 Oct 2022 13:51:41 -0400 Subject: [PATCH 1/4] fix: if the act of storing mempool nonces in a write-through manner fails for some reason, then cache them until the end of the iteration loop and store them then as a transaction --- src/core/mempool.rs | 159 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 138 insertions(+), 21 deletions(-) diff --git a/src/core/mempool.rs b/src/core/mempool.rs index b23b70a9c..ab5ccbf9f 100644 --- a/src/core/mempool.rs +++ b/src/core/mempool.rs @@ -797,7 +797,21 @@ impl NonceCache { } } - fn get(&mut self, address: &StacksAddress, clarity_tx: &mut C, mempool_db: &DBConn) -> u64 + /// Get a nonce from the cache. + /// First, the RAM cache will be checked for this address. + /// If absent, then the `nonces` table will be queried for this address. + /// If absent, then the MARF will be queried for this address. + /// + /// If not in RAM, the nonce will be opportunistically stored to the `nonces` table. If that + /// fails due to lock contention, then the method will return `true` for its second tuple argument. + /// + /// Returns (nonce, should-try-store-again?) 
+ fn get( + &mut self, + address: &StacksAddress, + clarity_tx: &mut C, + mempool_db: &DBConn, + ) -> (u64, bool) where C: ClarityConnection, { @@ -806,7 +820,7 @@ impl NonceCache { // Check in-memory cache match self.cache.get(address) { - Some(nonce) => *nonce, + Some(nonce) => (*nonce, false), None => { // Check sqlite cache let opt_nonce = match db_get_nonce(mempool_db, address) { @@ -822,33 +836,42 @@ impl NonceCache { if self.cache.len() < self.max_cache_size { self.cache.insert(address.clone(), nonce); } - nonce + (nonce, false) } None => { let nonce = StacksChainState::get_nonce(clarity_tx, &address.clone().into()); - match db_set_nonce(mempool_db, address, nonce) { - Ok(_) => (), - Err(e) => warn!("error caching nonce to sqlite: {}", e), - } + let should_store_again = match db_set_nonce(mempool_db, address, nonce) { + Ok(_) => false, + Err(e) => { + warn!("error caching nonce to sqlite: {}", e); + true + } + }; if self.cache.len() < self.max_cache_size { self.cache.insert(address.clone(), nonce); } - nonce + (nonce, should_store_again) } } } } } - fn update(&mut self, address: StacksAddress, value: u64, mempool_db: &DBConn) { + /// Store the (address, nonce) pair to the `nonces` table. + /// If storage fails, return false. + /// Otherwise return true. 
+ fn update(&mut self, address: StacksAddress, value: u64, mempool_db: &DBConn) -> bool { // Sqlite cache - match db_set_nonce(mempool_db, &address, value) { - Ok(_) => (), - Err(e) => warn!("error caching nonce to sqlite: {}", e), - } + let success = match db_set_nonce(mempool_db, &address, value) { + Ok(_) => true, + Err(e) => { + warn!("error caching nonce to sqlite: {}", e); + false + } + }; // In-memory cache match self.cache.get_mut(&address) { @@ -857,6 +880,8 @@ impl NonceCache { } None => (), } + + success } } @@ -876,6 +901,22 @@ fn db_get_nonce(conn: &DBConn, address: &StacksAddress) -> Result, d query_row(conn, sql, rusqlite::params![&addr_str]) } +#[cfg(test)] +pub fn db_get_all_nonces(conn: &DBConn) -> Result, db_error> { + let sql = "SELECT * FROM nonces"; + let mut stmt = conn.prepare(&sql).map_err(|e| db_error::SqliteError(e))?; + let mut iter = stmt + .query(NO_PARAMS) + .map_err(|e| db_error::SqliteError(e))?; + let mut ret = vec![]; + while let Ok(Some(row)) = iter.next() { + let addr = StacksAddress::from_column(row, "address")?; + let nonce = u64::from_column(row, "nonce")?; + ret.push((addr, nonce)); + } + Ok(ret) +} + /// Cache potential candidate transactions for subsequent iterations. /// While walking the mempool, transactions that have nonces that are too high /// to process yet (but could be processed in the future) are added to `next`. @@ -1260,6 +1301,25 @@ impl MemPoolDB { Ok(updated) } + /// Helper method to record nonces to a retry-buffer. + /// This is needed for when we try to write-through a new (address, nonce) pair to the on-disk + /// `nonces` cache, but the write fails due to lock contention from another thread. The + /// retry-buffer will be used to later store this data in a single transaction. 
+ fn save_nonce_for_retry( + retry_store: &mut HashMap, + max_size: u64, + addr: StacksAddress, + new_nonce: u64, + ) { + if (retry_store.len() as u64) < max_size { + if let Some(nonce) = retry_store.get_mut(&addr) { + *nonce = cmp::max(new_nonce, *nonce); + } else { + retry_store.insert(addr, new_nonce); + } + } + } + /// Iterate over candidates in the mempool /// `todo` will be called once for each transaction that is a valid /// candidate for inclusion in the next block, meaning its origin and @@ -1312,16 +1372,20 @@ impl MemPoolDB { let mut candidate_cache = CandidateCache::new(settings.candidate_retry_cache_size); let mut nonce_cache = NonceCache::new(settings.nonce_cache_size); + // set of (address, nonce) to store after the inner loop completes. This will be done in a + // single transaction. This cannot grow to more than `settings.nonce_cache_size` entries. + let mut retry_store = HashMap::new(); + let sql = " SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate FROM mempool WHERE fee_rate IS NULL "; - let mut query_stmt = self + let mut query_stmt_null = self .db .prepare(&sql) .map_err(|err| Error::SqliteError(err))?; - let mut null_iterator = query_stmt + let mut null_iterator = query_stmt_null .query(NO_PARAMS) .map_err(|err| Error::SqliteError(err))?; @@ -1331,11 +1395,11 @@ impl MemPoolDB { WHERE fee_rate IS NOT NULL ORDER BY fee_rate DESC "; - let mut query_stmt = self + let mut query_stmt_fee = self .db .prepare(&sql) .map_err(|err| Error::SqliteError(err))?; - let mut fee_iterator = query_stmt + let mut fee_iterator = query_stmt_fee .query(NO_PARAMS) .map_err(|err| Error::SqliteError(err))?; @@ -1392,10 +1456,30 @@ impl MemPoolDB { }; // Check the nonces. 
- let expected_origin_nonce = + let (expected_origin_nonce, retry_store_origin_nonce) = nonce_cache.get(&candidate.origin_address, clarity_tx, self.conn()); - let expected_sponsor_nonce = + let (expected_sponsor_nonce, retry_store_sponsor_nonce) = nonce_cache.get(&candidate.sponsor_address, clarity_tx, self.conn()); + + // Try storing these nonces later if we failed to do so here, e.g. due to some other + // thread holding the write-lock on the mempool DB. + if retry_store_origin_nonce { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + candidate.origin_address.clone(), + expected_origin_nonce, + ); + } + if retry_store_sponsor_nonce { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + candidate.sponsor_address.clone(), + expected_sponsor_nonce, + ); + } + match order_nonces( candidate.origin_nonce, expected_origin_nonce, @@ -1461,17 +1545,34 @@ impl MemPoolDB { match tx_event { TransactionEvent::Success(_) => { // Bump nonces in the cache for the executed transaction - nonce_cache.update( + let stored = nonce_cache.update( consider.tx.metadata.origin_address, expected_origin_nonce + 1, self.conn(), ); + if !stored { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + consider.tx.metadata.origin_address, + expected_origin_nonce + 1, + ); + } + if consider.tx.tx.auth.is_sponsored() { - nonce_cache.update( + let stored = nonce_cache.update( consider.tx.metadata.sponsor_address, expected_sponsor_nonce + 1, self.conn(), ); + if !stored { + Self::save_nonce_for_retry( + &mut retry_store, + settings.nonce_cache_size, + consider.tx.metadata.sponsor_address, + expected_sponsor_nonce + 1, + ); + } } output_events.push(tx_event); } @@ -1497,6 +1598,22 @@ impl MemPoolDB { candidate_cache.reset(); } + // drop these rusqlite statements and queries, since their existence as immutable borrows on the + // connection prevents us from beginning a transaction below (which requires a mutable + // 
borrow). + drop(null_iterator); + drop(fee_iterator); + drop(query_stmt_null); + drop(query_stmt_fee); + + if retry_store.len() > 0 { + let tx = self.tx_begin()?; + for (address, nonce) in retry_store.into_iter() { + nonce_cache.update(address, nonce, &tx); + } + tx.commit()?; + } + debug!( "Mempool iteration finished"; "considered_txs" => total_considered, From 83f54f2eb57a056938c438a868862e8d5881efd4 Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Thu, 20 Oct 2022 13:52:31 -0400 Subject: [PATCH 2/4] fix: add a test to verify that a concurrent writer thread may delay the mempool walk from caching updated nonces, but it cannot prevent it --- src/core/tests/mod.rs | 165 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/src/core/tests/mod.rs b/src/core/tests/mod.rs index acd1d1d60..7a11bf483 100644 --- a/src/core/tests/mod.rs +++ b/src/core/tests/mod.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . use std::cmp; +use std::collections::HashMap; use std::collections::HashSet; use std::io; @@ -39,6 +40,7 @@ use crate::chainstate::stacks::{ use crate::chainstate::stacks::{ C32_ADDRESS_VERSION_MAINNET_SINGLESIG, C32_ADDRESS_VERSION_TESTNET_SINGLESIG, }; +use crate::core::mempool::db_get_all_nonces; use crate::core::mempool::MemPoolWalkSettings; use crate::core::mempool::TxTag; use crate::core::mempool::{BLOOM_COUNTER_DEPTH, BLOOM_COUNTER_ERROR_RATE, MAX_BLOOM_COUNTER_TXS}; @@ -63,6 +65,7 @@ use stacks_common::address::AddressHashMode; use stacks_common::types::chainstate::TrieHash; use stacks_common::util::hash::Hash160; use stacks_common::util::secp256k1::MessageSignature; +use stacks_common::util::sleep_ms; use stacks_common::util::{get_epoch_time_ms, get_epoch_time_secs}; use stacks_common::util::{hash::hex_bytes, hash::to_hex, hash::*, log, secp256k1::*}; @@ -1144,6 +1147,168 @@ fn test_iterate_candidates_problematic_transaction() { ); } +#[test] +/// This test verifies that all transactions are visited, and nonce 
cache on disk updated, even if +/// there's a concurrent write-lock on the mempool DB. +fn test_iterate_candidates_concurrent_write_lock() { + let mut chainstate = instantiate_chainstate_with_balances( + false, + 0x80000000, + "test_iterate_candidates_concurrent_write_lock", + vec![], + ); + let chainstate_path = chainstate_path("test_iterate_candidates_concurrent_write_lock"); + let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + let b_1 = make_block( + &mut chainstate, + ConsensusHash([0x1; 20]), + &( + FIRST_BURNCHAIN_CONSENSUS_HASH.clone(), + FIRST_STACKS_BLOCK_HASH.clone(), + ), + 1, + 1, + ); + let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2); + + let mut mempool_settings = MemPoolWalkSettings::default(); + mempool_settings.min_tx_fee = 10; + let mut tx_events = Vec::new(); + + let mut txs = codec_all_transactions( + &TransactionVersion::Testnet, + 0x80000000, + &TransactionAnchorMode::Any, + &TransactionPostConditionMode::Allow, + ); + + let mut expected_addr_nonces = HashMap::new(); + + // Load 24 transactions into the mempool, alternating whether or not they have a fee-rate. 
+ for nonce in 0..24 { + let mut tx = txs.pop().unwrap(); + let mut mempool_tx = mempool.tx_begin().unwrap(); + + let origin_address = tx.origin_address(); + let origin_nonce = tx.get_origin_nonce(); + let sponsor_address = tx.sponsor_address().unwrap_or(origin_address); + let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); + + if let Some(nonce) = expected_addr_nonces.get_mut(&origin_address) { + *nonce = cmp::max(*nonce, origin_nonce); + } else { + expected_addr_nonces.insert(origin_address.clone(), origin_nonce); + } + + if let Some(nonce) = expected_addr_nonces.get_mut(&sponsor_address) { + *nonce = cmp::max(*nonce, sponsor_nonce); + } else { + expected_addr_nonces.insert(sponsor_address.clone(), sponsor_nonce); + } + + tx.set_tx_fee(100); + let txid = tx.txid(); + let tx_bytes = tx.serialize_to_vec(); + let tx_fee = tx.get_tx_fee(); + let height = 100; + + MemPoolDB::try_add_tx( + &mut mempool_tx, + &mut chainstate, + &b_1.0, + &b_1.1, + txid, + tx_bytes, + tx_fee, + height, + &origin_address, + nonce, + &sponsor_address, + nonce, + None, + ) + .unwrap(); + + if nonce & 1 == 0 { + mempool_tx + .execute( + "UPDATE mempool SET fee_rate = ? WHERE txid = ?", + rusqlite::params![Some(123.0), &txid], + ) + .unwrap(); + } else { + let none: Option = None; + mempool_tx + .execute( + "UPDATE mempool SET fee_rate = ? 
WHERE txid = ?", + rusqlite::params![none, &txid], + ) + .unwrap(); + } + + mempool_tx.commit().unwrap(); + } + assert!(expected_addr_nonces.len() > 0); + + let all_addr_nonces = db_get_all_nonces(mempool.conn()).unwrap(); + assert_eq!(all_addr_nonces.len(), 0); + + // start a thread that holds a write-lock on the mempool + let write_thread = std::thread::spawn(move || { + let mut thread_mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); + let mempool_tx = thread_mempool.tx_begin().unwrap(); + sleep_ms(10_000); + }); + + sleep_ms(1_000); + + // 50% chance of considering a transaction with unknown fee estimate + mempool_settings.consider_no_estimate_tx_prob = 50; + chainstate.with_read_only_clarity_tx( + &TEST_BURN_STATE_DB, + &StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1), + |clarity_conn| { + let mut count_txs = 0; + mempool + .iterate_candidates::<_, ChainstateError, _>( + clarity_conn, + &mut tx_events, + 2, + mempool_settings.clone(), + |_, available_tx, _| { + count_txs += 1; + Ok(Some( + // Generate any success result + TransactionResult::success( + &available_tx.tx.tx, + available_tx.tx.metadata.tx_fee, + StacksTransactionReceipt::from_stx_transfer( + available_tx.tx.tx.clone(), + vec![], + Value::okay(Value::Bool(true)).unwrap(), + ExecutionCost::zero(), + ), + ) + .convert_to_event(), + )) + }, + ) + .unwrap(); + assert_eq!(count_txs, 24, "Mempool should find all 24 transactions"); + }, + ); + + write_thread.join().unwrap(); + + let all_addr_nonces = db_get_all_nonces(mempool.conn()).unwrap(); + assert_eq!(all_addr_nonces.len(), expected_addr_nonces.len()); + + for (addr, nonce) in all_addr_nonces { + assert!(expected_addr_nonces.get(&addr).is_some()); + assert_eq!(nonce, 24); + } +} + #[test] fn mempool_do_not_replace_tx() { let mut chainstate = instantiate_chainstate_with_balances( From a0fddb7aaf6c030cc2d89fdd66deac2f07d2b7f7 Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Thu, 20 Oct 2022 15:06:04 -0400 Subject: 
[PATCH 3/4] chore: add changelog --- CHANGELOG.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 008320906..1222ff4f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to the versioning scheme outlined in the [README.md](README.md). +## [2.05.0.5.0] + +### Changed + +- The act of walking the mempool will now cache address nonces in RAM and to a + temporary mempool table used for the purpose, instead of unconditionally +querying them from the chainstate MARF. This builds upon improvements to mempool +goodput over 2.05.0.4.0 (#3337). +- The node and miner implementation has been refactored to remove write-lock + contention that can arise when the node's chains-coordinator thread attempts to store and +process newly-discovered (or newly-mined) blocks, and when the node's relayer +thread attempts to mine a new block. In addition, the miner logic has been +moved to a separate thread in order to avoid starving the relayer thread (which +must handle block and transaction propagation, as well as block-processing). +The refactored miner thread will be preemptively terminated and restarted +by the arrival of new Stacks blocks or burnchain blocks, which further +prevents the miner from holding open write-locks in the underlying +chainstate databases when there is new chain data to discover (which would +invalidate the miner's work anyway). (#3335). + +### Fixed + +- Fixed `pow` documentation in Clarity (#3338). +- Backported unit tests that were omitted in the 2.05.0.3.0 release (#3348). 
+ ## [2.05.0.4.0] ### Fixed From 33458fb8784112840d57c8dd5b335cc36ac5078f Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Thu, 20 Oct 2022 15:06:53 -0400 Subject: [PATCH 4/4] fix: when mock-mining, always re-try mining an anchored block even if we haven't mined a microblock yet. Also, remove gratuitous debug messages. --- testnet/stacks-node/src/neon_node.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 332c0faf2..69bd7c1f5 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -1301,13 +1301,6 @@ impl BlockMinerThread { u16::MAX, ) { - debug!( - "Microblocks descended from {}/{} ({}): {:?}", - &prev_block.parent_consensus_hash, - &stacks_parent_header.anchored_header.block_hash(), - stream.len(), - &stream - ); if (prev_block.anchored_block.header.parent_microblock == BlockHeaderHash([0u8; 32]) && stream.len() == 0) @@ -2748,9 +2741,13 @@ impl RelayerThread { return false; } - if self.mined_stacks_block && self.config.node.mine_microblocks { - debug!("Relayer: mined a Stacks block already; waiting for microblock miner"); - return false; + if !self.config.node.mock_mining { + // mock miner can't mine microblocks yet, so don't stop it from trying multiple + // anchored blocks + if self.mined_stacks_block && self.config.node.mine_microblocks { + debug!("Relayer: mined a Stacks block already; waiting for microblock miner"); + return false; + } } let mut miner_thread_state =