fix: handle PoX reorg fallout by refreshing wanted tenures; make it so that we try to get the tenure-end block directly after 1 second (since after all, the same peer already served us the tenure-start block and is potentially *more* reliable than the other peer we're asking for a separate tenure-start block); fix typos

This commit is contained in:
Jude Nelson
2024-03-15 17:29:16 -04:00
parent 77de080547
commit 39cd964ccd

View File

@@ -26,10 +26,10 @@
//!
//! # Design
//!
//! The state machine has three layers: a top-level state machine for obtaining managing all of
//! The state machine has three layers: a top-level state machine for managing all of
//! the requisite state for identifying tenures to download, a pair of low-level state machines for
//! fetching individual tenures, and a middle layer for using the tenure data to drive the low-level
//! state machiens to fetch the requisite tenures.
//! state machines to fetch the requisite tenures.
//!
//! The three-layer design is meant to provide a degree of encapsulation of each downloader
//! concern. Because downloading tenures is a multi-step process, we encapsulate the steps to
@@ -38,7 +38,7 @@
//! we have a middle layer for scheduling tenures to peers for download. This middle layer manages
//! the lifecycles of the lower layer state machines. The top layer is needed to interface the
//! middle layer to the chainstate and the rest of the p2p network, and as such, handles the
//! bookkpeeing so that the lower layers can operate without needing access to this
//! bookkeeping so that the lower layers can operate without needing access to this
//! otherwise-unrelated concern.
//!
//! ## NakamotoDownloadStateMachine
@@ -64,7 +64,7 @@
//!
//! * The ongoing and prior reward cycle's sortitions' tenure IDs and winning block hashes
//! (implemented as lists of `WantedTenure`s)
//! * Which sortitions corresponds to tenure start and end blocks (implemented as a table of
//! * Which sortitions correspond to tenure start and end blocks (implemented as a table of
//! `TenureStartEnd`s)
//! * Which neighbors can serve which full tenures
//! * What order to request tenures in
@@ -74,7 +74,7 @@
//! ## `NakamotoTenureDownloadSet`
//!
//! Naturally, the `NakamotoDownloadStateMachine` contains two code paths -- one for each mode.
//! To facilitate confirmeed tenure downloads, it has a second-layer state machine called
//! To facilitate confirmed tenure downloads, it has a second-layer state machine called
//! the `NakamotoTenureDownloadSet`. This is responsible for identifying and issuing requests to
//! peers which can serve complete tenures, and keeping track of whether or not the current reward
//! cycle has any remaining tenures to download. To facilitate unconfirmed tenure downloads (which
@@ -84,7 +84,7 @@
//! This middle layer consumes the data mantained by the `NakamotoDownloaderStateMachine` in order
//! to instantiate, drive, and clean up one or more per-tenure download state machines.
//!
//! ## NakamotoTenureDownloader and `NakamotoUnconfirmedTenureDownloader`
//! ## `NakamotoTenureDownloader` and `NakamotoUnconfirmedTenureDownloader`
//!
//! Per SIP-021, obtaining a confirmed tenure is a multi-step process. To carry this out, this
//! module contains two third-level state machines: `NakamotoTenureDownloader`, which downloads a
@@ -176,8 +176,11 @@ pub(crate) enum NakamotoTenureDownloadState {
/// ongoing tenure, then a NakamotoTenureDownloader will be instantiated with this tenure-end-block
/// already known. This step will be skipped because the end-block is already present in the
/// state machine.
WaitForTenureEndBlock(StacksBlockId),
/// Gettin the tenure-end block directly. This only happens for tenures whose end-blocks
///
/// * if the deadline (second parameter) is exceeded, the state machine transitions to
/// GetTenureEndBlock.
WaitForTenureEndBlock(StacksBlockId, u64),
/// Getting the tenure-end block directly. This only happens for tenures whose end-blocks
/// cannot be provided by tenure downloaders within the same reward cycle.
GetTenureEndBlock(StacksBlockId),
/// Receiving tenure blocks
@@ -186,6 +189,8 @@ pub(crate) enum NakamotoTenureDownloadState {
Done,
}
pub const WAIT_FOR_TENURE_END_BLOCK_TIMEOUT: u64 = 1;
impl fmt::Display for NakamotoTenureDownloadState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
@@ -199,7 +204,7 @@ impl fmt::Display for NakamotoTenureDownloadState {
/// This state machine works as follows:
///
/// 1. Fetch the first block in the given tenure
/// 2. Obtain the last block in te given tenure, via one of the following means:
/// 2. Obtain the last block in the given tenure, via one of the following means:
/// a. Another NakamotoTenureDownloader's tenure-start block happens to be the end-block of this
/// machine's tenure, and can be copied into this machine.
/// b. This machine is configured to directly fetch the end-block. This only happens if this
@@ -221,7 +226,7 @@ impl fmt::Display for NakamotoTenureDownloadState {
pub(crate) struct NakamotoTenureDownloader {
/// Consensus hash that identifies this tenure
pub tenure_id_consensus_hash: ConsensusHash,
/// Stacks block ID of the tenure-start block. Learend from the inventory state machine and
/// Stacks block ID of the tenure-start block. Learned from the inventory state machine and
/// sortition DB.
pub tenure_start_block_id: StacksBlockId,
/// Stacks block ID of the last block in this tenure (this will be the tenure-start block ID
@@ -290,7 +295,7 @@ impl NakamotoTenureDownloader {
}
/// Is this downloader waiting for the tenure-end block data from some other downloader? Per
/// the sturct documentation, this is case 2(a).
/// the struct documentation, this is case 2(a).
pub fn is_waiting(&self) -> bool {
if let NakamotoTenureDownloadState::WaitForTenureEndBlock(..) = self.state {
return true;
@@ -351,16 +356,19 @@ impl NakamotoTenureDownloader {
tenure_end_block.block_id(),
&self.tenure_id_consensus_hash
);
self.state =
NakamotoTenureDownloadState::WaitForTenureEndBlock(tenure_end_block.block_id());
self.state = NakamotoTenureDownloadState::WaitForTenureEndBlock(
tenure_end_block.block_id(),
get_epoch_time_secs() + WAIT_FOR_TENURE_END_BLOCK_TIMEOUT,
);
self.try_accept_tenure_end_block(&tenure_end_block)?;
} else {
// need to get tenure_end_header. By default, assume that another
// NakamotoTenureDownlaoder will provide this block, and allow the
// NakamotoTenureDownloaderSet instance that maanges a collection of these
// NakamotoTenureDownloader will provide this block, and allow the
// NakamotoTenureDownloaderSet instance that manages a collection of these
// state-machines make the call to require this one to fetch the block directly.
self.state = NakamotoTenureDownloadState::WaitForTenureEndBlock(
self.tenure_end_block_id.clone(),
get_epoch_time_secs() + WAIT_FOR_TENURE_END_BLOCK_TIMEOUT,
);
}
Ok(())
@@ -377,13 +385,35 @@ impl NakamotoTenureDownloader {
/// inventory vectors for this tenure's reward cycle, this state-transition must be driven
/// after this machine's instantiation.
pub fn transition_to_fetch_end_block(&mut self) -> Result<(), NetError> {
let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id) = self.state else {
let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id, ..) = self.state
else {
return Err(NetError::InvalidState);
};
test_debug!(
"Transition downloader to {} to directly fetch tenure-end block {} (direct transition)",
&self.naddr,
&end_block_id
);
self.state = NakamotoTenureDownloadState::GetTenureEndBlock(end_block_id);
Ok(())
}
/// Transition to fetching the tenure-end block directly if waiting has taken too long.
pub fn try_transition_to_fetch_end_block(&mut self) {
if let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id, wait_deadline) =
self.state
{
if get_epoch_time_secs() < wait_deadline {
test_debug!(
"Transition downloader to {} to directly fetch tenure-end block {} (timed out)",
&self.naddr,
&end_block_id
);
self.state = NakamotoTenureDownloadState::GetTenureEndBlock(end_block_id);
}
}
}
/// Validate and accept a tenure-end block. If accepted, then advance the state.
/// Once accepted, this function extracts the tenure-change transaction and block header from
/// this block (it does not need the entire block).
@@ -396,7 +426,7 @@ impl NakamotoTenureDownloader {
) -> Result<(), NetError> {
if !matches!(
&self.state,
NakamotoTenureDownloadState::WaitForTenureEndBlock(_)
NakamotoTenureDownloadState::WaitForTenureEndBlock(..)
) && !matches!(
&self.state,
NakamotoTenureDownloadState::GetTenureEndBlock(_)
@@ -484,7 +514,7 @@ impl NakamotoTenureDownloader {
}
/// Add downloaded tenure blocks to this machine.
/// If we have collected all tenure blocks, then return them and trasition to the Done state.
/// If we have collected all tenure blocks, then return them and transition to the Done state.
///
/// Returns Ok(Some([blocks])) if we got all the blocks in this tenure. The blocks will be in
/// ascending order by height, and will include the tenure-start block but exclude the
@@ -621,9 +651,13 @@ impl NakamotoTenureDownloader {
test_debug!("Request tenure-start block {}", &start_block_id);
StacksHttpRequest::new_get_nakamoto_block(peerhost, start_block_id.clone())
}
NakamotoTenureDownloadState::WaitForTenureEndBlock(_block_id) => {
NakamotoTenureDownloadState::WaitForTenureEndBlock(_block_id, _deadline) => {
// we're waiting for some other downloader's block-fetch to complete
test_debug!("Waiting for tenure-end block {}", &_block_id);
test_debug!(
"Waiting for tenure-end block {} until {}",
&_block_id,
_deadline
);
return Ok(None);
}
NakamotoTenureDownloadState::GetTenureEndBlock(end_block_id) => {
@@ -654,10 +688,10 @@ impl NakamotoTenureDownloader {
&mut self,
network: &mut PeerNetwork,
neighbor_rpc: &mut NeighborRPC,
) -> Result<Option<bool>, NetError> {
) -> Result<bool, NetError> {
if neighbor_rpc.has_inflight(&self.naddr) {
test_debug!("Peer {} has an inflight request", &self.naddr);
return Ok(Some(true));
return Ok(true);
}
if neighbor_rpc.is_dead_or_broken(network, &self.naddr) {
return Err(NetError::PeerNotConnected);
@@ -672,16 +706,16 @@ impl NakamotoTenureDownloader {
let request = match self.make_next_download_request(peerhost) {
Ok(Some(request)) => request,
Ok(None) => {
return Ok(Some(true));
return Ok(true);
}
Err(_) => {
return Ok(Some(false));
return Ok(false);
}
};
neighbor_rpc.send_request(network, self.naddr.clone(), request)?;
self.idle = false;
Ok(Some(true))
Ok(true)
}
/// Handle a received StacksHttpResponse and advance the state machine.
@@ -1801,7 +1835,7 @@ impl NakamotoTenureDownloaderSet {
}
/// Assign the given peer to the given downloader state machine. Allocate a slot for it if
/// need.
/// needed.
fn add_downloader(&mut self, naddr: NeighborAddress, downloader: NakamotoTenureDownloader) {
test_debug!(
"Add downloader for tenure {} driven by {}",
@@ -1835,6 +1869,24 @@ impl NakamotoTenureDownloaderSet {
self.downloaders[index] = None;
}
/// How many downloaders are there?
pub fn num_downloaders(&self) -> usize {
self.downloaders
.iter()
.fold(0, |acc, dl| if dl.is_some() { acc + 1 } else { acc })
}
/// How many downloaders are there, which are scheduled?
pub fn num_scheduled_downloaders(&self) -> usize {
let mut cnt = 0;
for (_, idx) in self.peers.iter() {
if let Some(Some(_)) = self.downloaders.get(*idx) {
cnt += 1;
}
}
cnt
}
/// Add a sequence of (address, downloader) pairs to this downloader set.
pub(crate) fn add_downloaders(
&mut self,
@@ -2007,7 +2059,7 @@ impl NakamotoTenureDownloaderSet {
let Some(downloader) = downloader_opt else {
continue;
};
let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id) =
let NakamotoTenureDownloadState::WaitForTenureEndBlock(end_block_id, ..) =
&downloader.state
else {
continue;
@@ -2056,6 +2108,13 @@ impl NakamotoTenureDownloaderSet {
&mut self,
tenure_block_ids: &HashMap<NeighborAddress, AvailableTenures>,
) {
for downloader_opt in self.downloaders.iter_mut() {
let Some(downloader) = downloader_opt.as_mut() else {
continue;
};
downloader.try_transition_to_fetch_end_block();
}
// find tenures in which we need to fetch the tenure-end block directly.
let mut last_available_tenures: HashSet<StacksBlockId> = HashSet::new();
for (_, all_available) in tenure_block_ids.iter() {
@@ -2109,6 +2168,12 @@ impl NakamotoTenureDownloaderSet {
test_debug!("available: {:?}", &available);
test_debug!("tenure_block_ids: {:?}", &tenure_block_ids);
test_debug!("inflight: {}", self.inflight());
test_debug!(
"count: {}, running: {}, scheduled: {}",
count,
self.num_downloaders(),
self.num_scheduled_downloaders()
);
self.clear_available_peers();
self.clear_finished_downloaders();
@@ -2217,7 +2282,7 @@ impl NakamotoTenureDownloaderSet {
network: &mut PeerNetwork,
neighbor_rpc: &mut NeighborRPC,
) -> HashMap<ConsensusHash, Vec<NakamotoBlock>> {
let addrs: Vec<_> = self.peers.keys().map(|addr| addr.clone()).collect();
let addrs: Vec<_> = self.peers.keys().cloned().collect();
let mut finished = vec![];
let mut finished_tenures = vec![];
let mut new_blocks = HashMap::new();
@@ -2244,19 +2309,14 @@ impl NakamotoTenureDownloaderSet {
&downloader.tenure_id_consensus_hash,
&downloader.state
);
let Ok(sent_opt) = downloader.send_next_download_request(network, neighbor_rpc) else {
let Ok(sent) = downloader.send_next_download_request(network, neighbor_rpc) else {
test_debug!("Downloader for {} failed; this peer is dead", &naddr);
neighbor_rpc.add_dead(network, naddr);
continue;
};
if let Some(sent) = sent_opt {
if !sent {
// this downloader is dead or broken
finished.push(naddr.clone());
continue;
}
} else {
// this downloader is blocked
if !sent {
// this downloader is dead or broken
finished.push(naddr.clone());
continue;
}
}
@@ -2268,15 +2328,13 @@ impl NakamotoTenureDownloaderSet {
self.clear_downloader(&naddr);
}
}
for done_naddr in finished.iter() {
for done_naddr in finished.drain(..) {
test_debug!("Remove finished downloader for {}", &done_naddr);
self.clear_downloader(&done_naddr);
}
for done_tenure in finished_tenures.iter() {
self.completed_tenures.insert(done_tenure.clone());
for done_tenure in finished_tenures.drain(..) {
self.completed_tenures.insert(done_tenure);
}
finished.clear();
finished_tenures.clear();
// handle responses
for (naddr, response) in neighbor_rpc.collect_replies(network) {
@@ -2320,12 +2378,12 @@ impl NakamotoTenureDownloaderSet {
self.clear_downloader(naddr);
}
}
for done_naddr in finished.iter() {
for done_naddr in finished.drain(..) {
test_debug!("Remove finished downloader for {}", &done_naddr);
self.clear_downloader(done_naddr);
self.clear_downloader(&done_naddr);
}
for done_tenure in finished_tenures.iter() {
self.completed_tenures.insert(done_tenure.clone());
for done_tenure in finished_tenures.drain(..) {
self.completed_tenures.insert(done_tenure);
}
new_blocks
@@ -2702,7 +2760,7 @@ impl NakamotoDownloadStateMachine {
/// data. These lists are extended in three possible ways, depending on the sortition tip:
///
/// * If the sortition tip is in the same reward cycle that the block downloader is tracking,
/// then any newly-available sortitions are loaded via `load_wnated_tenures_at_tip()` and appended
/// then any newly-available sortitions are loaded via `load_wanted_tenures_at_tip()` and appended
/// to `self.wanted_tenures`. This is what happens most of the time in steady-state.
///
/// * Otherwise, if the sortition tip is different (i.e. ahead) of the block downloader's
@@ -2753,7 +2811,13 @@ impl NakamotoDownloadStateMachine {
let can_advance_wanted_tenures =
if let Some(prev_wanted_tenures) = self.prev_wanted_tenures.as_ref() {
!Self::have_unprocessed_tenures(
sortdb.pox_constants.block_height_to_reward_cycle(sortdb.first_block_height, self.nakamoto_start_height).expect("FATAL: first nakamoto block from before system start"),
sortdb
.pox_constants
.block_height_to_reward_cycle(
sortdb.first_block_height,
self.nakamoto_start_height,
)
.expect("FATAL: first nakamoto block from before system start"),
&self.tenure_downloads.completed_tenures,
prev_wanted_tenures,
&self.tenure_block_ids,
@@ -2826,6 +2890,15 @@ impl NakamotoDownloadStateMachine {
sort_tip: &BlockSnapshot,
sortdb: &SortitionDB,
) -> Result<(), NetError> {
// check for reorgs
let reorg = PeerNetwork::is_reorg(self.last_sort_tip.as_ref(), sort_tip, sortdb);
if reorg {
// force a reload
test_debug!("Detected reorg! Refreshing wanted tenures");
self.prev_wanted_tenures = None;
self.wanted_tenures.clear();
}
if self
.prev_wanted_tenures
.as_ref()
@@ -2919,16 +2992,14 @@ impl NakamotoDownloadStateMachine {
if prev_wanted_rc < first_nakamoto_rc {
// assume the epoch 2.x inventory has this
has_prev_inv = true;
}
else if inv.tenures_inv.get(&prev_wanted_rc).is_some() {
} else if inv.tenures_inv.get(&prev_wanted_rc).is_some() {
has_prev_inv = true;
}
if cur_wanted_rc < first_nakamoto_rc {
// assume the epoch 2.x inventory has this
has_cur_inv = true;
}
else if inv.tenures_inv.get(&cur_wanted_rc).is_some() {
} else if inv.tenures_inv.get(&cur_wanted_rc).is_some() {
has_cur_inv = true;
}
}
@@ -2958,7 +3029,9 @@ impl NakamotoDownloadStateMachine {
}
}
if (prev_wanted_rc >= first_nakamoto_rc && !has_prev_rc_block) || (cur_wanted_rc >= first_nakamoto_rc && !has_cur_rc_block) {
if (prev_wanted_rc >= first_nakamoto_rc && !has_prev_rc_block)
|| (cur_wanted_rc >= first_nakamoto_rc && !has_cur_rc_block)
{
test_debug!(
"tenure_block_ids stale: missing representation in reward cycles {} ({}) and {} ({})",
prev_wanted_rc,
@@ -3070,7 +3143,13 @@ impl NakamotoDownloadStateMachine {
let can_advance_wanted_tenures =
if let Some(prev_wanted_tenures) = self.prev_wanted_tenures.as_ref() {
!Self::have_unprocessed_tenures(
sortdb.pox_constants.block_height_to_reward_cycle(self.nakamoto_start_height, sortdb.first_block_height).expect("FATAL: nakamoto starts before system start"),
sortdb
.pox_constants
.block_height_to_reward_cycle(
self.nakamoto_start_height,
sortdb.first_block_height,
)
.expect("FATAL: nakamoto starts before system start"),
&self.tenure_downloads.completed_tenures,
prev_wanted_tenures,
&self.tenure_block_ids,
@@ -3487,7 +3566,9 @@ impl NakamotoDownloadStateMachine {
// there are still confirmed tenures we have to go and get
if Self::have_unprocessed_tenures(
pox_constants.block_height_to_reward_cycle(first_burn_height, nakamoto_start_block).expect("FATAL: nakamoto starts before system start"),
pox_constants
.block_height_to_reward_cycle(first_burn_height, nakamoto_start_block)
.expect("FATAL: nakamoto starts before system start"),
completed_tenures,
prev_wanted_tenures,
tenure_block_ids,
@@ -3710,10 +3791,9 @@ impl NakamotoDownloadStateMachine {
downloaders.remove(naddr);
}
}
for done_naddr in finished.iter() {
downloaders.remove(done_naddr);
for done_naddr in finished.drain(..) {
downloaders.remove(&done_naddr);
}
finished.clear();
// handle responses
for (naddr, response) in neighbor_rpc.collect_replies(network) {
@@ -4026,7 +4106,10 @@ impl NakamotoDownloadStateMachine {
return HashMap::new();
};
debug!("tenure_downloads.is_empty: {}", self.tenure_downloads.is_empty());
debug!(
"tenure_downloads.is_empty: {}",
self.tenure_downloads.is_empty()
);
if self.tenure_downloads.is_empty()
&& Self::need_unconfirmed_tenures(
self.nakamoto_start_height,