mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-06-11 15:33:39 +08:00
688 lines
28 KiB
Rust
688 lines
28 KiB
Rust
use std::collections::VecDeque;
|
|
|
|
use stacks::burnchains::{Burnchain, Error as burnchain_error};
|
|
use stacks::chainstate::stacks::db::StacksChainState;
|
|
use stacks::util::get_epoch_time_secs;
|
|
use stacks::util::sleep_ms;
|
|
|
|
use crate::burnchains::BurnchainTip;
|
|
use crate::Config;
|
|
|
|
use std::sync::{
|
|
atomic::{AtomicBool, AtomicU64, Ordering},
|
|
Arc,
|
|
};
|
|
|
|
// Upper bound, in seconds, on how long to wait for one inv-sync or download pass to finish.
// A pass _really should_ complete long before the PoX sync watchdog permits processing of the
// next reward cycle, so this limit is intentionally generous: if a node is actually waiting a
// whole day in-between reward cycles, something is badly wrong with its network.
const SYNC_WAIT_SECS: u64 = 24 * 60 * 60;
|
|
|
|
#[derive(Clone)]
|
|
pub struct PoxSyncWatchdogComms {
|
|
/// how many passes in the p2p state machine have taken place since startup?
|
|
p2p_state_passes: Arc<AtomicU64>,
|
|
/// how many times have we done an inv sync?
|
|
inv_sync_passes: Arc<AtomicU64>,
|
|
/// how many times have we done a download pass?
|
|
download_passes: Arc<AtomicU64>,
|
|
/// What's the burnchain tip we last saw?
|
|
burnchain_tip_height: Arc<AtomicU64>,
|
|
/// What's our last IBD status?
|
|
last_ibd: Arc<AtomicBool>,
|
|
/// Should keep running?
|
|
should_keep_running: Arc<AtomicBool>,
|
|
}
|
|
|
|
impl PoxSyncWatchdogComms {
|
|
pub fn new(should_keep_running: Arc<AtomicBool>) -> PoxSyncWatchdogComms {
|
|
PoxSyncWatchdogComms {
|
|
p2p_state_passes: Arc::new(AtomicU64::new(0)),
|
|
inv_sync_passes: Arc::new(AtomicU64::new(0)),
|
|
download_passes: Arc::new(AtomicU64::new(0)),
|
|
burnchain_tip_height: Arc::new(AtomicU64::new(0)),
|
|
last_ibd: Arc::new(AtomicBool::new(true)),
|
|
should_keep_running,
|
|
}
|
|
}
|
|
|
|
pub fn get_p2p_state_passes(&self) -> u64 {
|
|
self.p2p_state_passes.load(Ordering::SeqCst)
|
|
}
|
|
|
|
pub fn get_inv_sync_passes(&self) -> u64 {
|
|
self.inv_sync_passes.load(Ordering::SeqCst)
|
|
}
|
|
|
|
pub fn get_download_passes(&self) -> u64 {
|
|
self.download_passes.load(Ordering::SeqCst)
|
|
}
|
|
|
|
pub fn get_ibd(&self) -> bool {
|
|
self.last_ibd.load(Ordering::SeqCst)
|
|
}
|
|
|
|
/// Wait for at least one inv-sync state-machine passes
|
|
pub fn wait_for_inv_sync_pass(&self, timeout: u64) -> Result<bool, burnchain_error> {
|
|
let current = self.get_inv_sync_passes();
|
|
|
|
let now = get_epoch_time_secs();
|
|
while current >= self.get_inv_sync_passes() {
|
|
if now + timeout < get_epoch_time_secs() {
|
|
debug!("PoX watchdog comms: timed out waiting for one inv-sync pass");
|
|
return Ok(false);
|
|
}
|
|
self.interruptable_sleep(1)?;
|
|
std::hint::spin_loop();
|
|
}
|
|
return Ok(true);
|
|
}
|
|
|
|
fn interruptable_sleep(&self, secs: u64) -> Result<(), burnchain_error> {
|
|
let deadline = secs + get_epoch_time_secs();
|
|
while get_epoch_time_secs() < deadline {
|
|
sleep_ms(1000);
|
|
if !self.should_keep_running() {
|
|
return Err(burnchain_error::CoordinatorClosed);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub fn wait_for_download_pass(&self, timeout: u64) -> Result<bool, burnchain_error> {
|
|
let current = self.get_download_passes();
|
|
|
|
let now = get_epoch_time_secs();
|
|
while current >= self.get_download_passes() {
|
|
if now + timeout < get_epoch_time_secs() {
|
|
debug!("PoX watchdog comms: timed out waiting for one download pass");
|
|
return Ok(false);
|
|
}
|
|
self.interruptable_sleep(1)?;
|
|
std::hint::spin_loop();
|
|
}
|
|
return Ok(true);
|
|
}
|
|
|
|
pub fn should_keep_running(&self) -> bool {
|
|
self.should_keep_running.load(Ordering::SeqCst)
|
|
}
|
|
|
|
pub fn notify_p2p_state_pass(&mut self) {
|
|
self.p2p_state_passes.fetch_add(1, Ordering::SeqCst);
|
|
}
|
|
|
|
pub fn notify_inv_sync_pass(&mut self) {
|
|
self.inv_sync_passes.fetch_add(1, Ordering::SeqCst);
|
|
}
|
|
|
|
pub fn notify_download_pass(&mut self) {
|
|
self.download_passes.fetch_add(1, Ordering::SeqCst);
|
|
}
|
|
|
|
pub fn set_ibd(&mut self, value: bool) {
|
|
self.last_ibd.store(value, Ordering::SeqCst);
|
|
}
|
|
}
|
|
|
|
/// Monitor the state of the Stacks blockchain as the peer network and relay threads download and
|
|
/// proces Stacks blocks. Don't allow the node to process the next PoX reward cycle's sortitions
|
|
/// unless it's reasonably sure that it has processed all Stacks blocks for this reward cycle.
|
|
/// This struct monitors the Stacks chainstate to make this determination.
|
|
pub struct PoxSyncWatchdog {
|
|
/// number of attachable but unprocessed staging blocks over time
|
|
new_attachable_blocks: VecDeque<i64>,
|
|
/// number of newly-processed staging blocks over time
|
|
new_processed_blocks: VecDeque<i64>,
|
|
/// last time we asked for attachable blocks
|
|
last_attachable_query: u64,
|
|
/// last time we asked for processed blocks
|
|
last_processed_query: u64,
|
|
/// number of samples to take
|
|
max_samples: u64,
|
|
/// maximum number of blocks to count per query (affects performance!)
|
|
max_staging: u64,
|
|
/// when did we first start watching?
|
|
watch_start_ts: u64,
|
|
/// when did we first see a flatline in block-processing rate?
|
|
last_block_processed_ts: u64,
|
|
/// estimated time for a block to get downloaded. Used to infer how long to wait for the first
|
|
/// blocks to show up when waiting for this reward cycle.
|
|
estimated_block_download_time: f64,
|
|
/// estimated time for a block to get processed -- from when it shows up as attachable to when
|
|
/// it shows up as processed. Used to infer how long to wait for the last block to get
|
|
/// processed before unblocking burnchain sync for the next reward cycle.
|
|
estimated_block_process_time: f64,
|
|
/// time between burnchain syncs in stead state
|
|
steady_state_burnchain_sync_interval: u64,
|
|
/// when to re-sync under steady state
|
|
steady_state_resync_ts: u64,
|
|
/// chainstate handle
|
|
chainstate: StacksChainState,
|
|
/// handle to relayer thread that informs the watchdog when the P2P state-machine does stuff
|
|
relayer_comms: PoxSyncWatchdogComms,
|
|
/// should this sync watchdog always download? used in integration tests.
|
|
unconditionally_download: bool,
|
|
}
|
|
|
|
/// Pause, in milliseconds, passed to `sleep_ms` between successive watchdog samples.
const PER_SAMPLE_WAIT_MS: u64 = 1_000;
|
|
|
|
impl PoxSyncWatchdog {
|
|
pub fn new(
|
|
config: &Config,
|
|
should_keep_running: Arc<AtomicBool>,
|
|
) -> Result<PoxSyncWatchdog, String> {
|
|
let mainnet = config.is_mainnet();
|
|
let chain_id = config.burnchain.chain_id;
|
|
let chainstate_path = config.get_chainstate_path_str();
|
|
let burnchain_poll_time = config.burnchain.poll_time_secs;
|
|
let download_timeout = config.connection_options.timeout;
|
|
let max_samples = config.node.pox_sync_sample_secs;
|
|
let unconditionally_download = config.node.pox_sync_sample_secs == 0;
|
|
let marf_opts = config.node.get_marf_opts();
|
|
|
|
let (chainstate, _) =
|
|
match StacksChainState::open(mainnet, chain_id, &chainstate_path, Some(marf_opts)) {
|
|
Ok(cs) => cs,
|
|
Err(e) => {
|
|
return Err(format!(
|
|
"Failed to open chainstate at '{}': {:?}",
|
|
&chainstate_path, &e
|
|
));
|
|
}
|
|
};
|
|
|
|
Ok(PoxSyncWatchdog {
|
|
unconditionally_download,
|
|
new_attachable_blocks: VecDeque::new(),
|
|
new_processed_blocks: VecDeque::new(),
|
|
last_attachable_query: 0,
|
|
last_processed_query: 0,
|
|
max_samples: max_samples,
|
|
max_staging: 10,
|
|
watch_start_ts: 0,
|
|
last_block_processed_ts: 0,
|
|
estimated_block_download_time: download_timeout as f64,
|
|
estimated_block_process_time: 5.0,
|
|
steady_state_burnchain_sync_interval: burnchain_poll_time,
|
|
steady_state_resync_ts: 0,
|
|
chainstate: chainstate,
|
|
relayer_comms: PoxSyncWatchdogComms::new(should_keep_running),
|
|
})
|
|
}
|
|
|
|
pub fn make_comms_handle(&self) -> PoxSyncWatchdogComms {
|
|
self.relayer_comms.clone()
|
|
}
|
|
|
|
/// How many recently-added Stacks blocks are in an attachable state, up to $max_staging?
|
|
fn count_attachable_stacks_blocks(&mut self) -> Result<u64, String> {
|
|
// number of staging blocks that have arrived since the last sortition
|
|
let cnt = StacksChainState::count_attachable_staging_blocks(
|
|
&self.chainstate.db(),
|
|
self.max_staging,
|
|
self.last_attachable_query,
|
|
)
|
|
.map_err(|e| format!("Failed to count attachable staging blocks: {:?}", &e))?;
|
|
|
|
self.last_attachable_query = get_epoch_time_secs();
|
|
Ok(cnt)
|
|
}
|
|
|
|
/// How many recently-processed Stacks blocks are there, up to $max_staging?
|
|
/// ($max_staging is necessary to limit the runtime of this method, since the underlying SQL
|
|
/// uses COUNT(*), which in Sqlite is a _O(n)_ operation for _n_ rows)
|
|
fn count_processed_stacks_blocks(&mut self) -> Result<u64, String> {
|
|
// number of staging blocks that have arrived since the last sortition
|
|
let cnt = StacksChainState::count_processed_staging_blocks(
|
|
&self.chainstate.db(),
|
|
self.max_staging,
|
|
self.last_processed_query,
|
|
)
|
|
.map_err(|e| format!("Failed to count attachable staging blocks: {:?}", &e))?;
|
|
|
|
self.last_processed_query = get_epoch_time_secs();
|
|
Ok(cnt)
|
|
}
|
|
|
|
/// Are we in the initial burnchain block download? i.e. is the burn tip snapshot far enough away
|
|
/// from the burnchain height that we should be eagerly downloading snapshots?
|
|
pub fn infer_initial_burnchain_block_download(
|
|
burnchain: &Burnchain,
|
|
last_processed_height: u64,
|
|
burnchain_height: u64,
|
|
) -> bool {
|
|
let ibd =
|
|
last_processed_height + (burnchain.stable_confirmations as u64) < burnchain_height;
|
|
if ibd {
|
|
debug!(
|
|
"PoX watchdog: {} + {} < {}, so initial block download",
|
|
last_processed_height, burnchain.stable_confirmations, burnchain_height
|
|
);
|
|
} else {
|
|
debug!(
|
|
"PoX watchdog: {} + {} >= {}, so steady-state",
|
|
last_processed_height, burnchain.stable_confirmations, burnchain_height
|
|
);
|
|
}
|
|
ibd
|
|
}
|
|
|
|
/// Compute the first derivative of a list of points: the difference between each sample and
/// the one before it. The result has one fewer element than the input, and is empty when
/// there are fewer than two samples.
fn derivative(sample_list: &VecDeque<i64>) -> Vec<i64> {
    sample_list
        .iter()
        .zip(sample_list.iter().skip(1))
        .map(|(earlier, later)| *later - *earlier)
        .collect()
}
|
|
|
|
/// Is a derivative approximately flat, with a maximum absolute deviation from 0?
///
/// Returns `(flat, deviants)`: `deviants` is the number of points whose absolute value exceeds
/// `error`, and `flat` is `true` exactly when there are no such points (so an empty derivative
/// is flat).
fn is_mostly_flat(deriv: &[i64], error: i64) -> (bool, usize) {
    let total_deviates = deriv
        .iter()
        // `checked_abs` is `None` only for `i64::MIN`, whose magnitude exceeds every possible
        // `error`; plain `abs()` would overflow on that value.
        .filter(|d| d.checked_abs().map_or(true, |magnitude| magnitude > error))
        .count();

    (total_deviates == 0, total_deviates)
}
|
|
|
|
/// low and high pass filter average -- take the average without the smallest and largest
/// values.
///
/// Negative samples are ignored as nonsensical (e.g. due to clock drift?). Of the remaining
/// samples, exactly one occurrence of the minimum and one occurrence of the maximum are
/// discarded and the rest are averaged, so repeated values (including a run of identical
/// samples) still contribute to the result.
///
/// Returns 1.0 when fewer than three non-negative samples are available, since nothing is left
/// to average once the extremes are removed.
fn hilo_filter_avg(samples: &[i64]) -> f64 {
    let mut min = i64::MAX;
    let mut max = i64::MIN;
    let mut count: usize = 0;
    // i128 so that summing many large i64 samples cannot overflow
    let mut sum: i128 = 0;

    for &s in samples.iter().filter(|s| **s >= 0) {
        min = min.min(s);
        max = max.max(s);
        count += 1;
        sum += i128::from(s);
    }

    if count <= 2 {
        // no viable samples
        return 1.0;
    }

    // drop one instance each of the smallest and largest sample
    let trimmed_sum = sum - i128::from(min) - i128::from(max);
    (trimmed_sum as f64) / ((count - 2) as f64)
}
|
|
|
|
/// estimate how long a block remains in an unprocessed state
|
|
fn estimate_block_process_time(
|
|
chainstate: &StacksChainState,
|
|
burnchain: &Burnchain,
|
|
tip_height: u64,
|
|
) -> f64 {
|
|
let this_reward_cycle = burnchain
|
|
.block_height_to_reward_cycle(tip_height)
|
|
.expect(&format!("BUG: no reward cycle for {}", tip_height));
|
|
let prev_reward_cycle = this_reward_cycle.saturating_sub(1);
|
|
|
|
let start_height = burnchain.reward_cycle_to_block_height(prev_reward_cycle);
|
|
let end_height = burnchain.reward_cycle_to_block_height(this_reward_cycle);
|
|
|
|
if this_reward_cycle > 0 {
|
|
assert!(start_height < end_height);
|
|
} else {
|
|
// no samples yet
|
|
return 1.0;
|
|
}
|
|
|
|
let block_wait_times =
|
|
StacksChainState::measure_block_wait_time(&chainstate.db(), start_height, end_height)
|
|
.expect("BUG: failed to query chainstate block-processing times");
|
|
|
|
PoxSyncWatchdog::hilo_filter_avg(&block_wait_times)
|
|
}
|
|
|
|
/// estimate how long a block takes to download
|
|
fn estimate_block_download_time(
|
|
chainstate: &StacksChainState,
|
|
burnchain: &Burnchain,
|
|
tip_height: u64,
|
|
) -> f64 {
|
|
let this_reward_cycle = burnchain
|
|
.block_height_to_reward_cycle(tip_height)
|
|
.expect(&format!("BUG: no reward cycle for {}", tip_height));
|
|
let prev_reward_cycle = this_reward_cycle.saturating_sub(1);
|
|
|
|
let start_height = burnchain.reward_cycle_to_block_height(prev_reward_cycle);
|
|
let end_height = burnchain.reward_cycle_to_block_height(this_reward_cycle);
|
|
|
|
if this_reward_cycle > 0 {
|
|
assert!(start_height < end_height);
|
|
} else {
|
|
// no samples yet
|
|
return 1.0;
|
|
}
|
|
|
|
let block_download_times = StacksChainState::measure_block_download_time(
|
|
&chainstate.db(),
|
|
start_height,
|
|
end_height,
|
|
)
|
|
.expect("BUG: failed to query chainstate block-download times");
|
|
|
|
PoxSyncWatchdog::hilo_filter_avg(&block_download_times)
|
|
}
|
|
|
|
/// Reset internal state. Performed when it's okay to begin syncing the burnchain.
|
|
/// Updates estimate for block-processing time and block-downloading time.
|
|
fn reset(&mut self, burnchain: &Burnchain, tip_height: u64) {
|
|
// find the average (with low/high pass filter) time a block spends in the DB without being
|
|
// processed, during this reward cycle
|
|
self.estimated_block_process_time =
|
|
PoxSyncWatchdog::estimate_block_process_time(&self.chainstate, burnchain, tip_height);
|
|
|
|
// find the average (with low/high pass filter) time a block spends downloading
|
|
self.estimated_block_download_time =
|
|
PoxSyncWatchdog::estimate_block_download_time(&self.chainstate, burnchain, tip_height);
|
|
|
|
debug!(
|
|
"Estimated block download time: {}s. Estimated block processing time: {}s",
|
|
self.estimated_block_download_time, self.estimated_block_process_time
|
|
);
|
|
|
|
self.new_attachable_blocks.clear();
|
|
self.new_processed_blocks.clear();
|
|
self.last_block_processed_ts = 0;
|
|
self.watch_start_ts = 0;
|
|
self.steady_state_resync_ts = 0;
|
|
}
|
|
|
|
/// Wait until all of the Stacks blocks for the given reward cycle are seemingly downloaded and
|
|
/// processed. Do so by watching the _rate_ at which attachable Stacks blocks arrive and get
|
|
/// processed.
|
|
/// Returns whether or not we're still in the initial block download -- i.e. true if we're
|
|
/// still downloading burnchain blocks, or we haven't reached steady-state block-processing.
|
|
pub fn pox_sync_wait(
|
|
&mut self,
|
|
burnchain: &Burnchain,
|
|
burnchain_tip: &BurnchainTip, // this is the highest burnchain snapshot we've sync'ed to
|
|
burnchain_height_opt: Option<u64>, // this is the absolute burnchain block height, if known
|
|
num_sortitions_in_last_cycle: u64,
|
|
) -> Result<bool, burnchain_error> {
|
|
let burnchain_height = match burnchain_height_opt {
|
|
Some(bh) => bh,
|
|
None => {
|
|
// not known yet, so assume IBD
|
|
debug!("Pox watchdog: burnchain height not known yet, so assume IBD");
|
|
self.relayer_comms.set_ibd(true);
|
|
|
|
sleep_ms(self.steady_state_burnchain_sync_interval);
|
|
return Ok(true);
|
|
}
|
|
};
|
|
|
|
if self.watch_start_ts == 0 {
|
|
self.watch_start_ts = get_epoch_time_secs();
|
|
}
|
|
if self.steady_state_resync_ts == 0 {
|
|
self.steady_state_resync_ts =
|
|
get_epoch_time_secs() + self.steady_state_burnchain_sync_interval;
|
|
}
|
|
|
|
let ibbd = PoxSyncWatchdog::infer_initial_burnchain_block_download(
|
|
burnchain,
|
|
burnchain_tip.block_snapshot.block_height,
|
|
burnchain_height,
|
|
);
|
|
|
|
// unconditionally download the first reward cycle
|
|
if burnchain_tip.block_snapshot.block_height
|
|
< burnchain.first_block_height + (burnchain.pox_constants.reward_cycle_length as u64)
|
|
{
|
|
debug!("PoX watchdog in first reward cycle -- sync immediately");
|
|
self.relayer_comms.set_ibd(ibbd);
|
|
|
|
self.relayer_comms
|
|
.interruptable_sleep(self.steady_state_burnchain_sync_interval)?;
|
|
|
|
return Ok(ibbd);
|
|
}
|
|
|
|
if self.unconditionally_download {
|
|
debug!(
|
|
"PoX watchdog set to unconditionally download (ibd={})",
|
|
ibbd
|
|
);
|
|
self.relayer_comms.set_ibd(ibbd);
|
|
return Ok(ibbd);
|
|
}
|
|
|
|
let mut waited = false;
|
|
if ibbd {
|
|
// we are far behind the burnchain tip (i.e. not in the last reward cycle),
|
|
// so make sure the downloader knows about blocks it doesn't have yet so we can go and
|
|
// fetch its blocks before proceeding.
|
|
if num_sortitions_in_last_cycle > 0 {
|
|
debug!("PoX watchdog: Wait for at least one inventory state-machine pass...");
|
|
self.relayer_comms.wait_for_inv_sync_pass(SYNC_WAIT_SECS)?;
|
|
waited = true;
|
|
} else {
|
|
debug!("PoX watchdog: In initial block download, and no sortitions to consider in this reward cycle -- sync immediately");
|
|
self.relayer_comms.set_ibd(ibbd);
|
|
return Ok(ibbd);
|
|
}
|
|
} else {
|
|
debug!("PoX watchdog: not in initial burn block download, so not waiting for an inventory state-machine pass");
|
|
}
|
|
|
|
if burnchain_tip.block_snapshot.block_height
|
|
+ (burnchain.pox_constants.reward_cycle_length as u64)
|
|
>= burnchain_height
|
|
{
|
|
// unconditionally download if we're within the last reward cycle (after the poll timeout)
|
|
if !waited {
|
|
debug!(
|
|
"PoX watchdog in last reward cycle -- sync after {} seconds",
|
|
self.steady_state_burnchain_sync_interval
|
|
);
|
|
self.relayer_comms.set_ibd(ibbd);
|
|
|
|
self.relayer_comms
|
|
.interruptable_sleep(self.steady_state_burnchain_sync_interval)?;
|
|
} else {
|
|
debug!("PoX watchdog in last reward cycle -- sync immediately");
|
|
self.relayer_comms.set_ibd(ibbd);
|
|
}
|
|
return Ok(ibbd);
|
|
}
|
|
|
|
// have we reached steady-state behavior? i.e. have we stopped processing both burnchain
|
|
// and Stacks blocks?
|
|
let mut steady_state = false;
|
|
debug!("PoX watchdog: Wait until chainstate reaches steady-state block-processing...");
|
|
|
|
let ibbd = loop {
|
|
if !self.relayer_comms.should_keep_running() {
|
|
break false;
|
|
}
|
|
let ibbd = PoxSyncWatchdog::infer_initial_burnchain_block_download(
|
|
burnchain,
|
|
burnchain_tip.block_snapshot.block_height,
|
|
burnchain_height,
|
|
);
|
|
|
|
let expected_first_block_deadline =
|
|
self.watch_start_ts + (self.estimated_block_download_time as u64);
|
|
let expected_last_block_deadline = self.last_block_processed_ts
|
|
+ (self.estimated_block_download_time as u64)
|
|
+ (self.estimated_block_process_time as u64);
|
|
|
|
match (
|
|
self.count_attachable_stacks_blocks(),
|
|
self.count_processed_stacks_blocks(),
|
|
) {
|
|
(Ok(num_available), Ok(num_processed)) => {
|
|
self.new_attachable_blocks.push_back(num_available as i64);
|
|
self.new_processed_blocks.push_back(num_processed as i64);
|
|
|
|
if (self.new_attachable_blocks.len() as u64) > self.max_samples {
|
|
self.new_attachable_blocks.pop_front();
|
|
}
|
|
if (self.new_processed_blocks.len() as u64) > self.max_samples {
|
|
self.new_processed_blocks.pop_front();
|
|
}
|
|
|
|
if (self.new_attachable_blocks.len() as u64) < self.max_samples
|
|
|| (self.new_processed_blocks.len() as u64) < self.max_samples
|
|
{
|
|
// still getting initial samples
|
|
if self.new_processed_blocks.len() % 10 == 0 {
|
|
debug!(
|
|
"PoX watchdog: Still warming up: {} out of {} samples...",
|
|
&self.new_attachable_blocks.len(),
|
|
&self.max_samples
|
|
);
|
|
}
|
|
sleep_ms(PER_SAMPLE_WAIT_MS);
|
|
continue;
|
|
}
|
|
|
|
if self.watch_start_ts > 0
|
|
&& get_epoch_time_secs() < expected_first_block_deadline
|
|
{
|
|
// still waiting for that first block in this reward cycle
|
|
debug!("PoX watchdog: Still warming up: waiting until {}s for first Stacks block download (estimated download time: {}s)...", expected_first_block_deadline, self.estimated_block_download_time);
|
|
sleep_ms(PER_SAMPLE_WAIT_MS);
|
|
continue;
|
|
}
|
|
|
|
if self.watch_start_ts > 0
|
|
&& (self.new_attachable_blocks.len() as u64) < self.max_samples
|
|
&& self.watch_start_ts
|
|
+ self.max_samples
|
|
+ self.steady_state_burnchain_sync_interval
|
|
* (burnchain.stable_confirmations as u64)
|
|
< get_epoch_time_secs()
|
|
{
|
|
debug!(
|
|
"PoX watchdog: could not calculate {} samples in {} seconds. Assuming suspend/resume, or assuming load is too high.",
|
|
self.max_samples,
|
|
self.max_samples + self.steady_state_burnchain_sync_interval * (burnchain.stable_confirmations as u64)
|
|
);
|
|
self.reset(burnchain, burnchain_tip.block_snapshot.block_height);
|
|
|
|
self.watch_start_ts = get_epoch_time_secs();
|
|
self.steady_state_resync_ts =
|
|
get_epoch_time_secs() + self.steady_state_burnchain_sync_interval;
|
|
continue;
|
|
}
|
|
|
|
// take first derivative of samples -- see if the download and processing rate has gone to 0
|
|
let attachable_delta = PoxSyncWatchdog::derivative(&self.new_attachable_blocks);
|
|
let processed_delta = PoxSyncWatchdog::derivative(&self.new_processed_blocks);
|
|
|
|
let (flat_attachable, attachable_deviants) =
|
|
PoxSyncWatchdog::is_mostly_flat(&attachable_delta, 0);
|
|
let (flat_processed, processed_deviants) =
|
|
PoxSyncWatchdog::is_mostly_flat(&processed_delta, 0);
|
|
|
|
debug!("PoX watchdog: flat-attachable?: {}, flat-processed?: {}, estimated block-download time: {}s, estimated block-processing time: {}s",
|
|
flat_attachable, flat_processed, self.estimated_block_download_time, self.estimated_block_process_time);
|
|
|
|
if flat_attachable && flat_processed && self.last_block_processed_ts == 0 {
|
|
// we're flat-lining -- this may be the end of this cycle
|
|
self.last_block_processed_ts = get_epoch_time_secs();
|
|
}
|
|
|
|
if self.last_block_processed_ts > 0
|
|
&& get_epoch_time_secs() < expected_last_block_deadline
|
|
{
|
|
debug!("PoX watchdog: Still processing blocks; waiting until at least min({},{})s before burnchain synchronization (estimated block-processing time: {}s)",
|
|
get_epoch_time_secs() + 1, expected_last_block_deadline, self.estimated_block_process_time);
|
|
sleep_ms(PER_SAMPLE_WAIT_MS);
|
|
continue;
|
|
}
|
|
|
|
if ibbd {
|
|
// doing initial burnchain block download right now.
|
|
// only proceed to fetch the next reward cycle's burnchain blocks if we're neither downloading nor
|
|
// attaching blocks recently
|
|
debug!("PoX watchdog: In initial burnchain block download: flat-attachable = {}, flat-processed = {}, min-attachable: {}, min-processed: {}",
|
|
flat_attachable, flat_processed, &attachable_deviants, &processed_deviants);
|
|
|
|
if !flat_attachable || !flat_processed {
|
|
sleep_ms(PER_SAMPLE_WAIT_MS);
|
|
continue;
|
|
}
|
|
} else {
|
|
let now = get_epoch_time_secs();
|
|
if now < self.steady_state_resync_ts {
|
|
// steady state
|
|
if !steady_state {
|
|
debug!("PoX watchdog: In steady-state; waiting until at least {} before burnchain synchronization", self.steady_state_resync_ts);
|
|
steady_state = flat_attachable && flat_processed;
|
|
}
|
|
sleep_ms(PER_SAMPLE_WAIT_MS);
|
|
continue;
|
|
} else {
|
|
// steady state
|
|
if !steady_state {
|
|
debug!("PoX watchdog: In steady-state, but ready burnchain synchronization as of {}", self.steady_state_resync_ts);
|
|
steady_state = flat_attachable && flat_processed;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
(err_attach, err_processed) => {
|
|
// can only happen on DB query failure
|
|
error!("PoX watchdog: Failed to count recently attached ('{:?}') and/or processed ('{:?}') staging blocks", &err_attach, &err_processed);
|
|
panic!();
|
|
}
|
|
};
|
|
|
|
if ibbd || !steady_state {
|
|
debug!("PoX watchdog: Wait for at least one downloader state-machine pass before resetting...");
|
|
self.relayer_comms.wait_for_download_pass(SYNC_WAIT_SECS)?;
|
|
} else {
|
|
debug!("PoX watchdog: in steady-state, so not waiting for download pass");
|
|
}
|
|
|
|
self.reset(burnchain, burnchain_tip.block_snapshot.block_height);
|
|
break ibbd;
|
|
};
|
|
|
|
let ret = ibbd || !steady_state;
|
|
self.relayer_comms.set_ibd(ret);
|
|
Ok(ret)
|
|
}
|
|
}
|