Merge pull request #252 from hirosystems/fix/respect-first-burn-ht

Fix: handful of resiliency fixes
This commit is contained in:
Brice Dobry
2023-04-18 12:45:50 -04:00
committed by GitHub
6 changed files with 61 additions and 38 deletions

View File

@@ -457,7 +457,7 @@ pub enum Error {
/// Thread channel error
ThreadChannelError,
/// Missing headers
MissingHeaders,
MissingHeaders(BurnchainHeaderHash),
/// Missing parent block
MissingParentBlock,
/// Remote burnchain peer has misbehaved
@@ -480,7 +480,7 @@ impl fmt::Display for Error {
Error::DBError(ref dbe) => fmt::Display::fmt(dbe, f),
Error::DownloadError(ref btce) => fmt::Display::fmt(btce, f),
Error::ParseError => write!(f, "Parse error"),
Error::MissingHeaders => write!(f, "Missing block headers"),
Error::MissingHeaders(ref bh) => write!(f, "Missing block headers for block {}", bh),
Error::MissingParentBlock => write!(f, "Missing parent block"),
Error::ThreadChannelError => write!(f, "Error in thread channel"),
Error::BurnchainPeerBroken => write!(f, "Remote burnchain peer has misbehaved"),
@@ -501,7 +501,7 @@ impl error::Error for Error {
Error::DBError(ref e) => Some(e),
Error::DownloadError(ref _e) => None,
Error::ParseError => None,
Error::MissingHeaders => None,
Error::MissingHeaders(_) => None,
Error::MissingParentBlock => None,
Error::ThreadChannelError => None,
Error::BurnchainPeerBroken => None,

View File

@@ -727,7 +727,7 @@ impl BlockDownloader {
return Ok(vec![]);
}
debug!("Begin headers load");
trace!("Begin headers load");
let begin_ts = get_epoch_time_ms();
let last_ancestor = SortitionDB::get_ancestor_snapshot(
&ic,
@@ -766,7 +766,7 @@ impl BlockDownloader {
);
}
let end_ts = get_epoch_time_ms();
debug!("End headers load ({} ms)", end_ts.saturating_sub(begin_ts));
trace!("End headers load ({} ms)", end_ts.saturating_sub(begin_ts));
// update cache
SortitionDB::merge_block_header_cache(header_cache, &local_blocks);

View File

@@ -1225,8 +1225,8 @@ impl PeerNetwork {
pub fn num_inventory_reward_cycles(&self) -> u64 {
let tip_block_height = self.burnchain_tip.block_height;
self.burnchain
.pox_constants
.num_sync_cycles_to_height(tip_block_height)
.block_height_to_reward_cycle(tip_block_height)
.unwrap_or(0)
}
/// Try to make a GetPoxInv request for the target reward cycle for this peer.
@@ -1459,7 +1459,7 @@ impl PeerNetwork {
Ok(s) => s,
Err(net_error::NotFoundError) => {
// we're not caught up
debug!("{:?}: Will not send GetBlocksInv to {:?}, since we are not caught up to block height {}", &self.local_peer, nk, target_block_height);
warn!("{:?}: Will not send GetBlocksInv to {:?}, since we are not caught up to block height {}", &self.local_peer, nk, target_block_height);
return Ok(None);
}
Err(e) => {
@@ -1487,7 +1487,7 @@ impl PeerNetwork {
)? {
0 => {
// cannot ask this peer for any blocks in this reward cycle
debug!("{:?}: no blocks available from {} at cycle {} (which starts at height {})", &self.local_peer, nk, target_block_reward_cycle, self.burnchain.reward_cycle_to_block_height(target_block_reward_cycle));
warn!("{:?}: no blocks available from {} at cycle {} (which starts at height {})", &self.local_peer, nk, target_block_reward_cycle, self.burnchain.reward_cycle_to_block_height(target_block_reward_cycle));
return Ok(None);
}
x => x,
@@ -1589,15 +1589,10 @@ impl PeerNetwork {
nk: &NeighborKey,
stats: &NeighborBlockStats,
) -> Result<Option<(u64, GetBlocksInv)>, net_error> {
if stats.block_reward_cycle <= stats.inv.num_reward_cycles {
self.make_getblocksinv(sortdb, nk, stats, stats.block_reward_cycle)
.and_then(|getblocksinv_opt| {
Ok(getblocksinv_opt
.map(|getblocksinv| (stats.block_reward_cycle, getblocksinv)))
})
} else {
Ok(None)
}
self.make_getblocksinv(sortdb, nk, stats, stats.block_reward_cycle)
.and_then(|getblocksinv_opt| {
Ok(getblocksinv_opt.map(|getblocksinv| (stats.block_reward_cycle, getblocksinv)))
})
}
/// Determine at which reward cycle to begin scanning inventories
@@ -1717,10 +1712,6 @@ impl PeerNetwork {
.burnchain
.reward_cycle_to_block_height(stats.target_block_reward_cycle);
debug!(
"{:?}: got blocksinv at reward cycle {} (block height {}) from {:?}: {:?}",
&self.local_peer, stats.target_block_reward_cycle, target_block_height, nk, &blocks_inv
);
let (new_blocks, new_microblocks) = stats.inv.merge_blocks_inv(
target_block_height,
blocks_inv.bitlen as u64,
@@ -1729,8 +1720,22 @@ impl PeerNetwork {
true,
);
debug!("{:?}: {:?} has {} new blocks and {} new microblocks (total {} blocks, {} microblocks, {} sortitions): {:?}",
&self.local_peer, &nk, new_blocks, new_microblocks, stats.inv.num_blocks(), stats.inv.num_microblock_streams(), stats.inv.num_sortitions, &stats.inv);
debug!(
"Received blocksinv";
"local_peer" => ?self.local_peer,
"neighbor" => ?nk,
"neighbor_block_reward_cycle" => stats.block_reward_cycle,
"target_block_reward_cycle" => stats.target_block_reward_cycle,
"target_burn_block_height" => target_block_height,
"new_blocks_count" => new_blocks,
"new_microblocks_count" => new_microblocks,
"total_blocks_count" => stats.inv.num_blocks(),
"total_microblocks_count" => stats.inv.num_microblock_streams(),
"total_sortitions_count" => stats.inv.num_sortitions,
"local_burn_height" => self.burnchain_tip.block_height,
"local_inventory_reward_cycles" => self.num_inventory_reward_cycles(),
"inventory" => ?stats.inv
);
if new_blocks > 0 || new_microblocks > 0 {
stats.learned_data = true;
@@ -1870,7 +1875,7 @@ impl PeerNetwork {
for (nk, stats) in inv_state.block_stats.iter_mut() {
debug!(
"{:?}: inv state-machine for {:?} is in state {:?}, at PoX {},target={}; blocks {},target={}; status {:?}, done={}",
"{:?}: inv state-machine for {:?} is in state {:?}, at PoX {},target={}; blocks {}, target={}; status {:?}, done={}",
&network.local_peer,
nk,
&stats.state,

View File

@@ -58,7 +58,7 @@ pub fn get_header_for_hash(
match row_option {
Some(row) => Ok(row),
None => Err(BurnchainError::MissingHeaders),
None => Err(BurnchainError::MissingHeaders(hash.clone())),
}
}
@@ -158,6 +158,13 @@ fn process_reorg(
old_tip: &BurnBlockIndexRow,
) -> Result<u64, BurnchainError> {
// Step 1: Set `is_canonical` to true for ancestors of the new tip.
info!(
"Processing Stacks (L1) chain reorg";
"old_tip_id" => %old_tip.header_hash,
"old_tip_height" => old_tip.height,
"new_tip_id" => %new_tip.header_hash,
"new_tip_height" => new_tip.height,
);
let mut up_cursor = BurnchainHeaderHash(new_tip.parent_header_hash());
let greatest_common_ancestor = loop {
let cursor_header = get_header_for_hash(&transaction, &up_cursor)?;
@@ -217,12 +224,20 @@ fn find_first_canonical_ancestor(
struct DBBurnBlockInputChannel {
/// Path to the db file underlying this logic.
output_db_path: String,
config: BurnchainConfig,
}
impl BurnchainChannel for DBBurnBlockInputChannel {
/// Add `new_block` to the `block_index` database.
fn push_block(&self, new_block: NewBlock) -> Result<(), BurnchainError> {
debug!("BurnchainChannel: try pushing; new_block {:?}", &new_block);
if self.config.first_burn_header_height > new_block.block_height {
debug!("BurnchainChannel skipping new_block event before first_burn_header_height";
"first_burn_height" => %self.config.first_burn_header_height,
"new_block_height" => new_block.block_height,
);
return Ok(());
}
debug!("BurnchainChannel: push_block"; "new_block_ht" => new_block.block_height, "new_block_id" => %new_block.index_block_hash);
// Re-open the connection.
let open_flags = OpenFlags::SQLITE_OPEN_READ_WRITE;
let mut connection = sqlite_open(&self.output_db_path, open_flags, true)?;
@@ -266,10 +281,13 @@ impl BurnchainChannel for DBBurnBlockInputChannel {
&block_string,
];
let transaction = connection.transaction()?;
transaction.execute(
if let Err(e) = transaction.execute(
"INSERT INTO block_index (height, header_hash, parent_header_hash, time_stamp, is_canonical, block) VALUES (?, ?, ?, ?, ?, ?)",
params,
)?;
) {
warn!("Failed to write block header to block index, probably a duplicate event"; "error" => ?e);
return Ok(())
}
// Possibly process re-org in the database representation.
if needs_reorg {
@@ -518,6 +536,7 @@ impl BurnchainIndexer for DBBurnchainIndexer {
fn get_channel(&self) -> Arc<(dyn BurnchainChannel + 'static)> {
Arc::new(DBBurnBlockInputChannel {
output_db_path: self.get_headers_path(),
config: self.config.clone(),
})
}

View File

@@ -672,7 +672,7 @@ impl BurnchainIndexer for MockIndexer {
fn get_headers_height(&self) -> Result<u64, BurnchainError> {
if self.blocks.len() == 0 {
Err(BurnchainError::MissingHeaders)
Err(BurnchainError::MissingHeaders(BurnchainHeaderHash([0; 32])))
} else {
Ok(self.minimum_recorded_height + (self.blocks.len() as u64) - 1)
}
@@ -724,7 +724,7 @@ impl BurnchainIndexer for MockIndexer {
end_block: u64,
) -> Result<Vec<MockHeader>, BurnchainError> {
if start_block < self.minimum_recorded_height {
return Err(BurnchainError::MissingHeaders);
return Err(BurnchainError::MissingHeaders(BurnchainHeaderHash([0; 32])));
}
if end_block < start_block {
return Err(BurnchainError::BurnchainPeerBroken);

View File

@@ -38,6 +38,7 @@ use crate::{BurnchainController, Config, EventDispatcher};
use super::RunLoopCallbacks;
use libc;
pub const STDERR: i32 = 2;
pub const SORTITION_PROCESS_INCREMENTS: u64 = 1000;
#[cfg(test)]
pub type RunLoopCounter = Arc<AtomicU64>;
@@ -537,13 +538,9 @@ impl RunLoop {
let mut burnchain_height = sortition_db_height;
let mut num_sortitions_in_last_cycle = 1;
// prepare to fetch the first reward cycle!
let mut target_burnchain_block_height = burnchain_config.reward_cycle_to_block_height(
burnchain_config
.block_height_to_reward_cycle(burnchain_height)
.expect("BUG: block height is not in a reward cycle")
+ 1,
);
// prepare to fetch the first set of sortitions
let mut target_burnchain_block_height =
(1 + (burnchain_height / SORTITION_PROCESS_INCREMENTS)) * SORTITION_PROCESS_INCREMENTS;
debug!(
"Begin main runloop starting a burnchain block {}",
@@ -684,6 +681,8 @@ impl RunLoop {
}
}
target_burnchain_block_height += SORTITION_PROCESS_INCREMENTS;
if sortition_db_height >= burnchain_height && !ibd {
let canonical_stacks_tip_height =
SortitionDB::get_canonical_burn_chain_tip(burnchain.sortdb_ref().conn())