feat: make the miner loop interruptable by checking the status of a shared mutex. Mining is blocked as long as at least one thread wants it blocked, and is unblocked if no threads want it blocked. Blocking/unblocking is idempotent when done from the same thread.

This commit is contained in:
Jude Nelson
2022-10-05 23:13:33 -04:00
parent d47df1fb9f
commit b1ceaba5e0

View File

@@ -19,6 +19,11 @@ use std::collections::HashSet;
use std::convert::From;
use std::fs;
use std::mem;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::ThreadId;
use crate::burnchains::PrivateKey;
use crate::burnchains::PublicKey;
@@ -68,10 +73,72 @@ use clarity::vm::clarity::TransactionConnection;
use clarity::vm::errors::Error as InterpreterError;
use clarity::vm::types::TypeSignature;
/// System status for mining.
/// The miner can be Ready, in which case a miner is allowed to run
/// The miner can be Blocked, in which case the miner *should not start* and/or *should terminate*
/// if running.
/// The inner u64 is a per-thread ID that lets threads querying the miner status identify whether
/// or not they or another thread were the last to modify the state.
#[derive(Debug, Clone, PartialEq)]
pub struct MinerStatus {
blockers: HashSet<ThreadId>,
}
impl MinerStatus {
pub fn make_ready() -> MinerStatus {
MinerStatus {
blockers: HashSet::new(),
}
}
pub fn add_blocked(&mut self) {
self.blockers.insert(std::thread::current().id());
}
pub fn remove_blocked(&mut self) {
self.blockers.remove(&std::thread::current().id());
}
pub fn is_blocked(&self) -> bool {
self.blockers.len() > 0
}
}
impl std::fmt::Display for MinerStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", &self)
}
}
/// halt mining
pub fn signal_mining_blocked(miner_status: Arc<Mutex<MinerStatus>>) {
match miner_status.lock() {
Ok(mut status) => {
status.add_blocked();
}
Err(_e) => {
panic!("FATAL: mutex poisoned");
}
}
}
/// resume mining if we blocked it earlier
pub fn signal_mining_ready(miner_status: Arc<Mutex<MinerStatus>>) {
match miner_status.lock() {
Ok(mut status) => {
status.remove_blocked();
}
Err(_e) => {
panic!("FATAL: mutex poisoned");
}
}
}
#[derive(Debug, Clone)]
pub struct BlockBuilderSettings {
pub max_miner_time_ms: u64,
pub mempool_settings: MemPoolWalkSettings,
pub miner_status: Arc<Mutex<MinerStatus>>,
}
impl BlockBuilderSettings {
@@ -79,6 +146,7 @@ impl BlockBuilderSettings {
BlockBuilderSettings {
max_miner_time_ms: u64::max_value(),
mempool_settings: MemPoolWalkSettings::default(),
miner_status: Arc::new(Mutex::new(MinerStatus::make_ready())),
}
}
@@ -86,6 +154,7 @@ impl BlockBuilderSettings {
BlockBuilderSettings {
max_miner_time_ms: u64::max_value(),
mempool_settings: MemPoolWalkSettings::zero(),
miner_status: Arc::new(Mutex::new(MinerStatus::make_ready())),
}
}
}
@@ -1021,6 +1090,8 @@ impl<'a> StacksMicroblockBuilder<'a> {
"Microblock transaction selection begins (child of {}), bytes so far: {}",
&self.anchor_block, bytes_so_far
);
let mut blocked = false;
let result = {
let mut intermediate_result;
loop {
@@ -1042,6 +1113,12 @@ impl<'a> StacksMicroblockBuilder<'a> {
return Ok(None);
}
blocked = (*self.settings.miner_status.lock().expect("FATAL: mutex poisoned")).is_blocked();
if blocked {
debug!("Microblock miner stopping due to preemption");
return Ok(None);
}
if considered.contains(&mempool_tx.tx.txid()) {
return Ok(Some(TransactionResult::skipped(
&mempool_tx.tx, "Transaction already considered.".to_string()).convert_to_event()));
@@ -1154,8 +1231,9 @@ impl<'a> StacksMicroblockBuilder<'a> {
}
intermediate_result
};
debug!(
"Microblock transaction selection finished (child of {}); {} transactions selected",
"Miner: Microblock transaction selection finished (child of {}); {} transactions selected",
&self.anchor_block, num_selected
);
@@ -1178,6 +1256,14 @@ impl<'a> StacksMicroblockBuilder<'a> {
event_dispatcher.mempool_txs_dropped(invalidated_txs, MemPoolDropReason::TOO_EXPENSIVE);
event_dispatcher.mempool_txs_dropped(to_drop_and_blacklist, MemPoolDropReason::PROBLEMATIC);
if blocked {
debug!(
"Miner: Microblock transaction selection aborted (child of {}); {} transactions selected",
&self.anchor_block, num_selected
);
return Err(Error::MinerAborted);
}
match result {
Ok(_) => {}
Err(e) => {
@@ -1794,7 +1880,6 @@ impl StacksBlockBuilder {
chainstate: &mut StacksChainState,
parent_consensus_hash: &ConsensusHash,
parent_header_hash: &BlockHeaderHash,
parent_index_hash: &StacksBlockId,
) -> Result<Vec<StacksMicroblock>, Error> {
if let Some(microblock_parent_hash) = self.parent_microblock_hash.as_ref() {
// load up a microblock fork
@@ -1806,9 +1891,20 @@ impl StacksBlockBuilder {
)?
.ok_or(Error::NoSuchBlockError)?;
debug!(
"Loaded {} microblocks made by {}/{} tipped at {}",
microblocks.len(),
&parent_consensus_hash,
&parent_header_hash,
&microblock_parent_hash
);
Ok(microblocks)
} else {
// apply all known parent microblocks before beginning our tenure
let parent_index_hash = StacksBlockHeader::make_index_block_hash(
&self.parent_consensus_hash,
&self.parent_header_hash,
);
let (parent_microblocks, _) =
match StacksChainState::load_descendant_staging_microblock_stream_with_poison(
&chainstate.db(),
@@ -1819,6 +1915,13 @@ impl StacksBlockBuilder {
Some(x) => x,
None => (vec![], None),
};
debug!(
"Loaded {} microblocks made by {}/{}",
parent_microblocks.len(),
&parent_consensus_hash,
&parent_header_hash
);
Ok(parent_microblocks)
}
}
@@ -1873,7 +1976,6 @@ impl StacksBlockBuilder {
chainstate,
&self.parent_consensus_hash.clone(),
&self.parent_header_hash.clone(),
&parent_index_hash,
) {
Ok(x) => x,
Err(e) => {
@@ -1908,6 +2010,7 @@ impl StacksBlockBuilder {
let mainnet = chainstate.config().mainnet;
// data won't be committed, so do a concurrent transaction
let (chainstate_tx, clarity_instance) = chainstate.chainstate_tx_begin()?;
let ast_rules =
@@ -2215,6 +2318,7 @@ impl StacksBlockBuilder {
let mut block_limit_hit = BlockLimitFunction::NO_LIMIT_HIT;
let deadline = ts_start + (max_miner_time_ms as u128);
let mut num_txs = 0;
let mut blocked = false;
debug!(
"Anchored block transaction selection begins (child of {})",
@@ -2230,6 +2334,14 @@ impl StacksBlockBuilder {
tip_height,
mempool_settings.clone(),
|epoch_tx, to_consider, estimator| {
// first, have we been preempted?
blocked = (*settings.miner_status.lock().expect("FATAL: mutex poisoned"))
.is_blocked();
if blocked {
debug!("Miner stopping due to preemption");
return Ok(None);
}
let txinfo = &to_consider.tx;
let update_estimator = to_consider.update_estimate;
@@ -2404,6 +2516,14 @@ impl StacksBlockBuilder {
}
}
if blocked {
debug!(
"Miner: Anchored block transaction selection aborted (child of {})",
&parent_stacks_header.anchored_header.block_hash()
);
return Err(Error::MinerAborted);
}
// the prior do_rebuild logic wasn't necessary
// a transaction that caused a budget exception is rolled back in process_transaction