diff --git a/stackslib/src/net/download/nakamoto.rs b/stackslib/src/net/download/nakamoto.rs index e6a363420..f53292a36 100644 --- a/stackslib/src/net/download/nakamoto.rs +++ b/stackslib/src/net/download/nakamoto.rs @@ -26,10 +26,10 @@ //! //! # Design //! -//! The state machine has three layers: a top-level state machine for obtaining managing all of +//! The state machine has three layers: a top-level state machine for managing all of //! the requisite state for identifying tenures to download, a pair of low-level state machines for //! fetching individual tenures, and a middle layer for using the tenure data to drive the low-level -//! state machiens to fetch the requisite tenures. +//! state machines to fetch the requisite tenures. //! //! The three-layer design is meant to provide a degree of encapsulation of each downloader //! concern. Because downloading tenures is a multi-step process, we encapsulate the steps to @@ -38,7 +38,7 @@ //! we have a middle layer for scheduling tenures to peers for download. This middle layer manages //! the lifecycles of the lower layer state machines. The top layer is needed to interface the //! middle layer to the chainstate and the rest of the p2p network, and as such, handles the -//! bookkpeeing so that the lower layers can operate without needing access to this +//! bookkeeping so that the lower layers can operate without needing access to this //! otherwise-unrelated concern. //! //! ## NakamotoDownloadStateMachine @@ -64,7 +64,7 @@ //! //! * The ongoing and prior reward cycle's sortitions' tenure IDs and winning block hashes //! (implemented as lists of `WantedTenure`s) -//! * Which sortitions corresponds to tenure start and end blocks (implemented as a table of +//! * Which sortitions correspond to tenure start and end blocks (implemented as a table of //! `TenureStartEnd`s) //! * Which neighbors can serve which full tenures //! * What order to request tenures in @@ -74,7 +74,7 @@ //! 
## `NakamotoTenureDownloadSet` //! //! Naturally, the `NakamotoDownloadStateMachine` contains two code paths -- one for each mode. -//! To facilitate confirmeed tenure downloads, it has a second-layer state machine called +//! To facilitate confirmed tenure downloads, it has a second-layer state machine called //! the `NakamotoTenureDownloadSet`. This is responsible for identifying and issuing requests to //! peers which can serve complete tenures, and keeping track of whether or not the current reward //! cycle has any remaining tenures to download. To facilitate unconfirmed tenure downloads (which @@ -84,7 +84,7 @@ //! This middle layer consumes the data mantained by the `NakamotoDownloaderStateMachine` in order //! to instantiate, drive, and clean up one or more per-tenure download state machines. //! -//! ## NakamotoTenureDownloader and `NakamotoUnconfirmedTenureDownloader` +//! ## `NakamotoTenureDownloader` and `NakamotoUnconfirmedTenureDownloader` //! //! Per SIP-021, obtaining a confirmed tenure is a multi-step process. To carry this out, this //! module contains two third-level state machines: `NakamotoTenureDownloader`, which downloads a @@ -176,8 +176,11 @@ pub(crate) enum NakamotoTenureDownloadState { /// ongoing tenure, then a NakamotoTenureDownloader will be instantiated with this tenure-end-block /// already known. This step will be skipped because the end-block is already present in the /// state machine. - WaitForTenureEndBlock(StacksBlockId), - /// Gettin the tenure-end block directly. This only happens for tenures whose end-blocks + /// + /// * if the deadline (second parameter) is exceeded, the state machine transitions to + /// GetTenureEndBlock. + WaitForTenureEndBlock(StacksBlockId, u64), + /// Getting the tenure-end block directly. This only happens for tenures whose end-blocks /// cannot be provided by tenure downloaders within the same reward cycle. 
GetTenureEndBlock(StacksBlockId), /// Receiving tenure blocks @@ -186,6 +189,8 @@ pub(crate) enum NakamotoTenureDownloadState { Done, } +pub const WAIT_FOR_TENURE_END_BLOCK_TIMEOUT: u64 = 1; + impl fmt::Display for NakamotoTenureDownloadState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) @@ -199,7 +204,7 @@ impl fmt::Display for NakamotoTenureDownloadState { /// This state machine works as follows: /// /// 1. Fetch the first block in the given tenure -/// 2. Obtain the last block in te given tenure, via one of the following means: +/// 2. Obtain the last block in the given tenure, via one of the following means: /// a. Another NakamotoTenureDownloader's tenure-start block happens to be the end-block of this /// machine's tenure, and can be copied into this machine. /// b. This machine is configured to directly fetch the end-block. This only happens if this @@ -221,7 +226,7 @@ impl fmt::Display for NakamotoTenureDownloadState { pub(crate) struct NakamotoTenureDownloader { /// Consensus hash that identifies this tenure pub tenure_id_consensus_hash: ConsensusHash, - /// Stacks block ID of the tenure-start block. Learend from the inventory state machine and + /// Stacks block ID of the tenure-start block. Learned from the inventory state machine and /// sortition DB. pub tenure_start_block_id: StacksBlockId, /// Stacks block ID of the last block in this tenure (this will be the tenure-start block ID @@ -290,7 +295,7 @@ impl NakamotoTenureDownloader { } /// Is this downloader waiting for the tenure-end block data from some other downloader? Per - /// the sturct documentation, this is case 2(a). + /// the struct documentation, this is case 2(a). pub fn is_waiting(&self) -> bool { if let NakamotoTenureDownloadState::WaitForTenureEndBlock(..) 
= self.state { return true; @@ -351,16 +356,19 @@ impl NakamotoTenureDownloader { tenure_end_block.block_id(), &self.tenure_id_consensus_hash ); - self.state = - NakamotoTenureDownloadState::WaitForTenureEndBlock(tenure_end_block.block_id()); + self.state = NakamotoTenureDownloadState::WaitForTenureEndBlock( + tenure_end_block.block_id(), + get_epoch_time_secs() + WAIT_FOR_TENURE_END_BLOCK_TIMEOUT, + ); self.try_accept_tenure_end_block(&tenure_end_block)?; } else { // need to get tenure_end_header. By default, assume that another - // NakamotoTenureDownlaoder will provide this block, and allow the - // NakamotoTenureDownloaderSet instance that maanges a collection of these + // NakamotoTenureDownloader will provide this block, and allow the + // NakamotoTenureDownloaderSet instance that manages a collection of these // state-machines make the call to require this one to fetch the block directly. self.state = NakamotoTenureDownloadState::WaitForTenureEndBlock( self.tenure_end_block_id.clone(), + get_epoch_time_secs() + WAIT_FOR_TENURE_END_BLOCK_TIMEOUT, ); } Ok(()) @@ -377,13 +385,35 @@ impl NakamotoTenureDownloader { /// inventory vectors for this tenure's reward cycle, this state-transition must be driven /// after this machine's instantiation. pub fn transition_to_fetch_end_block(&mut self) -> Result<(), NetError> { - let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id) = self.state else { + let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id, ..) = self.state + else { return Err(NetError::InvalidState); }; + test_debug!( + "Transition downloader to {} to directly fetch tenure-end block {} (direct transition)", + &self.naddr, + &end_block_id + ); self.state = NakamotoTenureDownloadState::GetTenureEndBlock(end_block_id); Ok(()) } + /// Transition to fetching the tenure-end block directly if waiting has taken too long. 
+ pub fn try_transition_to_fetch_end_block(&mut self) { + if let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id, wait_deadline) = + self.state + { + if get_epoch_time_secs() > wait_deadline { + test_debug!( + "Transition downloader to {} to directly fetch tenure-end block {} (timed out)", + &self.naddr, + &end_block_id + ); + self.state = NakamotoTenureDownloadState::GetTenureEndBlock(end_block_id); + } + } + } + /// Validate and accept a tenure-end block. If accepted, then advance the state. /// Once accepted, this function extracts the tenure-change transaction and block header from /// this block (it does not need the entire block). @@ -396,7 +426,7 @@ impl NakamotoTenureDownloader { ) -> Result<(), NetError> { if !matches!( &self.state, - NakamotoTenureDownloadState::WaitForTenureEndBlock(_) + NakamotoTenureDownloadState::WaitForTenureEndBlock(..) ) && !matches!( &self.state, NakamotoTenureDownloadState::GetTenureEndBlock(_) @@ -484,7 +514,7 @@ impl NakamotoTenureDownloader { } /// Add downloaded tenure blocks to this machine. - /// If we have collected all tenure blocks, then return them and trasition to the Done state. + /// If we have collected all tenure blocks, then return them and transition to the Done state. /// /// Returns Ok(Some([blocks])) if we got all the blocks in this tenure. 
The blocks will be in /// ascending order by height, and will include the tenure-start block but exclude the @@ -621,9 +651,13 @@ impl NakamotoTenureDownloader { test_debug!("Request tenure-start block {}", &start_block_id); StacksHttpRequest::new_get_nakamoto_block(peerhost, start_block_id.clone()) } - NakamotoTenureDownloadState::WaitForTenureEndBlock(_block_id) => { + NakamotoTenureDownloadState::WaitForTenureEndBlock(_block_id, _deadline) => { // we're waiting for some other downloader's block-fetch to complete - test_debug!("Waiting for tenure-end block {}", &_block_id); + test_debug!( + "Waiting for tenure-end block {} until {}", + &_block_id, + _deadline + ); return Ok(None); } NakamotoTenureDownloadState::GetTenureEndBlock(end_block_id) => { @@ -654,10 +688,10 @@ impl NakamotoTenureDownloader { &mut self, network: &mut PeerNetwork, neighbor_rpc: &mut NeighborRPC, - ) -> Result, NetError> { + ) -> Result { if neighbor_rpc.has_inflight(&self.naddr) { test_debug!("Peer {} has an inflight request", &self.naddr); - return Ok(Some(true)); + return Ok(true); } if neighbor_rpc.is_dead_or_broken(network, &self.naddr) { return Err(NetError::PeerNotConnected); @@ -672,16 +706,16 @@ impl NakamotoTenureDownloader { let request = match self.make_next_download_request(peerhost) { Ok(Some(request)) => request, Ok(None) => { - return Ok(Some(true)); + return Ok(true); } Err(_) => { - return Ok(Some(false)); + return Ok(false); } }; neighbor_rpc.send_request(network, self.naddr.clone(), request)?; self.idle = false; - Ok(Some(true)) + Ok(true) } /// Handle a received StacksHttpResponse and advance the state machine. @@ -1801,7 +1835,7 @@ impl NakamotoTenureDownloaderSet { } /// Assign the given peer to the given downloader state machine. Allocate a slot for it if - /// need. + /// needed. 
fn add_downloader(&mut self, naddr: NeighborAddress, downloader: NakamotoTenureDownloader) { test_debug!( "Add downloader for tenure {} driven by {}", @@ -1835,6 +1869,24 @@ impl NakamotoTenureDownloaderSet { self.downloaders[index] = None; } + /// How many downloaders are there? + pub fn num_downloaders(&self) -> usize { + self.downloaders + .iter() + .fold(0, |acc, dl| if dl.is_some() { acc + 1 } else { acc }) + } + + /// How many downloaders are there, which are scheduled? + pub fn num_scheduled_downloaders(&self) -> usize { + let mut cnt = 0; + for (_, idx) in self.peers.iter() { + if let Some(Some(_)) = self.downloaders.get(*idx) { + cnt += 1; + } + } + cnt + } + /// Add a sequence of (address, downloader) pairs to this downloader set. pub(crate) fn add_downloaders( &mut self, @@ -2007,7 +2059,7 @@ impl NakamotoTenureDownloaderSet { let Some(downloader) = downloader_opt else { continue; }; - let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id) = + let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id, ..) = &downloader.state else { continue; @@ -2056,6 +2108,13 @@ impl NakamotoTenureDownloaderSet { &mut self, tenure_block_ids: &HashMap, ) { + for downloader_opt in self.downloaders.iter_mut() { + let Some(downloader) = downloader_opt.as_mut() else { + continue; + }; + downloader.try_transition_to_fetch_end_block(); + } + // find tenures in which we need to fetch the tenure-end block directly. 
let mut last_available_tenures: HashSet = HashSet::new(); for (_, all_available) in tenure_block_ids.iter() { @@ -2109,6 +2168,12 @@ impl NakamotoTenureDownloaderSet { test_debug!("available: {:?}", &available); test_debug!("tenure_block_ids: {:?}", &tenure_block_ids); test_debug!("inflight: {}", self.inflight()); + test_debug!( + "count: {}, running: {}, scheduled: {}", + count, + self.num_downloaders(), + self.num_scheduled_downloaders() + ); self.clear_available_peers(); self.clear_finished_downloaders(); @@ -2217,7 +2282,7 @@ impl NakamotoTenureDownloaderSet { network: &mut PeerNetwork, neighbor_rpc: &mut NeighborRPC, ) -> HashMap> { - let addrs: Vec<_> = self.peers.keys().map(|addr| addr.clone()).collect(); + let addrs: Vec<_> = self.peers.keys().cloned().collect(); let mut finished = vec![]; let mut finished_tenures = vec![]; let mut new_blocks = HashMap::new(); @@ -2244,19 +2309,14 @@ impl NakamotoTenureDownloaderSet { &downloader.tenure_id_consensus_hash, &downloader.state ); - let Ok(sent_opt) = downloader.send_next_download_request(network, neighbor_rpc) else { + let Ok(sent) = downloader.send_next_download_request(network, neighbor_rpc) else { test_debug!("Downloader for {} failed; this peer is dead", &naddr); neighbor_rpc.add_dead(network, naddr); continue; }; - if let Some(sent) = sent_opt { - if !sent { - // this downloader is dead or broken - finished.push(naddr.clone()); - continue; - } - } else { - // this downloader is blocked + if !sent { + // this downloader is dead or broken + finished.push(naddr.clone()); continue; } } @@ -2268,15 +2328,13 @@ impl NakamotoTenureDownloaderSet { self.clear_downloader(&naddr); } } - for done_naddr in finished.iter() { + for done_naddr in finished.drain(..) { test_debug!("Remove finished downloader for {}", &done_naddr); self.clear_downloader(&done_naddr); } - for done_tenure in finished_tenures.iter() { - self.completed_tenures.insert(done_tenure.clone()); + for done_tenure in finished_tenures.drain(..) 
{ + self.completed_tenures.insert(done_tenure); } - finished.clear(); - finished_tenures.clear(); // handle responses for (naddr, response) in neighbor_rpc.collect_replies(network) { @@ -2320,12 +2378,12 @@ impl NakamotoTenureDownloaderSet { self.clear_downloader(naddr); } } - for done_naddr in finished.iter() { + for done_naddr in finished.drain(..) { test_debug!("Remove finished downloader for {}", &done_naddr); - self.clear_downloader(done_naddr); + self.clear_downloader(&done_naddr); } - for done_tenure in finished_tenures.iter() { - self.completed_tenures.insert(done_tenure.clone()); + for done_tenure in finished_tenures.drain(..) { + self.completed_tenures.insert(done_tenure); } new_blocks @@ -2702,7 +2760,7 @@ impl NakamotoDownloadStateMachine { /// data. These lists are extended in three possible ways, depending on the sortition tip: /// /// * If the sortition tip is in the same reward cycle that the block downloader is tracking, - /// then any newly-available sortitions are loaded via `load_wnated_tenures_at_tip()` and appended + /// then any newly-available sortitions are loaded via `load_wanted_tenures_at_tip()` and appended /// to `self.wanted_tenures`. This is what happens most of the time in steady-state. /// /// * Otherwise, if the sortition tip is different (i.e. 
ahead) of the block downloader's @@ -2753,7 +2811,13 @@ impl NakamotoDownloadStateMachine { let can_advance_wanted_tenures = if let Some(prev_wanted_tenures) = self.prev_wanted_tenures.as_ref() { !Self::have_unprocessed_tenures( - sortdb.pox_constants.block_height_to_reward_cycle(sortdb.first_block_height, self.nakamoto_start_height).expect("FATAL: first nakamoto block from before system start"), + sortdb + .pox_constants + .block_height_to_reward_cycle( + sortdb.first_block_height, + self.nakamoto_start_height, + ) + .expect("FATAL: first nakamoto block from before system start"), &self.tenure_downloads.completed_tenures, prev_wanted_tenures, &self.tenure_block_ids, @@ -2826,6 +2890,15 @@ impl NakamotoDownloadStateMachine { sort_tip: &BlockSnapshot, sortdb: &SortitionDB, ) -> Result<(), NetError> { + // check for reorgs + let reorg = PeerNetwork::is_reorg(self.last_sort_tip.as_ref(), sort_tip, sortdb); + if reorg { + // force a reload + test_debug!("Detected reorg! Refreshing wanted tenures"); + self.prev_wanted_tenures = None; + self.wanted_tenures.clear(); + } + if self .prev_wanted_tenures .as_ref() @@ -2919,16 +2992,14 @@ impl NakamotoDownloadStateMachine { if prev_wanted_rc < first_nakamoto_rc { // assume the epoch 2.x inventory has this has_prev_inv = true; - } - else if inv.tenures_inv.get(&prev_wanted_rc).is_some() { + } else if inv.tenures_inv.get(&prev_wanted_rc).is_some() { has_prev_inv = true; } if cur_wanted_rc < first_nakamoto_rc { // assume the epoch 2.x inventory has this has_cur_inv = true; - } - else if inv.tenures_inv.get(&cur_wanted_rc).is_some() { + } else if inv.tenures_inv.get(&cur_wanted_rc).is_some() { has_cur_inv = true; } } @@ -2958,7 +3029,9 @@ impl NakamotoDownloadStateMachine { } } - if (prev_wanted_rc >= first_nakamoto_rc && !has_prev_rc_block) || (cur_wanted_rc >= first_nakamoto_rc && !has_cur_rc_block) { + if (prev_wanted_rc >= first_nakamoto_rc && !has_prev_rc_block) + || (cur_wanted_rc >= first_nakamoto_rc && !has_cur_rc_block) + 
{ test_debug!( "tenure_block_ids stale: missing representation in reward cycles {} ({}) and {} ({})", prev_wanted_rc, @@ -3070,7 +3143,13 @@ impl NakamotoDownloadStateMachine { let can_advance_wanted_tenures = if let Some(prev_wanted_tenures) = self.prev_wanted_tenures.as_ref() { !Self::have_unprocessed_tenures( - sortdb.pox_constants.block_height_to_reward_cycle(self.nakamoto_start_height, sortdb.first_block_height).expect("FATAL: nakamoto starts before system start"), + sortdb + .pox_constants + .block_height_to_reward_cycle( + self.nakamoto_start_height, + sortdb.first_block_height, + ) + .expect("FATAL: nakamoto starts before system start"), &self.tenure_downloads.completed_tenures, prev_wanted_tenures, &self.tenure_block_ids, @@ -3487,7 +3566,9 @@ impl NakamotoDownloadStateMachine { // there are still confirmed tenures we have to go and get if Self::have_unprocessed_tenures( - pox_constants.block_height_to_reward_cycle(first_burn_height, nakamoto_start_block).expect("FATAL: nakamoto starts before system start"), + pox_constants + .block_height_to_reward_cycle(first_burn_height, nakamoto_start_block) + .expect("FATAL: nakamoto starts before system start"), completed_tenures, prev_wanted_tenures, tenure_block_ids, @@ -3710,10 +3791,9 @@ impl NakamotoDownloadStateMachine { downloaders.remove(naddr); } } - for done_naddr in finished.iter() { - downloaders.remove(done_naddr); + for done_naddr in finished.drain(..) { + downloaders.remove(&done_naddr); } - finished.clear(); // handle responses for (naddr, response) in neighbor_rpc.collect_replies(network) { @@ -4026,7 +4106,10 @@ impl NakamotoDownloadStateMachine { return HashMap::new(); }; - debug!("tenure_downloads.is_empty: {}", self.tenure_downloads.is_empty()); + debug!( + "tenure_downloads.is_empty: {}", + self.tenure_downloads.is_empty() + ); if self.tenure_downloads.is_empty() && Self::need_unconfirmed_tenures( self.nakamoto_start_height,