fix: wait for one inv and download pass before running tenure, and when we win tenure, broadcast the block _before_ processing it. Also, because blocks can get mined out-of-order relative to burnchain blocks, advance the miner tip only if the tip is higher in the stacks chain

This commit is contained in:
Jude Nelson
2022-10-10 15:38:24 -04:00
parent c19ad0f70b
commit b3b6525713

View File

@@ -93,7 +93,10 @@ enum MinerThreadResult {
Secp256k1PrivateKey,
Option<OngoingBlockCommit>,
),
Microblock(Result<Option<StacksMicroblock>, NetError>, MinerTip),
Microblock(
Result<Option<(StacksMicroblock, ExecutionCost)>, NetError>,
MinerTip,
),
}
/// Fully-assembled Stacks anchored, block as well as some extra metadata pertaining to how it was
@@ -128,8 +131,6 @@ pub enum RelayerDirective {
RunTenure(RegisteredKey, BlockSnapshot, u128), // (vrf key, chain tip, time of issuance in ms)
/// Try to register a VRF public key
RegisterKey(BlockSnapshot),
/// Try to mine a microblock
RunMicroblockTenure(BlockSnapshot, u128), // time of issuance in ms
/// Stop the relayer thread
Exit,
}
@@ -164,14 +165,26 @@ pub struct MinerTip {
block_hash: BlockHeaderHash,
/// Microblock private key to use to sign microblocks
microblock_privkey: Secp256k1PrivateKey,
/// Stacks height
stacks_height: u64,
/// burnchain height
burn_height: u64,
}
impl MinerTip {
pub fn new(ch: ConsensusHash, bh: BlockHeaderHash, pk: Secp256k1PrivateKey) -> MinerTip {
pub fn new(
ch: ConsensusHash,
bh: BlockHeaderHash,
pk: Secp256k1PrivateKey,
stacks_height: u64,
burn_height: u64,
) -> MinerTip {
MinerTip {
consensus_hash: ch,
block_hash: bh,
microblock_privkey: pk,
stacks_height,
burn_height,
}
}
}
@@ -343,6 +356,8 @@ enum Error {
SnapshotNotFoundForChainTip,
/// The burnchain tip changed while this operation was in progress
BurnchainTipChanged,
/// The coordinator channel closed
CoordinatorClosed,
}
/// Metadata required for beginning a new tenure
@@ -404,13 +419,22 @@ pub struct RelayerThread {
last_tenure_issue_time: u128,
/// last observed burnchain block height from the p2p thread (obtained from network results)
last_network_block_height: u64,
/// time at which we observed a change in the network block height (epoch time in millis)
last_network_block_height_ts: u128,
/// last observed number of downloader state-machine passes from the p2p thread (obtained from
/// network results)
last_network_download_passes: u64,
/// last observed number of inventory state-machine passes from the p2p thread (obtained from
/// network results)
last_network_inv_passes: u64,
/// minimum number of downloader state-machine passes that must take place before mining (this
/// is used to ensure that the p2p thread attempts to download new Stacks block data before
/// this thread tries to mine a block)
min_network_download_passes: u64,
/// minimum number of inventory state-machine passes that must take place before mining (this
/// is used to ensure that the p2p thread attempts to download new Stacks block data before
/// this thread tries to mine a block)
min_network_inv_passes: u64,
/// consensus hash of the last sortition we saw, even if we weren't the winner
last_tenure_consensus_hash: Option<ConsensusHash>,
@@ -418,6 +442,10 @@ pub struct RelayerThread {
miner_tip: Option<MinerTip>,
/// last time we mined a microblock, in millis
last_microblock_tenure_time: u128,
/// when should we run the next microblock tenure, in millis
microblock_deadline: u128,
/// cost of the last-produced microblock stream
microblock_stream_cost: ExecutionCost,
/// Inner relayer instance for forwarding broadcasted data back to the p2p thread for dispatch
/// to neighbors
@@ -465,9 +493,9 @@ struct MicroblockMinerThread {
mempool: Option<MemPoolDB>,
/// Handle to the node's event dispatcher
event_dispatcher: EventDispatcher,
/// Stacks block's sortition's consensus hash
/// Parent Stacks block's sortition's consensus hash
parent_consensus_hash: ConsensusHash,
/// Stacks block's hash
/// Parent Stacks block's hash
parent_block_hash: BlockHeaderHash,
/// Microblock signing key
miner_key: Secp256k1PrivateKey,
@@ -550,6 +578,7 @@ impl MicroblockMinerThread {
consensus_hash: ch,
block_hash: bhh,
microblock_privkey: miner_key,
..
} = miner_tip;
debug!(
@@ -561,12 +590,18 @@ impl MicroblockMinerThread {
match StacksChainState::get_anchored_block_header_info(chainstate.db(), &ch, &bhh) {
Ok(Some(_)) => {
let parent_index_hash = StacksBlockHeader::make_index_block_hash(&ch, &bhh);
let cost_so_far = StacksChainState::get_stacks_block_anchored_cost(
chainstate.db(),
&parent_index_hash,
)
.expect("FATAL: failed to get anchored block cost")
.expect("FATAL: no anchored block cost stored for processed anchored block");
let cost_so_far = if relayer_thread.microblock_stream_cost == ExecutionCost::zero()
{
// unknown cost, or this is idempotent.
StacksChainState::get_stacks_block_anchored_cost(
chainstate.db(),
&parent_index_hash,
)
.expect("FATAL: failed to get anchored block cost")
.expect("FATAL: no anchored block cost stored for processed anchored block")
} else {
relayer_thread.microblock_stream_cost.clone()
};
let frequency = config.node.microblock_frequency;
let settings =
@@ -823,7 +858,7 @@ impl MicroblockMinerThread {
sortdb: &SortitionDB,
chainstate: &mut StacksChainState,
mem_pool: &mut MemPoolDB,
) -> Result<Option<StacksMicroblock>, NetError> {
) -> Result<Option<(StacksMicroblock, ExecutionCost)>, NetError> {
if !self.can_mine_on_tip(&self.parent_consensus_hash, &self.parent_block_hash) {
// not configured to mine on this tip
return Ok(None);
@@ -838,7 +873,7 @@ impl MicroblockMinerThread {
return Ok(None);
}
let mut next_microblock = None;
let mut next_microblock_and_runtime = None;
// opportunistically try and mine, but only if there are no attachable blocks in
// recent history (i.e. in the last 10 minutes)
@@ -851,7 +886,7 @@ impl MicroblockMinerThread {
match self.inner_mine_one_microblock(sortdb, chainstate, mem_pool) {
Ok(microblock) => {
// will need to relay this
next_microblock = Some(microblock);
next_microblock_and_runtime = Some((microblock, self.cost_so_far.clone()));
}
Err(ChainstateError::NoTransactionsToMine) => {
info!("Will keep polling mempool for transactions to include in a microblock");
@@ -866,7 +901,7 @@ impl MicroblockMinerThread {
self.last_mined = get_epoch_time_ms();
Ok(next_microblock)
Ok(next_microblock_and_runtime)
}
/// Try to mine one microblock, given the current chain tip and access to the chain state DBs.
@@ -880,7 +915,7 @@ impl MicroblockMinerThread {
pub fn try_mine_microblock(
&mut self,
cur_tip: MinerTip,
) -> Result<Option<StacksMicroblock>, NetError> {
) -> Result<Option<(StacksMicroblock, ExecutionCost)>, NetError> {
self.with_chainstate(|mblock_miner, sortdb, chainstate, mempool| {
mblock_miner.inner_try_mine_microblock(cur_tip, sortdb, chainstate, mempool)
})
@@ -1728,12 +1763,17 @@ impl RelayerThread {
last_tenure_issue_time: 0,
last_network_block_height: 0,
last_network_block_height_ts: 0,
last_network_download_passes: 0,
min_network_download_passes: 0,
last_network_inv_passes: 0,
min_network_inv_passes: 0,
last_tenure_consensus_hash: None,
miner_tip: None,
last_microblock_tenure_time: 0,
microblock_deadline: 0,
microblock_stream_cost: ExecutionCost::zero(),
relayer,
@@ -1782,6 +1822,34 @@ impl RelayerThread {
res
}
/// have we waited for the right conditions under which to start mining a block off of our
/// chain tip?
pub fn has_waited_for_latest_blocks(&self) -> bool {
// a network download pass took place
(self.min_network_download_passes <= self.last_network_download_passes
// a network inv pass took place
&& self.min_network_download_passes <= self.last_network_download_passes)
// we waited long enough for a download pass, but timed out waiting
|| self.last_network_block_height_ts + (self.config.node.wait_time_for_blocks as u128) < get_epoch_time_ms()
// we're not supposed to wait at all
|| !self.config.miner.wait_for_block_download
}
/// Return debug string for waiting for latest blocks
pub fn debug_waited_for_latest_blocks(&self) -> String {
format!(
"({} <= {} && {} <= {}) || {} + {} < {} || {}",
self.min_network_download_passes,
self.last_network_download_passes,
self.min_network_inv_passes,
self.last_network_inv_passes,
self.last_network_block_height_ts,
self.config.node.wait_time_for_blocks,
get_epoch_time_ms(),
self.config.miner.wait_for_block_download
)
}
/// Handle a NetworkResult from the p2p/http state machine. Usually this is the act of
/// * preprocessing and storing new blocks and microblocks
/// * relaying blocks, microblocks, and transacctions
@@ -1801,10 +1869,11 @@ impl RelayerThread {
}
if self.last_network_block_height != net_result.burn_height {
// burnchain advanced; disable mining until we also do a download pass
// burnchain advanced; disable mining until we also do a download pass.
self.last_network_block_height = net_result.burn_height;
self.min_network_download_passes = net_result.num_download_passes + 1;
self.min_network_inv_passes = net_result.num_inv_sync_passes + 1;
self.last_network_block_height_ts = get_epoch_time_ms();
debug!(
"Relayer: block mining until the next download pass {}",
self.min_network_download_passes
@@ -1862,9 +1931,8 @@ impl RelayerThread {
// resume mining if we blocked it, and if we've done the requisite download
// passes
self.last_network_download_passes = net_result.num_download_passes;
if self.min_network_download_passes <= self.last_network_download_passes
|| !self.config.miner.wait_for_block_download
{
self.last_network_inv_passes = net_result.num_inv_sync_passes;
if self.has_waited_for_latest_blocks() {
debug!("Relayer: did a download pass, so unblocking mining");
signal_mining_ready(self.globals.get_miner_status());
}
@@ -1872,15 +1940,15 @@ impl RelayerThread {
/// Process the block and microblocks from a sortition that we won.
/// At this point, we're modifying the chainstate, and merging the artifacts from the previous tenure.
/// Blocks until the given stacks block is processed
/// Blocks until the given stacks block is processed.
/// Returns true if we accepted this block as new.
/// Returns false if we already processed this block.
fn accept_winning_tenure(
&mut self,
anchored_block: &StacksBlock,
consensus_hash: &ConsensusHash,
parent_consensus_hash: &ConsensusHash,
) -> Result<bool, ChainstateError> {
let stacks_blocks_processed = self.globals.coord_comms.get_stacks_blocks_processed();
if StacksChainState::has_stored_block(
self.chainstate_ref().db(),
&self.chainstate_ref().blocks_path,
@@ -1888,7 +1956,7 @@ impl RelayerThread {
&anchored_block.block_hash(),
)? {
// already processed my tenure
return Ok(true);
return Ok(false);
}
let burn_height =
SortitionDB::get_block_snapshot_consensus(self.sortdb_ref().conn(), consensus_hash)
@@ -1975,20 +2043,54 @@ impl RelayerThread {
)
})?;
Ok(true)
}
/// Process a new block we mined
/// Return true if we processed it
/// Return false if we timed out waiting for it
/// Return Err(..) if we couldn't reach the chains coordiantor thread
fn process_new_block(&self) -> Result<bool, Error> {
// process the block
if !self.globals.coord_comms.announce_new_stacks_block() {
return Ok(false);
return Err(Error::CoordinatorClosed);
}
let stacks_blocks_processed = self.globals.coord_comms.get_stacks_blocks_processed();
if !self
.globals
.coord_comms
.wait_for_stacks_blocks_processed(stacks_blocks_processed, u64::MAX)
{
// basically unreachable
warn!("ChainsCoordinator timed out while waiting for new stacks block to be processed");
return Ok(false);
}
Ok(true)
}
/// Given the two miner tips, return the newer tip.
fn pick_higher_tip(cur: Option<MinerTip>, new: Option<MinerTip>) -> Option<MinerTip> {
match (cur, new) {
(Some(cur), None) => Some(cur),
(None, Some(new)) => Some(new),
(None, None) => None,
(Some(cur), Some(new)) => {
if cur.stacks_height < new.stacks_height {
Some(new)
} else if cur.stacks_height > new.stacks_height {
Some(cur)
} else if cur.burn_height < new.burn_height {
Some(new)
} else if cur.burn_height > new.burn_height {
Some(cur)
} else {
assert_eq!(cur, new);
Some(cur)
}
}
}
}
/// Given the pointer to a recently-discovered tenure, see if we won the sortition and if so,
/// store it, preprocess it, and forward it to our neighbors. All the while, keep track of the
/// latest Stacks mining tip we have produced so far.
@@ -2006,7 +2108,7 @@ impl RelayerThread {
block_header_hash: BlockHeaderHash,
burn_hash: BurnchainHeaderHash,
) -> (bool, Option<MinerTip>) {
let miner_tip;
let mut miner_tip = None;
let sn =
SortitionDB::get_block_snapshot_consensus(self.sortdb_ref().conn(), &consensus_hash)
.expect("FATAL: failed to query sortition DB")
@@ -2040,13 +2142,15 @@ impl RelayerThread {
);
increment_stx_blocks_mined_counter();
match self.accept_winning_tenure(&mined_block, &consensus_hash, &parent_consensus_hash)
{
Ok(coordinator_running) => {
if !coordinator_running {
warn!("Coordinator stopped, stopping relayer thread...");
return (false, None);
}
let has_new_data = match self.accept_winning_tenure(
&mined_block,
&consensus_hash,
&parent_consensus_hash,
) {
Ok(accepted) => accepted,
Err(ChainstateError::ChannelClosed(_)) => {
warn!("Coordinator stopped, stopping relayer thread...");
return (false, None);
}
Err(e) => {
warn!("Error processing my tenure, bad block produced: {}", e);
@@ -2089,10 +2193,11 @@ impl RelayerThread {
&consensus_hash,
&mined_block.block_hash()
);
miner_tip = None;
miner_tip = Self::pick_higher_tip(miner_tip, None);
} else {
let ch = snapshot.consensus_hash.clone();
let bh = mined_block.block_hash();
let height = mined_block.header.total_work.work;
if let Err(e) = self
.relayer
@@ -2102,14 +2207,31 @@ impl RelayerThread {
}
// proceed to mine microblocks
miner_tip = Some(MinerTip::new(ch, bh, microblock_privkey));
miner_tip = Self::pick_higher_tip(
miner_tip,
Some(MinerTip::new(
ch,
bh,
microblock_privkey,
height,
snapshot.block_height,
)),
);
}
if has_new_data {
// process the block, now that we've advertized it
if let Err(Error::CoordinatorClosed) = self.process_new_block() {
// coordiantor stopped
return (false, None);
}
}
} else {
debug!(
"Relayer: Did not win sortition in {}, winning block was {}/{}",
&burn_hash, &consensus_hash, &block_header_hash
);
miner_tip = None;
miner_tip = Self::pick_higher_tip(miner_tip, None);
}
(true, miner_tip)
@@ -2200,7 +2322,7 @@ impl RelayerThread {
// coordinator thread hang-up
return false;
}
miner_tip = new_miner_tip;
miner_tip = Self::pick_higher_tip(miner_tip, new_miner_tip);
// clear all blocks up to this consensus hash
let this_burn_tip = SortitionDB::get_block_snapshot_consensus(
@@ -2238,15 +2360,32 @@ impl RelayerThread {
// resume mining if we blocked it
if num_tenures > 0 {
self.mined_stacks_block = false;
signal_mining_ready(self.globals.get_miner_status());
}
// update state
self.miner_tip = miner_tip;
// update state for microblock mining
self.setup_microblock_mining_state(miner_tip);
true
}
/// Update the miner tip with a new tip. If it's changed, then clear out the microblock stream
/// cost since we won't be mining it anymore.
fn setup_microblock_mining_state(&mut self, new_miner_tip: Option<MinerTip>) {
// update state
let my_miner_tip = std::mem::replace(&mut self.miner_tip, None);
let best_tip = Self::pick_higher_tip(my_miner_tip.clone(), new_miner_tip.clone());
if best_tip == new_miner_tip && best_tip != my_miner_tip {
// tip has changed
debug!(
"Relayer: Best miner tip went from {:?} to {:?}",
&my_miner_tip, &new_miner_tip
);
self.microblock_stream_cost = ExecutionCost::zero();
}
self.miner_tip = best_tip;
}
/// Constructs and returns a LeaderKeyRegisterOp out of the provided params
fn inner_generate_leader_key_register_op(
address: StacksAddress,
@@ -2303,7 +2442,16 @@ impl RelayerThread {
ret
}
/// Create the block miner thread state
/// Create the block miner thread state.
/// Only proceeds if all of the following are true:
/// * the miner is not blocked
/// * last_burn_block corresponds to the canonical sortition DB's chain tip
/// * the time of issuance is sufficiently recent
/// * there are no unprocessed stacks blocks in the staging DB
/// * the relayer has already tried a download scan that included this sortition (which, if a
/// block was found, would have placed it into the staging DB and marked it as
/// unprocessed)
/// * a miner thread is not running already
fn create_block_miner(
&mut self,
registered_key: RegisteredKey,
@@ -2370,12 +2518,11 @@ impl RelayerThread {
}
}
if burn_chain_sn.block_height == self.last_network_block_height
&& (self.last_network_download_passes < self.min_network_download_passes
&& self.config.miner.wait_for_block_download)
if burn_chain_sn.block_height != self.last_network_block_height
|| !self.has_waited_for_latest_blocks()
{
debug!("Relayer: network has not had a chance to process in-flight blocks ({} == {} && {} < {})",
burn_chain_sn.block_height, self.last_network_block_height, self.last_network_download_passes, self.min_network_download_passes);
debug!("Relayer: network has not had a chance to process in-flight blocks ({} != {} || !({}))",
burn_chain_sn.block_height, self.last_network_block_height, self.debug_waited_for_latest_blocks());
return None;
}
@@ -2402,8 +2549,7 @@ impl RelayerThread {
debug!(
"Relayer: Spawn tenure thread";
"height" => last_burn_block.block_height,
"burn_header_hash" => %burn_chain_tip,
"last_burn_header_hash" => %burn_header_hash
"burn_header_hash" => %burn_header_hash,
);
let miner_thread_state =
@@ -2423,6 +2569,11 @@ impl RelayerThread {
return false;
}
if self.mined_stacks_block && self.config.node.mine_microblocks {
debug!("Relayer: mined a Stacks block already; waiting for microblock miner");
return false;
}
let mut miner_thread_state =
match self.create_block_miner(registered_key, last_burn_block, issue_timestamp_ms) {
Some(state) => state,
@@ -2445,35 +2596,52 @@ impl RelayerThread {
true
}
/// Start up a microblock miner thread if we can:
/// * no miner thread must be running already
/// * the miner must not be blocked
/// * we must have won the sortition on the stacks chain tip
/// Returns true if the thread was started; false if not.
pub fn microblock_miner_thread_try_start(
&mut self,
burnchain_tip: BlockSnapshot,
tenure_issue_ms: u128,
) -> bool {
/// See if we should run a microblock tenure now.
/// Return true if so; false if not
fn can_run_microblock_tenure(&mut self) -> bool {
if !self.config.node.mine_microblocks {
// node will not mine microblocks
// not enabled
test_debug!("Relayer: not configured to mine microblocks");
return false;
}
if self.last_microblock_tenure_time > tenure_issue_ms {
// stale request
if !self.miner_thread_try_join() {
// already running (for an anchored block or microblock)
test_debug!("Relayer: miner thread already running so cannot mine microblock");
return false;
}
if self.microblock_deadline > get_epoch_time_ms() {
debug!(
"Relayer: Too soon to start a microblock tenure ({} > {})",
self.microblock_deadline,
get_epoch_time_ms()
);
return false;
}
if self.miner_tip.is_none() {
debug!("Relayer: did not win last block, so cannot mine microblocks");
return false;
}
if !self.mined_stacks_block {
// have not tried to mine a stacks block yet that confirms previously-mined unconfirmed
// state (or have not tried to mine a new Stacks block yet for this active tenure);
debug!("Relayer: Did not mine a block yet, so will not mine a microblock");
return false;
}
if let Some(cur_sortition) = self.globals.get_last_sortition() {
if burnchain_tip.sortition_id != cur_sortition.sortition_id {
debug!("Relayer: Drop stale RunMicroblockTenure for {}/{}: current sortition is for {} ({})", &burnchain_tip.consensus_hash, &burnchain_tip.winning_stacks_block_hash, &cur_sortition.consensus_hash, &cur_sortition.burn_header_hash);
return false;
}
if self.globals.get_last_sortition().is_none() {
debug!("Relayer: no first sortition yet");
return false;
}
// go ahead
true
}
/// Start up a microblock miner thread if we can:
/// * no miner thread must be running already
/// * the miner must not be blocked
/// * we must have won the sortition on the stacks chain tip
/// Returns true if the thread was started; false if not.
pub fn microblock_miner_thread_try_start(&mut self) -> bool {
let miner_tip = match self.miner_tip.as_ref() {
Some(tip) => tip.clone(),
None => {
@@ -2481,8 +2649,23 @@ impl RelayerThread {
return false;
}
};
let burnchain_tip = match self.globals.get_last_sortition() {
Some(sn) => sn,
None => {
debug!("Relayer: no first sortition yet");
return false;
}
};
debug!(
"Relayer: mined Stacks block {}/{} so can mine microblocks",
&miner_tip.consensus_hash, &miner_tip.block_hash
);
if !self.miner_thread_try_join() {
// already running (for an anchored block or microblock)
debug!("Relayer: miner thread already running so cannot mine microblock");
return false;
}
if self
@@ -2528,7 +2711,10 @@ impl RelayerThread {
e
})
{
// thread started!
self.miner_thread = Some(miner_handle);
self.microblock_deadline =
get_epoch_time_ms() + (self.config.node.microblock_frequency as u128);
}
true
@@ -2596,7 +2782,7 @@ impl RelayerThread {
MinerThreadResult::Microblock(microblock_result, miner_tip) => {
// finished mining a microblock
match microblock_result {
Ok(Some(next_microblock)) => {
Ok(Some((next_microblock, new_cost))) => {
// apply it
let microblock_hash = next_microblock.block_hash();
@@ -2644,13 +2830,15 @@ impl RelayerThread {
);
}
self.last_microblock_tenure_time = get_epoch_time_ms();
self.microblock_stream_cost = new_cost;
// synchronise state
self.with_chainstate(
|relayer_thread, _sortdb, chainstate, _mempool| {
relayer_thread.globals.send_unconfirmed_txs(chainstate);
},
);
self.last_microblock_tenure_time = get_epoch_time_ms();
// have not yet mined a stacks block that confirms this microblock, so
// do that on the next run
@@ -2709,12 +2897,16 @@ impl RelayerThread {
);
true
}
RelayerDirective::RunMicroblockTenure(burnchain_tip, tenure_issue_ms) => {
self.microblock_miner_thread_try_start(burnchain_tip, tenure_issue_ms);
true
}
RelayerDirective::Exit => false,
};
if !continue_running {
return false;
}
// see if we need to run a microblock tenure
if self.can_run_microblock_tenure() {
self.microblock_miner_thread_try_start();
}
continue_running
}
}
@@ -2854,8 +3046,8 @@ pub struct PeerThread {
/// total number of download state-machine passes so far. Used to signal when to download the
/// next reward cycle of blocks.
num_download_passes: u64,
/// when should we queue up the next request to run a microblock tenure? (epoch time in millis)
mblock_deadline: u128,
/// last burnchain block seen in the PeerNetwork's chain view since the last run
last_burn_block_height: u64,
}
impl PeerThread {
@@ -2932,7 +3124,7 @@ impl PeerThread {
num_p2p_state_machine_passes: 0,
num_inv_sync_passes: 0,
num_download_passes: 0,
mblock_deadline: 0,
last_burn_block_height: 0,
}
}
@@ -3002,7 +3194,7 @@ impl PeerThread {
);
1
} else {
cmp::min(self.poll_timeout, self.config.node.microblock_frequency)
self.poll_timeout
};
let mut expected_attachments = match self.attachments_rx.try_recv() {
@@ -3057,6 +3249,7 @@ impl PeerThread {
match p2p_res {
Ok(network_result) => {
let mut have_update = false;
if self.num_p2p_state_machine_passes < network_result.num_state_machine_passes {
// p2p state-machine did a full pass. Notify anyone listening.
self.globals.sync_comms.notify_p2p_state_pass();
@@ -3067,31 +3260,30 @@ impl PeerThread {
// inv-sync state-machine did a full pass. Notify anyone listening.
self.globals.sync_comms.notify_inv_sync_pass();
self.num_inv_sync_passes = network_result.num_inv_sync_passes;
// the relayer cares about the number of inventory passes, so pass this along
have_update = true;
}
if self.num_download_passes < network_result.num_download_passes {
// download state-machine did a full pass. Notify anyone listening.
self.globals.sync_comms.notify_download_pass();
self.num_download_passes = network_result.num_download_passes;
// the relayer cares about the number of download passes, so pass this along
have_update = true;
}
if network_result.has_data_to_store() {
if network_result.has_data_to_store()
|| self.last_burn_block_height != network_result.burn_height
|| have_update
{
// pass along if we have blocks, microblocks, or transactions, or a status
// update on the network's view of the burnchain
self.last_burn_block_height = network_result.burn_height;
self.results_with_data
.push_back(RelayerDirective::HandleNetResult(network_result));
}
// only do this on the Ok() path, even if we're mining, because an error in
// network dispatching is likely due to resource exhaustion
if self.mblock_deadline < get_epoch_time_ms() && self.config.node.mine_microblocks {
debug!("P2P: schedule microblock tenure");
self.results_with_data
.push_back(RelayerDirective::RunMicroblockTenure(
self.get_network().burnchain_tip.clone(),
get_epoch_time_ms(),
));
self.mblock_deadline =
get_epoch_time_ms() + (self.config.node.microblock_frequency as u128);
}
}
Err(e) => {
// this is only reachable if the network is not instantiated correctly --
@@ -3105,14 +3297,13 @@ impl PeerThread {
// or a directive to mine microblocks
if let Err(e) = self.globals.relay_send.try_send(next_result) {
debug!(
"P2P: {:?}: download backpressure detected",
&self.get_network().local_peer
"P2P: {:?}: download backpressure detected (bufferred {})",
&self.get_network().local_peer,
self.results_with_data.len()
);
match e {
TrySendError::Full(directive) => {
if let RelayerDirective::RunMicroblockTenure(..) = directive {
// can drop this
} else if let RelayerDirective::RunTenure(..) = directive {
if let RelayerDirective::RunTenure(..) = directive {
// can drop this
} else {
// don't lose this data -- just try it again
@@ -3479,7 +3670,7 @@ impl StacksNode {
} else {
LeaderKeyRegistrationState::Inactive
};
let relayer_thread = RelayerThread::new(runloop, local_peer.clone(), relayer);
let relayer_thread_handle = thread::Builder::new()
.name(format!("relayer-{}", &local_peer.data_url))
@@ -3678,7 +3869,6 @@ impl StacksNode {
}
}
// no-op on UserBurnSupport ops are not supported / produced at this point.
self.globals.set_last_sortition(block_snapshot);
last_sortitioned_block.map(|x| x.0)
}