Merge pull request #3352 from stacks-network/fix/retry-store-nonces

Fix/retry store nonces
This commit is contained in:
Jude Nelson
2022-10-20 19:05:13 +00:00
committed by GitHub
4 changed files with 335 additions and 31 deletions

View File

@@ -5,6 +5,31 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to the versioning scheme outlined in the [README.md](README.md).
## [2.05.0.5.0]
### Changed
- The act of walking the mempool will now cache address nonces in RAM and to a
temporary mempool table used for the purpose, instead of unconditionally
querying them from the chainstate MARF. This builds upon improvements to mempool
goodput over 2.05.0.4.0 (#3337).
- The node and miner implementation has been refactored to remove write-lock
contention that can arise when the node's chains-coordinator thread attempts to store and
process newly-discovered (or newly-mined) blocks, and when the node's relayer
thread attempts to mine a new block. In addition, the miner logic has been
moved to a separate thread in order to avoid starving the relayer thread (which
must handle block and transaction propagation, as well as block-processing).
The refactored miner thread will be preemptively terminated and restarted
by the arrival of new Stacks blocks or burnchain blocks, which further
prevents the miner from holding open write-locks in the underlying
chainstate databases when there is new chain data to discover (which would
invalidate the miner's work anyway). (#3335).
### Fixed
- Fixed `pow` documentation in Clarity (#3338).
- Backported unit tests that were omitted in the 2.05.0.3.0 release (#3348).
## [2.05.0.4.0]
### Fixed

View File

@@ -797,7 +797,21 @@ impl NonceCache {
}
}
fn get<C>(&mut self, address: &StacksAddress, clarity_tx: &mut C, mempool_db: &DBConn) -> u64
/// Get a nonce from the cache.
/// First, the RAM cache will be checked for this address.
/// If absent, then the `nonces` table will be queried for this address.
/// If absent, then the MARF will be queried for this address.
///
/// If not in RAM, the nonce will be opportunistically stored to the `nonces` table. If that
/// fails due to lock contention, then the method will return `true` for its second tuple argument.
///
/// Returns (nonce, should-try-store-again?)
fn get<C>(
&mut self,
address: &StacksAddress,
clarity_tx: &mut C,
mempool_db: &DBConn,
) -> (u64, bool)
where
C: ClarityConnection,
{
@@ -806,7 +820,7 @@ impl NonceCache {
// Check in-memory cache
match self.cache.get(address) {
Some(nonce) => *nonce,
Some(nonce) => (*nonce, false),
None => {
// Check sqlite cache
let opt_nonce = match db_get_nonce(mempool_db, address) {
@@ -822,33 +836,42 @@ impl NonceCache {
if self.cache.len() < self.max_cache_size {
self.cache.insert(address.clone(), nonce);
}
nonce
(nonce, false)
}
None => {
let nonce =
StacksChainState::get_nonce(clarity_tx, &address.clone().into());
match db_set_nonce(mempool_db, address, nonce) {
Ok(_) => (),
Err(e) => warn!("error caching nonce to sqlite: {}", e),
}
let should_store_again = match db_set_nonce(mempool_db, address, nonce) {
Ok(_) => false,
Err(e) => {
warn!("error caching nonce to sqlite: {}", e);
true
}
};
if self.cache.len() < self.max_cache_size {
self.cache.insert(address.clone(), nonce);
}
nonce
(nonce, should_store_again)
}
}
}
}
}
fn update(&mut self, address: StacksAddress, value: u64, mempool_db: &DBConn) {
/// Store the (address, nonce) pair to the `nonces` table.
/// If storage fails, return false.
/// Otherwise return true.
fn update(&mut self, address: StacksAddress, value: u64, mempool_db: &DBConn) -> bool {
// Sqlite cache
match db_set_nonce(mempool_db, &address, value) {
Ok(_) => (),
Err(e) => warn!("error caching nonce to sqlite: {}", e),
}
let success = match db_set_nonce(mempool_db, &address, value) {
Ok(_) => true,
Err(e) => {
warn!("error caching nonce to sqlite: {}", e);
false
}
};
// In-memory cache
match self.cache.get_mut(&address) {
@@ -857,6 +880,8 @@ impl NonceCache {
}
None => (),
}
success
}
}
@@ -876,6 +901,22 @@ fn db_get_nonce(conn: &DBConn, address: &StacksAddress) -> Result<Option<u64>, d
query_row(conn, sql, rusqlite::params![&addr_str])
}
#[cfg(test)]
pub fn db_get_all_nonces(conn: &DBConn) -> Result<Vec<(StacksAddress, u64)>, db_error> {
let sql = "SELECT * FROM nonces";
let mut stmt = conn.prepare(&sql).map_err(|e| db_error::SqliteError(e))?;
let mut iter = stmt
.query(NO_PARAMS)
.map_err(|e| db_error::SqliteError(e))?;
let mut ret = vec![];
while let Ok(Some(row)) = iter.next() {
let addr = StacksAddress::from_column(row, "address")?;
let nonce = u64::from_column(row, "nonce")?;
ret.push((addr, nonce));
}
Ok(ret)
}
/// Cache potential candidate transactions for subsequent iterations.
/// While walking the mempool, transactions that have nonces that are too high
/// to process yet (but could be processed in the future) are added to `next`.
@@ -1260,6 +1301,25 @@ impl MemPoolDB {
Ok(updated)
}
/// Helper method to record nonces to a retry-buffer.
/// This is needed for when we try to write-through a new (address, nonce) pair to the on-disk
/// `nonces` cache, but the write fails due to lock contention from another thread. The
/// retry-buffer will be used to later store this data in a single transaction.
fn save_nonce_for_retry(
retry_store: &mut HashMap<StacksAddress, u64>,
max_size: u64,
addr: StacksAddress,
new_nonce: u64,
) {
if (retry_store.len() as u64) < max_size {
if let Some(nonce) = retry_store.get_mut(&addr) {
*nonce = cmp::max(new_nonce, *nonce);
} else {
retry_store.insert(addr, new_nonce);
}
}
}
/// Iterate over candidates in the mempool
/// `todo` will be called once for each transaction that is a valid
/// candidate for inclusion in the next block, meaning its origin and
@@ -1312,16 +1372,20 @@ impl MemPoolDB {
let mut candidate_cache = CandidateCache::new(settings.candidate_retry_cache_size);
let mut nonce_cache = NonceCache::new(settings.nonce_cache_size);
// set of (address, nonce) to store after the inner loop completes. This will be done in a
// single transaction. This cannot grow to more than `settings.nonce_cache_size` entries.
let mut retry_store = HashMap::new();
let sql = "
SELECT txid, origin_nonce, origin_address, sponsor_nonce, sponsor_address, fee_rate
FROM mempool
WHERE fee_rate IS NULL
";
let mut query_stmt = self
let mut query_stmt_null = self
.db
.prepare(&sql)
.map_err(|err| Error::SqliteError(err))?;
let mut null_iterator = query_stmt
let mut null_iterator = query_stmt_null
.query(NO_PARAMS)
.map_err(|err| Error::SqliteError(err))?;
@@ -1331,11 +1395,11 @@ impl MemPoolDB {
WHERE fee_rate IS NOT NULL
ORDER BY fee_rate DESC
";
let mut query_stmt = self
let mut query_stmt_fee = self
.db
.prepare(&sql)
.map_err(|err| Error::SqliteError(err))?;
let mut fee_iterator = query_stmt
let mut fee_iterator = query_stmt_fee
.query(NO_PARAMS)
.map_err(|err| Error::SqliteError(err))?;
@@ -1392,10 +1456,30 @@ impl MemPoolDB {
};
// Check the nonces.
let expected_origin_nonce =
let (expected_origin_nonce, retry_store_origin_nonce) =
nonce_cache.get(&candidate.origin_address, clarity_tx, self.conn());
let expected_sponsor_nonce =
let (expected_sponsor_nonce, retry_store_sponsor_nonce) =
nonce_cache.get(&candidate.sponsor_address, clarity_tx, self.conn());
// Try storing these nonces later if we failed to do so here, e.g. due to some other
// thread holding the write-lock on the mempool DB.
if retry_store_origin_nonce {
Self::save_nonce_for_retry(
&mut retry_store,
settings.nonce_cache_size,
candidate.origin_address.clone(),
expected_origin_nonce,
);
}
if retry_store_sponsor_nonce {
Self::save_nonce_for_retry(
&mut retry_store,
settings.nonce_cache_size,
candidate.sponsor_address.clone(),
expected_sponsor_nonce,
);
}
match order_nonces(
candidate.origin_nonce,
expected_origin_nonce,
@@ -1461,17 +1545,34 @@ impl MemPoolDB {
match tx_event {
TransactionEvent::Success(_) => {
// Bump nonces in the cache for the executed transaction
nonce_cache.update(
let stored = nonce_cache.update(
consider.tx.metadata.origin_address,
expected_origin_nonce + 1,
self.conn(),
);
if !stored {
Self::save_nonce_for_retry(
&mut retry_store,
settings.nonce_cache_size,
consider.tx.metadata.origin_address,
expected_origin_nonce + 1,
);
}
if consider.tx.tx.auth.is_sponsored() {
nonce_cache.update(
let stored = nonce_cache.update(
consider.tx.metadata.sponsor_address,
expected_sponsor_nonce + 1,
self.conn(),
);
if !stored {
Self::save_nonce_for_retry(
&mut retry_store,
settings.nonce_cache_size,
consider.tx.metadata.sponsor_address,
expected_sponsor_nonce + 1,
);
}
}
output_events.push(tx_event);
}
@@ -1497,6 +1598,22 @@ impl MemPoolDB {
candidate_cache.reset();
}
// drop these rusqlite statements and queries, since their existence as immutable borrows on the
// connection prevents us from beginning a transaction below (which requires a mutable
// borrow).
drop(null_iterator);
drop(fee_iterator);
drop(query_stmt_null);
drop(query_stmt_fee);
if retry_store.len() > 0 {
let tx = self.tx_begin()?;
for (address, nonce) in retry_store.into_iter() {
nonce_cache.update(address, nonce, &tx);
}
tx.commit()?;
}
debug!(
"Mempool iteration finished";
"considered_txs" => total_considered,

View File

@@ -15,6 +15,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::cmp;
use std::collections::HashMap;
use std::collections::HashSet;
use std::io;
@@ -39,6 +40,7 @@ use crate::chainstate::stacks::{
use crate::chainstate::stacks::{
C32_ADDRESS_VERSION_MAINNET_SINGLESIG, C32_ADDRESS_VERSION_TESTNET_SINGLESIG,
};
use crate::core::mempool::db_get_all_nonces;
use crate::core::mempool::MemPoolWalkSettings;
use crate::core::mempool::TxTag;
use crate::core::mempool::{BLOOM_COUNTER_DEPTH, BLOOM_COUNTER_ERROR_RATE, MAX_BLOOM_COUNTER_TXS};
@@ -63,6 +65,7 @@ use stacks_common::address::AddressHashMode;
use stacks_common::types::chainstate::TrieHash;
use stacks_common::util::hash::Hash160;
use stacks_common::util::secp256k1::MessageSignature;
use stacks_common::util::sleep_ms;
use stacks_common::util::{get_epoch_time_ms, get_epoch_time_secs};
use stacks_common::util::{hash::hex_bytes, hash::to_hex, hash::*, log, secp256k1::*};
@@ -1144,6 +1147,168 @@ fn test_iterate_candidates_problematic_transaction() {
);
}
#[test]
/// This test verifies that all transactions are visited, and nonce cache on disk updated, even if
/// there's a concurrent write-lock on the mempool DB.
fn test_iterate_candidates_concurrent_write_lock() {
let mut chainstate = instantiate_chainstate_with_balances(
false,
0x80000000,
"test_iterate_candidates_concurrent_write_lock",
vec![],
);
let chainstate_path = chainstate_path("test_iterate_candidates_concurrent_write_lock");
let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap();
let b_1 = make_block(
&mut chainstate,
ConsensusHash([0x1; 20]),
&(
FIRST_BURNCHAIN_CONSENSUS_HASH.clone(),
FIRST_STACKS_BLOCK_HASH.clone(),
),
1,
1,
);
let b_2 = make_block(&mut chainstate, ConsensusHash([0x2; 20]), &b_1, 2, 2);
let mut mempool_settings = MemPoolWalkSettings::default();
mempool_settings.min_tx_fee = 10;
let mut tx_events = Vec::new();
let mut txs = codec_all_transactions(
&TransactionVersion::Testnet,
0x80000000,
&TransactionAnchorMode::Any,
&TransactionPostConditionMode::Allow,
);
let mut expected_addr_nonces = HashMap::new();
// Load 24 transactions into the mempool, alternating whether or not they have a fee-rate.
for nonce in 0..24 {
let mut tx = txs.pop().unwrap();
let mut mempool_tx = mempool.tx_begin().unwrap();
let origin_address = tx.origin_address();
let origin_nonce = tx.get_origin_nonce();
let sponsor_address = tx.sponsor_address().unwrap_or(origin_address);
let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce);
if let Some(nonce) = expected_addr_nonces.get_mut(&origin_address) {
*nonce = cmp::max(*nonce, origin_nonce);
} else {
expected_addr_nonces.insert(origin_address.clone(), origin_nonce);
}
if let Some(nonce) = expected_addr_nonces.get_mut(&sponsor_address) {
*nonce = cmp::max(*nonce, sponsor_nonce);
} else {
expected_addr_nonces.insert(sponsor_address.clone(), sponsor_nonce);
}
tx.set_tx_fee(100);
let txid = tx.txid();
let tx_bytes = tx.serialize_to_vec();
let tx_fee = tx.get_tx_fee();
let height = 100;
MemPoolDB::try_add_tx(
&mut mempool_tx,
&mut chainstate,
&b_1.0,
&b_1.1,
txid,
tx_bytes,
tx_fee,
height,
&origin_address,
nonce,
&sponsor_address,
nonce,
None,
)
.unwrap();
if nonce & 1 == 0 {
mempool_tx
.execute(
"UPDATE mempool SET fee_rate = ? WHERE txid = ?",
rusqlite::params![Some(123.0), &txid],
)
.unwrap();
} else {
let none: Option<f64> = None;
mempool_tx
.execute(
"UPDATE mempool SET fee_rate = ? WHERE txid = ?",
rusqlite::params![none, &txid],
)
.unwrap();
}
mempool_tx.commit().unwrap();
}
assert!(expected_addr_nonces.len() > 0);
let all_addr_nonces = db_get_all_nonces(mempool.conn()).unwrap();
assert_eq!(all_addr_nonces.len(), 0);
// start a thread that holds a write-lock on the mempool
let write_thread = std::thread::spawn(move || {
let mut thread_mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap();
let mempool_tx = thread_mempool.tx_begin().unwrap();
sleep_ms(10_000);
});
sleep_ms(1_000);
// 50% chance of considering a transaction with unknown fee estimate
mempool_settings.consider_no_estimate_tx_prob = 50;
chainstate.with_read_only_clarity_tx(
&TEST_BURN_STATE_DB,
&StacksBlockHeader::make_index_block_hash(&b_2.0, &b_2.1),
|clarity_conn| {
let mut count_txs = 0;
mempool
.iterate_candidates::<_, ChainstateError, _>(
clarity_conn,
&mut tx_events,
2,
mempool_settings.clone(),
|_, available_tx, _| {
count_txs += 1;
Ok(Some(
// Generate any success result
TransactionResult::success(
&available_tx.tx.tx,
available_tx.tx.metadata.tx_fee,
StacksTransactionReceipt::from_stx_transfer(
available_tx.tx.tx.clone(),
vec![],
Value::okay(Value::Bool(true)).unwrap(),
ExecutionCost::zero(),
),
)
.convert_to_event(),
))
},
)
.unwrap();
assert_eq!(count_txs, 24, "Mempool should find all 24 transactions");
},
);
write_thread.join().unwrap();
let all_addr_nonces = db_get_all_nonces(mempool.conn()).unwrap();
assert_eq!(all_addr_nonces.len(), expected_addr_nonces.len());
for (addr, nonce) in all_addr_nonces {
assert!(expected_addr_nonces.get(&addr).is_some());
assert_eq!(nonce, 24);
}
}
#[test]
fn mempool_do_not_replace_tx() {
let mut chainstate = instantiate_chainstate_with_balances(

View File

@@ -1301,13 +1301,6 @@ impl BlockMinerThread {
u16::MAX,
)
{
debug!(
"Microblocks descended from {}/{} ({}): {:?}",
&prev_block.parent_consensus_hash,
&stacks_parent_header.anchored_header.block_hash(),
stream.len(),
&stream
);
if (prev_block.anchored_block.header.parent_microblock
== BlockHeaderHash([0u8; 32])
&& stream.len() == 0)
@@ -2748,9 +2741,13 @@ impl RelayerThread {
return false;
}
if self.mined_stacks_block && self.config.node.mine_microblocks {
debug!("Relayer: mined a Stacks block already; waiting for microblock miner");
return false;
if !self.config.node.mock_mining {
// mock miner can't mine microblocks yet, so don't stop it from trying multiple
// anchored blocks
if self.mined_stacks_block && self.config.node.mine_microblocks {
debug!("Relayer: mined a Stacks block already; waiting for microblock miner");
return false;
}
}
let mut miner_thread_state =