feat: make it so the neon runloop sets up structured global thread state, and blocks/unblocks the miner in response to new burnchain data arriving. This prevents the miner from stalling the node in the event of a slow mempool walk.

This commit is contained in:
Jude Nelson
2022-10-05 23:20:45 -04:00
parent c1d2132281
commit 69ce0ff43d

View File

@@ -7,6 +7,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
@@ -31,13 +32,16 @@ use stacks::util_lib::db::Error as db_error;
use stx_genesis::GenesisData;
use crate::monitoring::start_serving_monitoring_metrics;
use crate::neon_node::Globals;
use crate::neon_node::StacksNode;
use crate::neon_node::RELAYER_MAX_BUFFER;
use crate::node::use_test_genesis_chainstate;
use crate::syncctl::{PoxSyncWatchdog, PoxSyncWatchdogComms};
use crate::{
node::{get_account_balances, get_account_lockups, get_names, get_namespaces},
BitcoinRegtestController, BurnchainController, Config, EventDispatcher, Keychain,
};
use stacks::chainstate::stacks::miner::{signal_mining_blocked, signal_mining_ready, MinerStatus};
use super::RunLoopCallbacks;
use libc;
@@ -126,6 +130,7 @@ impl Counters {
pub struct RunLoop {
config: Config,
pub callbacks: RunLoopCallbacks,
globals: Option<Globals>,
counters: Counters,
coordinator_channels: Option<(CoordinatorReceivers, CoordinatorChannels)>,
should_keep_running: Arc<AtomicBool>,
@@ -134,6 +139,9 @@ pub struct RunLoop {
is_miner: Option<bool>, // not known until .start() is called
burnchain: Option<Burnchain>, // not known until .start() is called
pox_watchdog_comms: PoxSyncWatchdogComms,
/// NOTE: this is duplicated in self.globals, but it needs to be accessible before globals is
/// instantiated (namely, so the test framework can access it).
miner_status: Arc<Mutex<MinerStatus>>,
}
/// Write to stderr in an async-safe manner.
@@ -160,6 +168,7 @@ impl RunLoop {
let channels = CoordinatorCommunication::instantiate();
let should_keep_running = Arc::new(AtomicBool::new(true));
let pox_watchdog_comms = PoxSyncWatchdogComms::new(should_keep_running.clone());
let miner_status = Arc::new(Mutex::new(MinerStatus::make_ready()));
let mut event_dispatcher = EventDispatcher::new();
for observer in config.events_observers.iter() {
@@ -168,6 +177,7 @@ impl RunLoop {
Self {
config,
globals: None,
coordinator_channels: Some(channels),
callbacks: RunLoopCallbacks::new(),
counters: Counters::new(),
@@ -177,9 +187,20 @@ impl RunLoop {
is_miner: None,
burnchain: None,
pox_watchdog_comms,
miner_status,
}
}
pub fn get_globals(&self) -> Globals {
self.globals
.clone()
.expect("FATAL: globals not instantiated")
}
fn set_globals(&mut self, globals: Globals) {
self.globals = Some(globals);
}
pub fn get_coordinator_channel(&self) -> Option<CoordinatorChannels> {
self.coordinator_channels.as_ref().map(|x| x.1.clone())
}
@@ -225,7 +246,7 @@ impl RunLoop {
}
pub fn get_termination_switch(&self) -> Arc<AtomicBool> {
self.should_keep_running.clone()
self.get_globals().should_keep_running.clone()
}
pub fn get_burnchain(&self) -> Burnchain {
@@ -240,6 +261,10 @@ impl RunLoop {
.expect("FATAL: tried to get PoX watchdog before calling .start()")
}
pub fn get_miner_status(&self) -> Arc<Mutex<MinerStatus>> {
self.miner_status.clone()
}
/// Set up termination handler. Have a signal set the `should_keep_running` atomic bool to
/// false. Panics of called more than once.
fn setup_termination_handler(&self) {
@@ -393,6 +418,7 @@ impl RunLoop {
&mut self,
burnchain_config: &Burnchain,
coordinator_receivers: CoordinatorReceivers,
miner_status: Arc<Mutex<MinerStatus>>,
) -> (JoinHandle<()>, Receiver<HashSet<AttachmentInstance>>) {
let use_test_genesis_data = use_test_genesis_chainstate(&self.config);
@@ -451,7 +477,10 @@ impl RunLoop {
let (attachments_tx, attachments_rx) = sync_channel(ATTACHMENTS_CHANNEL_SIZE);
let coordinator_thread_handle = thread::Builder::new()
.name("chains-coordinator".to_string())
.name(format!(
"chains-coordinator-{}",
&moved_config.node.rpc_bind
))
.spawn(move || {
let mut cost_estimator = moved_config.make_cost_estimator();
let mut fee_estimator = moved_config.make_fee_estimator();
@@ -465,6 +494,7 @@ impl RunLoop {
moved_atlas_config,
cost_estimator.as_deref_mut(),
fee_estimator.as_deref_mut(),
miner_status,
);
})
.expect("FATAL: failed to start chains coordinator thread");
@@ -543,21 +573,39 @@ impl RunLoop {
let burnchain_config = burnchain.get_burnchain();
self.burnchain = Some(burnchain_config.clone());
// can we mine?
let is_miner = self.check_is_miner(&mut burnchain);
self.is_miner = Some(is_miner);
// relayer linkup
let (relay_send, relay_recv) = sync_channel(RELAYER_MAX_BUFFER);
// set up globals so other subsystems can instantiate off of the runloop state.
let globals = Globals::new(
coordinator_senders,
self.get_miner_status(),
relay_send,
self.counters.clone(),
self.pox_watchdog_comms.clone(),
self.should_keep_running.clone(),
);
self.set_globals(globals.clone());
// have headers; boot up the chains coordinator and instantiate the chain state
let (coordinator_thread_handle, attachments_rx) =
self.spawn_chains_coordinator(&burnchain_config, coordinator_receivers);
let (coordinator_thread_handle, attachments_rx) = self.spawn_chains_coordinator(
&burnchain_config,
coordinator_receivers,
globals.get_miner_status(),
);
self.instantiate_pox_watchdog();
self.start_prometheus();
// We announce a new burn block so that the chains coordinator
// can resume prior work and handle eventual unprocessed sortitions
// stored during a previous session.
coordinator_senders.announce_new_burn_block();
globals.coord().announce_new_burn_block();
// Make sure at least one sortition has happened
// Make sure at least one sortition has happened, and make sure it's globally available
let sortdb = burnchain.sortdb_mut();
let (rc_aligned_height, sn) =
RunLoop::get_reward_cycle_sortition_db_height(&sortdb, &burnchain_config);
@@ -572,14 +620,11 @@ impl RunLoop {
sn
};
globals.set_last_sortition(burnchain_tip_snapshot);
// Boot up the p2p network and relayer, and figure out how many sortitions we have so far
// (it could be non-zero if the node is resuming from chainstate)
let mut node = StacksNode::spawn(
self,
Some(burnchain_tip_snapshot),
coordinator_senders.clone(),
attachments_rx,
);
let mut node = StacksNode::spawn(self, globals.clone(), relay_recv, attachments_rx);
// Wait for all pending sortitions to process
let mut burnchain_tip = burnchain
@@ -609,14 +654,14 @@ impl RunLoop {
let mut last_tenure_sortition_height = 0;
loop {
if !self.should_keep_running.load(Ordering::SeqCst) {
if !globals.keep_running() {
// The p2p thread relies on the same atomic_bool, it will
// discontinue its execution after completing its ongoing runloop epoch.
info!("Terminating p2p process");
info!("Terminating relayer");
info!("Terminating chains-coordinator");
coordinator_senders.stop_chains_coordinator();
globals.coord().stop_chains_coordinator();
coordinator_thread_handle.join().unwrap();
node.join();
@@ -652,7 +697,7 @@ impl RunLoop {
// runloop will cause the PoX sync watchdog to wait until it believes that the node has
// obtained all the Stacks blocks it can.
while burnchain_height <= target_burnchain_block_height {
if !self.should_keep_running.load(Ordering::SeqCst) {
if !globals.keep_running() {
break;
}
@@ -686,9 +731,15 @@ impl RunLoop {
);
let mut sort_count = 0;
signal_mining_blocked(globals.get_miner_status());
// first, let's process all blocks in (sortition_db_height, next_sortition_height]
for block_to_process in (sortition_db_height + 1)..(next_sortition_height + 1) {
// stop mining so we can advance the sortition DB and so our
// ProcessTenure() directive (sent by relayer_sortition_notify() below)
// will be unblocked.
debug!("Runloop: disable miner to process sortitions");
let block = {
let ic = burnchain.sortdb_ref().index_conn();
SortitionDB::get_ancestor_snapshot(&ic, block_to_process, sortition_tip)
@@ -718,6 +769,8 @@ impl RunLoop {
}
}
signal_mining_ready(globals.get_miner_status());
num_sortitions_in_last_cycle = sort_count;
debug!(
"Synchronized burnchain up to block height {} from {} (chain tip height is {}); {} sortitions",
@@ -730,7 +783,7 @@ impl RunLoop {
// we may have downloaded all the blocks already,
// so we can't rely on the relayer alone to
// drive it.
coordinator_senders.announce_new_stacks_block();
globals.coord().announce_new_stacks_block();
}
if burnchain_height == target_burnchain_block_height
@@ -771,10 +824,11 @@ impl RunLoop {
);
last_tenure_sortition_height = sortition_db_height;
}
if !node.relayer_issue_tenure() {
// relayer hung up, exit.
error!("Block relayer and miner hung up, exiting.");
continue;
break;
}
}
}