use std::collections::VecDeque; use stacks::burnchains::{Burnchain, Error as burnchain_error}; use stacks::chainstate::stacks::db::StacksChainState; use stacks::util::get_epoch_time_secs; use stacks::util::sleep_ms; use crate::burnchains::BurnchainTip; use crate::Config; use std::sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }; // amount of time to wait for an inv or download sync to complete. // These _really should_ complete before the PoX sync watchdog permits processing the next reward // cycle, so this number is intentionally high (like, there's something really wrong with your // network if your node is actualy waiting a day in-between reward cycles). const SYNC_WAIT_SECS: u64 = 24 * 3600; #[derive(Clone)] pub struct PoxSyncWatchdogComms { /// how many passes in the p2p state machine have taken place since startup? p2p_state_passes: Arc, /// how many times have we done an inv sync? inv_sync_passes: Arc, /// how many times have we done a download pass? download_passes: Arc, /// What's the burnchain tip we last saw? burnchain_tip_height: Arc, /// What's our last IBD status? last_ibd: Arc, /// Should keep running? should_keep_running: Arc, } impl PoxSyncWatchdogComms { pub fn new(should_keep_running: Arc) -> PoxSyncWatchdogComms { PoxSyncWatchdogComms { p2p_state_passes: Arc::new(AtomicU64::new(0)), inv_sync_passes: Arc::new(AtomicU64::new(0)), download_passes: Arc::new(AtomicU64::new(0)), burnchain_tip_height: Arc::new(AtomicU64::new(0)), last_ibd: Arc::new(AtomicBool::new(true)), should_keep_running, } } pub fn get_p2p_state_passes(&self) -> u64 { self.p2p_state_passes.load(Ordering::SeqCst) } pub fn get_inv_sync_passes(&self) -> u64 { self.inv_sync_passes.load(Ordering::SeqCst) } pub fn get_download_passes(&self) -> u64 { self.download_passes.load(Ordering::SeqCst) } pub fn get_ibd(&self) -> bool { self.last_ibd.load(Ordering::SeqCst) } /// Wait for at least one inv-sync state-machine passes pub fn wait_for_inv_sync_pass(&self, timeout: u64) -> Result { let current = self.get_inv_sync_passes(); let now = get_epoch_time_secs(); while current >= self.get_inv_sync_passes() { if now + timeout < get_epoch_time_secs() { debug!("PoX watchdog comms: timed out waiting for one inv-sync pass"); return Ok(false); } self.interruptable_sleep(1)?; std::hint::spin_loop(); } return Ok(true); } fn interruptable_sleep(&self, secs: u64) -> Result<(), burnchain_error> { let deadline = secs + get_epoch_time_secs(); while get_epoch_time_secs() < deadline { sleep_ms(1000); if !self.should_keep_running() { return Err(burnchain_error::CoordinatorClosed); } } Ok(()) } pub fn wait_for_download_pass(&self, timeout: u64) -> Result { let current = self.get_download_passes(); let now = get_epoch_time_secs(); while current >= self.get_download_passes() { if now + timeout < get_epoch_time_secs() { debug!("PoX watchdog comms: timed out waiting for one download pass"); return Ok(false); } self.interruptable_sleep(1)?; std::hint::spin_loop(); } return Ok(true); } pub fn should_keep_running(&self) -> bool { self.should_keep_running.load(Ordering::SeqCst) } pub fn notify_p2p_state_pass(&mut self) { self.p2p_state_passes.fetch_add(1, Ordering::SeqCst); } pub fn notify_inv_sync_pass(&mut self) { self.inv_sync_passes.fetch_add(1, Ordering::SeqCst); } pub fn notify_download_pass(&mut self) { self.download_passes.fetch_add(1, Ordering::SeqCst); } pub fn set_ibd(&mut self, value: bool) { self.last_ibd.store(value, Ordering::SeqCst); } } /// Monitor the state of the Stacks blockchain as the peer network and relay threads download and /// proces Stacks blocks. Don't allow the node to process the next PoX reward cycle's sortitions /// unless it's reasonably sure that it has processed all Stacks blocks for this reward cycle. /// This struct monitors the Stacks chainstate to make this determination. pub struct PoxSyncWatchdog { /// number of attachable but unprocessed staging blocks over time new_attachable_blocks: VecDeque, /// number of newly-processed staging blocks over time new_processed_blocks: VecDeque, /// last time we asked for attachable blocks last_attachable_query: u64, /// last time we asked for processed blocks last_processed_query: u64, /// number of samples to take max_samples: u64, /// maximum number of blocks to count per query (affects performance!) max_staging: u64, /// when did we first start watching? watch_start_ts: u64, /// when did we first see a flatline in block-processing rate? last_block_processed_ts: u64, /// estimated time for a block to get downloaded. Used to infer how long to wait for the first /// blocks to show up when waiting for this reward cycle. estimated_block_download_time: f64, /// estimated time for a block to get processed -- from when it shows up as attachable to when /// it shows up as processed. Used to infer how long to wait for the last block to get /// processed before unblocking burnchain sync for the next reward cycle. estimated_block_process_time: f64, /// time between burnchain syncs in stead state steady_state_burnchain_sync_interval: u64, /// when to re-sync under steady state steady_state_resync_ts: u64, /// chainstate handle chainstate: StacksChainState, /// handle to relayer thread that informs the watchdog when the P2P state-machine does stuff relayer_comms: PoxSyncWatchdogComms, /// should this sync watchdog always download? used in integration tests. unconditionally_download: bool, } const PER_SAMPLE_WAIT_MS: u64 = 1000; impl PoxSyncWatchdog { pub fn new( config: &Config, should_keep_running: Arc, ) -> Result { let mainnet = config.is_mainnet(); let chain_id = config.burnchain.chain_id; let chainstate_path = config.get_chainstate_path_str(); let burnchain_poll_time = config.burnchain.poll_time_secs; let download_timeout = config.connection_options.timeout; let max_samples = config.node.pox_sync_sample_secs; let unconditionally_download = config.node.pox_sync_sample_secs == 0; let marf_opts = config.node.get_marf_opts(); let (chainstate, _) = match StacksChainState::open(mainnet, chain_id, &chainstate_path, Some(marf_opts)) { Ok(cs) => cs, Err(e) => { return Err(format!( "Failed to open chainstate at '{}': {:?}", &chainstate_path, &e )); } }; Ok(PoxSyncWatchdog { unconditionally_download, new_attachable_blocks: VecDeque::new(), new_processed_blocks: VecDeque::new(), last_attachable_query: 0, last_processed_query: 0, max_samples: max_samples, max_staging: 10, watch_start_ts: 0, last_block_processed_ts: 0, estimated_block_download_time: download_timeout as f64, estimated_block_process_time: 5.0, steady_state_burnchain_sync_interval: burnchain_poll_time, steady_state_resync_ts: 0, chainstate: chainstate, relayer_comms: PoxSyncWatchdogComms::new(should_keep_running), }) } pub fn make_comms_handle(&self) -> PoxSyncWatchdogComms { self.relayer_comms.clone() } /// How many recently-added Stacks blocks are in an attachable state, up to $max_staging? fn count_attachable_stacks_blocks(&mut self) -> Result { // number of staging blocks that have arrived since the last sortition let cnt = StacksChainState::count_attachable_staging_blocks( &self.chainstate.db(), self.max_staging, self.last_attachable_query, ) .map_err(|e| format!("Failed to count attachable staging blocks: {:?}", &e))?; self.last_attachable_query = get_epoch_time_secs(); Ok(cnt) } /// How many recently-processed Stacks blocks are there, up to $max_staging? /// ($max_staging is necessary to limit the runtime of this method, since the underlying SQL /// uses COUNT(*), which in Sqlite is a _O(n)_ operation for _n_ rows) fn count_processed_stacks_blocks(&mut self) -> Result { // number of staging blocks that have arrived since the last sortition let cnt = StacksChainState::count_processed_staging_blocks( &self.chainstate.db(), self.max_staging, self.last_processed_query, ) .map_err(|e| format!("Failed to count attachable staging blocks: {:?}", &e))?; self.last_processed_query = get_epoch_time_secs(); Ok(cnt) } /// Are we in the initial burnchain block download? i.e. is the burn tip snapshot far enough away /// from the burnchain height that we should be eagerly downloading snapshots? pub fn infer_initial_burnchain_block_download( burnchain: &Burnchain, last_processed_height: u64, burnchain_height: u64, ) -> bool { let ibd = last_processed_height + (burnchain.stable_confirmations as u64) < burnchain_height; if ibd { debug!( "PoX watchdog: {} + {} < {}, so initial block download", last_processed_height, burnchain.stable_confirmations, burnchain_height ); } else { debug!( "PoX watchdog: {} + {} >= {}, so steady-state", last_processed_height, burnchain.stable_confirmations, burnchain_height ); } ibd } /// Calculate the first derivative of a list of points fn derivative(sample_list: &VecDeque) -> Vec { let mut deltas = vec![]; let mut prev = 0; for (i, sample) in sample_list.iter().enumerate() { if i == 0 { prev = *sample; continue; } let delta = *sample - prev; prev = *sample; deltas.push(delta); } deltas } /// Is a derivative approximately flat, with a maximum absolute deviation from 0? /// Return whether or not the sample is mostly flat, and how many points were over the given /// error bar in either direction. fn is_mostly_flat(deriv: &Vec, error: i64) -> (bool, usize) { let mut total_deviates = 0; let mut ret = true; for d in deriv.iter() { if d.abs() > error { total_deviates += 1; ret = false; } } (ret, total_deviates) } /// low and high pass filter average -- take average without the smallest and largest values fn hilo_filter_avg(samples: &Vec) -> f64 { // take average with low and high pass let mut min = i64::MAX; let mut max = i64::MIN; for s in samples.iter() { if *s < 0 { // nonsensical result (e.g. due to clock drift?) continue; } if *s < min { min = *s; } if *s > max { max = *s; } } let mut count = 0; let mut sum = 0; for s in samples.iter() { if *s < 0 { // nonsensical result continue; } if *s == min { continue; } if *s == max { continue; } count += 1; sum += *s; } if count == 0 { // no viable samples 1.0 } else { (sum as f64) / (count as f64) } } /// estimate how long a block remains in an unprocessed state fn estimate_block_process_time( chainstate: &StacksChainState, burnchain: &Burnchain, tip_height: u64, ) -> f64 { let this_reward_cycle = burnchain .block_height_to_reward_cycle(tip_height) .expect(&format!("BUG: no reward cycle for {}", tip_height)); let prev_reward_cycle = this_reward_cycle.saturating_sub(1); let start_height = burnchain.reward_cycle_to_block_height(prev_reward_cycle); let end_height = burnchain.reward_cycle_to_block_height(this_reward_cycle); if this_reward_cycle > 0 { assert!(start_height < end_height); } else { // no samples yet return 1.0; } let block_wait_times = StacksChainState::measure_block_wait_time(&chainstate.db(), start_height, end_height) .expect("BUG: failed to query chainstate block-processing times"); PoxSyncWatchdog::hilo_filter_avg(&block_wait_times) } /// estimate how long a block takes to download fn estimate_block_download_time( chainstate: &StacksChainState, burnchain: &Burnchain, tip_height: u64, ) -> f64 { let this_reward_cycle = burnchain .block_height_to_reward_cycle(tip_height) .expect(&format!("BUG: no reward cycle for {}", tip_height)); let prev_reward_cycle = this_reward_cycle.saturating_sub(1); let start_height = burnchain.reward_cycle_to_block_height(prev_reward_cycle); let end_height = burnchain.reward_cycle_to_block_height(this_reward_cycle); if this_reward_cycle > 0 { assert!(start_height < end_height); } else { // no samples yet return 1.0; } let block_download_times = StacksChainState::measure_block_download_time( &chainstate.db(), start_height, end_height, ) .expect("BUG: failed to query chainstate block-download times"); PoxSyncWatchdog::hilo_filter_avg(&block_download_times) } /// Reset internal state. Performed when it's okay to begin syncing the burnchain. /// Updates estimate for block-processing time and block-downloading time. fn reset(&mut self, burnchain: &Burnchain, tip_height: u64) { // find the average (with low/high pass filter) time a block spends in the DB without being // processed, during this reward cycle self.estimated_block_process_time = PoxSyncWatchdog::estimate_block_process_time(&self.chainstate, burnchain, tip_height); // find the average (with low/high pass filter) time a block spends downloading self.estimated_block_download_time = PoxSyncWatchdog::estimate_block_download_time(&self.chainstate, burnchain, tip_height); debug!( "Estimated block download time: {}s. Estimated block processing time: {}s", self.estimated_block_download_time, self.estimated_block_process_time ); self.new_attachable_blocks.clear(); self.new_processed_blocks.clear(); self.last_block_processed_ts = 0; self.watch_start_ts = 0; self.steady_state_resync_ts = 0; } /// Wait until all of the Stacks blocks for the given reward cycle are seemingly downloaded and /// processed. Do so by watching the _rate_ at which attachable Stacks blocks arrive and get /// processed. /// Returns whether or not we're still in the initial block download -- i.e. true if we're /// still downloading burnchain blocks, or we haven't reached steady-state block-processing. pub fn pox_sync_wait( &mut self, burnchain: &Burnchain, burnchain_tip: &BurnchainTip, // this is the highest burnchain snapshot we've sync'ed to burnchain_height_opt: Option, // this is the absolute burnchain block height, if known num_sortitions_in_last_cycle: u64, ) -> Result { let burnchain_height = match burnchain_height_opt { Some(bh) => bh, None => { // not known yet, so assume IBD debug!("Pox watchdog: burnchain height not known yet, so assume IBD"); self.relayer_comms.set_ibd(true); sleep_ms(self.steady_state_burnchain_sync_interval); return Ok(true); } }; if self.watch_start_ts == 0 { self.watch_start_ts = get_epoch_time_secs(); } if self.steady_state_resync_ts == 0 { self.steady_state_resync_ts = get_epoch_time_secs() + self.steady_state_burnchain_sync_interval; } let ibbd = PoxSyncWatchdog::infer_initial_burnchain_block_download( burnchain, burnchain_tip.block_snapshot.block_height, burnchain_height, ); // unconditionally download the first reward cycle if burnchain_tip.block_snapshot.block_height < burnchain.first_block_height + (burnchain.pox_constants.reward_cycle_length as u64) { debug!("PoX watchdog in first reward cycle -- sync immediately"); self.relayer_comms.set_ibd(ibbd); self.relayer_comms .interruptable_sleep(self.steady_state_burnchain_sync_interval)?; return Ok(ibbd); } if self.unconditionally_download { debug!( "PoX watchdog set to unconditionally download (ibd={})", ibbd ); self.relayer_comms.set_ibd(ibbd); return Ok(ibbd); } let mut waited = false; if ibbd { // we are far behind the burnchain tip (i.e. not in the last reward cycle), // so make sure the downloader knows about blocks it doesn't have yet so we can go and // fetch its blocks before proceeding. if num_sortitions_in_last_cycle > 0 { debug!("PoX watchdog: Wait for at least one inventory state-machine pass..."); self.relayer_comms.wait_for_inv_sync_pass(SYNC_WAIT_SECS)?; waited = true; } else { debug!("PoX watchdog: In initial block download, and no sortitions to consider in this reward cycle -- sync immediately"); self.relayer_comms.set_ibd(ibbd); return Ok(ibbd); } } else { debug!("PoX watchdog: not in initial burn block download, so not waiting for an inventory state-machine pass"); } if burnchain_tip.block_snapshot.block_height + (burnchain.pox_constants.reward_cycle_length as u64) >= burnchain_height { // unconditionally download if we're within the last reward cycle (after the poll timeout) if !waited { debug!( "PoX watchdog in last reward cycle -- sync after {} seconds", self.steady_state_burnchain_sync_interval ); self.relayer_comms.set_ibd(ibbd); self.relayer_comms .interruptable_sleep(self.steady_state_burnchain_sync_interval)?; } else { debug!("PoX watchdog in last reward cycle -- sync immediately"); self.relayer_comms.set_ibd(ibbd); } return Ok(ibbd); } // have we reached steady-state behavior? i.e. have we stopped processing both burnchain // and Stacks blocks? let mut steady_state = false; debug!("PoX watchdog: Wait until chainstate reaches steady-state block-processing..."); let ibbd = loop { if !self.relayer_comms.should_keep_running() { break false; } let ibbd = PoxSyncWatchdog::infer_initial_burnchain_block_download( burnchain, burnchain_tip.block_snapshot.block_height, burnchain_height, ); let expected_first_block_deadline = self.watch_start_ts + (self.estimated_block_download_time as u64); let expected_last_block_deadline = self.last_block_processed_ts + (self.estimated_block_download_time as u64) + (self.estimated_block_process_time as u64); match ( self.count_attachable_stacks_blocks(), self.count_processed_stacks_blocks(), ) { (Ok(num_available), Ok(num_processed)) => { self.new_attachable_blocks.push_back(num_available as i64); self.new_processed_blocks.push_back(num_processed as i64); if (self.new_attachable_blocks.len() as u64) > self.max_samples { self.new_attachable_blocks.pop_front(); } if (self.new_processed_blocks.len() as u64) > self.max_samples { self.new_processed_blocks.pop_front(); } if (self.new_attachable_blocks.len() as u64) < self.max_samples || (self.new_processed_blocks.len() as u64) < self.max_samples { // still getting initial samples if self.new_processed_blocks.len() % 10 == 0 { debug!( "PoX watchdog: Still warming up: {} out of {} samples...", &self.new_attachable_blocks.len(), &self.max_samples ); } sleep_ms(PER_SAMPLE_WAIT_MS); continue; } if self.watch_start_ts > 0 && get_epoch_time_secs() < expected_first_block_deadline { // still waiting for that first block in this reward cycle debug!("PoX watchdog: Still warming up: waiting until {}s for first Stacks block download (estimated download time: {}s)...", expected_first_block_deadline, self.estimated_block_download_time); sleep_ms(PER_SAMPLE_WAIT_MS); continue; } if self.watch_start_ts > 0 && (self.new_attachable_blocks.len() as u64) < self.max_samples && self.watch_start_ts + self.max_samples + self.steady_state_burnchain_sync_interval * (burnchain.stable_confirmations as u64) < get_epoch_time_secs() { debug!( "PoX watchdog: could not calculate {} samples in {} seconds. Assuming suspend/resume, or assuming load is too high.", self.max_samples, self.max_samples + self.steady_state_burnchain_sync_interval * (burnchain.stable_confirmations as u64) ); self.reset(burnchain, burnchain_tip.block_snapshot.block_height); self.watch_start_ts = get_epoch_time_secs(); self.steady_state_resync_ts = get_epoch_time_secs() + self.steady_state_burnchain_sync_interval; continue; } // take first derivative of samples -- see if the download and processing rate has gone to 0 let attachable_delta = PoxSyncWatchdog::derivative(&self.new_attachable_blocks); let processed_delta = PoxSyncWatchdog::derivative(&self.new_processed_blocks); let (flat_attachable, attachable_deviants) = PoxSyncWatchdog::is_mostly_flat(&attachable_delta, 0); let (flat_processed, processed_deviants) = PoxSyncWatchdog::is_mostly_flat(&processed_delta, 0); debug!("PoX watchdog: flat-attachable?: {}, flat-processed?: {}, estimated block-download time: {}s, estimated block-processing time: {}s", flat_attachable, flat_processed, self.estimated_block_download_time, self.estimated_block_process_time); if flat_attachable && flat_processed && self.last_block_processed_ts == 0 { // we're flat-lining -- this may be the end of this cycle self.last_block_processed_ts = get_epoch_time_secs(); } if self.last_block_processed_ts > 0 && get_epoch_time_secs() < expected_last_block_deadline { debug!("PoX watchdog: Still processing blocks; waiting until at least min({},{})s before burnchain synchronization (estimated block-processing time: {}s)", get_epoch_time_secs() + 1, expected_last_block_deadline, self.estimated_block_process_time); sleep_ms(PER_SAMPLE_WAIT_MS); continue; } if ibbd { // doing initial burnchain block download right now. // only proceed to fetch the next reward cycle's burnchain blocks if we're neither downloading nor // attaching blocks recently debug!("PoX watchdog: In initial burnchain block download: flat-attachable = {}, flat-processed = {}, min-attachable: {}, min-processed: {}", flat_attachable, flat_processed, &attachable_deviants, &processed_deviants); if !flat_attachable || !flat_processed { sleep_ms(PER_SAMPLE_WAIT_MS); continue; } } else { let now = get_epoch_time_secs(); if now < self.steady_state_resync_ts { // steady state if !steady_state { debug!("PoX watchdog: In steady-state; waiting until at least {} before burnchain synchronization", self.steady_state_resync_ts); steady_state = flat_attachable && flat_processed; } sleep_ms(PER_SAMPLE_WAIT_MS); continue; } else { // steady state if !steady_state { debug!("PoX watchdog: In steady-state, but ready burnchain synchronization as of {}", self.steady_state_resync_ts); steady_state = flat_attachable && flat_processed; } } } } (err_attach, err_processed) => { // can only happen on DB query failure error!("PoX watchdog: Failed to count recently attached ('{:?}') and/or processed ('{:?}') staging blocks", &err_attach, &err_processed); panic!(); } }; if ibbd || !steady_state { debug!("PoX watchdog: Wait for at least one downloader state-machine pass before resetting..."); self.relayer_comms.wait_for_download_pass(SYNC_WAIT_SECS)?; } else { debug!("PoX watchdog: in steady-state, so not waiting for download pass"); } self.reset(burnchain, burnchain_tip.block_snapshot.block_height); break ibbd; }; let ret = ibbd || !steady_state; self.relayer_comms.set_ibd(ret); Ok(ret) } }