fix: combine coordinator block announcements into a bitvector so we can handle both announcements of new burn and stacks blocks in one coordinator pass (which is required for handling the case where we need to rewind sortitions, process stacks blocks, and then replay sortitions)

This commit is contained in:
Jude Nelson
2023-01-04 10:34:58 -05:00
parent 371b226cd3
commit aa66dedae8
3 changed files with 60 additions and 53 deletions

View File

@@ -89,34 +89,38 @@ pub struct CoordinatorReceivers {
/// for setting up the coordinator channels
pub struct CoordinatorCommunication;
#[repr(u8)]
pub enum CoordinatorEvents {
NEW_STACKS_BLOCK,
NEW_BURN_BLOCK,
STOP,
TIMEOUT,
NEW_STACKS_BLOCK = 0x01,
NEW_BURN_BLOCK = 0x02,
STOP = 0x04,
TIMEOUT = 0x08,
}
impl SignalBools {
fn activated_signal(&self) -> bool {
self.stop || self.new_stacks_block || self.new_burn_block
}
fn receive_signal(&mut self) -> CoordinatorEvents {
fn receive_signal(&mut self) -> u8 {
let mut bits = 0;
if self.stop {
return CoordinatorEvents::STOP;
} else if self.new_burn_block {
self.new_burn_block = false;
return CoordinatorEvents::NEW_BURN_BLOCK;
} else if self.new_stacks_block {
self.new_stacks_block = false;
return CoordinatorEvents::NEW_STACKS_BLOCK;
} else {
return CoordinatorEvents::TIMEOUT;
bits |= CoordinatorEvents::STOP as u8;
}
if self.new_burn_block {
bits |= CoordinatorEvents::NEW_BURN_BLOCK as u8;
}
if self.new_stacks_block {
bits |= CoordinatorEvents::NEW_STACKS_BLOCK as u8;
}
if bits == 0 {
bits = CoordinatorEvents::TIMEOUT as u8;
}
bits
}
}
impl CoordinatorReceivers {
pub fn wait_on(&self) -> CoordinatorEvents {
pub fn wait_on(&self) -> u8 {
let mut signal_bools = self.signal_bools.lock().unwrap();
if !signal_bools.activated_signal() {
signal_bools = self.signal_wakeup.wait(signal_bools).unwrap();
@@ -130,6 +134,7 @@ impl CoordinatorChannels {
let mut bools = self.signal_bools.lock().unwrap();
bools.new_stacks_block = true;
self.signal_wakeup.notify_all();
debug!("Announce new stacks block");
!bools.stop
}
@@ -137,6 +142,7 @@ impl CoordinatorChannels {
let mut bools = self.signal_bools.lock().unwrap();
bools.new_burn_block = true;
self.signal_wakeup.notify_all();
debug!("Announce new burn block");
!bools.stop
}
@@ -144,6 +150,7 @@ impl CoordinatorChannels {
let mut bools = self.signal_bools.lock().unwrap();
bools.stop = true;
self.signal_wakeup.notify_all();
debug!("Stop chains coordinator");
false
}

View File

@@ -360,50 +360,48 @@ impl<'a, T: BlockEventDispatcher, CE: CostEstimator + ?Sized, FE: FeeEstimator +
loop {
// timeout so that we handle Ctrl-C a little gracefully
match comms.wait_on() {
CoordinatorEvents::NEW_STACKS_BLOCK => {
signal_mining_blocked(miner_status.clone());
debug!("Received new stacks block notice");
match inst.handle_new_stacks_block() {
Ok(missing_block_opt) => {
if missing_block_opt.is_some() {
debug!(
"Missing affirmed anchor block: {:?}",
&missing_block_opt.as_ref().expect("unreachable")
);
}
}
Err(e) => {
warn!("Error processing new stacks block: {:?}", e);
let bits = comms.wait_on();
if (bits & (CoordinatorEvents::NEW_STACKS_BLOCK as u8)) != 0 {
signal_mining_blocked(miner_status.clone());
debug!("Received new stacks block notice");
match inst.handle_new_stacks_block() {
Ok(missing_block_opt) => {
if missing_block_opt.is_some() {
debug!(
"Missing affirmed anchor block: {:?}",
&missing_block_opt.as_ref().expect("unreachable")
);
}
}
Err(e) => {
warn!("Error processing new stacks block: {:?}", e);
}
}
signal_mining_ready(miner_status.clone());
}
CoordinatorEvents::NEW_BURN_BLOCK => {
signal_mining_blocked(miner_status.clone());
debug!("Received new burn block notice");
match inst.handle_new_burnchain_block() {
Ok(missing_block_opt) => {
if missing_block_opt.is_some() {
debug!(
"Missing canonical anchor block {}",
&missing_block_opt.clone().unwrap()
);
}
}
Err(e) => {
warn!("Error processing new burn block: {:?}", e);
signal_mining_ready(miner_status.clone());
}
if (bits & (CoordinatorEvents::NEW_BURN_BLOCK as u8)) != 0 {
signal_mining_blocked(miner_status.clone());
debug!("Received new burn block notice");
match inst.handle_new_burnchain_block() {
Ok(missing_block_opt) => {
if missing_block_opt.is_some() {
debug!(
"Missing canonical anchor block {}",
&missing_block_opt.clone().unwrap()
);
}
}
signal_mining_ready(miner_status.clone());
Err(e) => {
warn!("Error processing new burn block: {:?}", e);
}
}
CoordinatorEvents::STOP => {
signal_mining_blocked(miner_status.clone());
debug!("Received stop notice");
return;
}
CoordinatorEvents::TIMEOUT => {}
signal_mining_ready(miner_status.clone());
}
if (bits & (CoordinatorEvents::STOP as u8)) != 0 {
signal_mining_blocked(miner_status.clone());
debug!("Received stop notice");
return;
}
}
}

View File

@@ -808,6 +808,7 @@ impl RunLoop {
{
debug!("Drive burn block processing: possible PoX reorg (sortition tip: {}, heaviest: {}, {} <? {})", &sortition_tip_affirmation_map, &heaviest_affirmation_map, sn.block_height, highest_sn.block_height);
globals.coord().announce_new_burn_block();
globals.coord().announce_new_stacks_block();
} else if sortition_tip_affirmation_map.len() >= heaviest_affirmation_map.len()
&& sortition_tip_affirmation_map.len() <= canonical_affirmation_map.len()
{
@@ -818,6 +819,7 @@ impl RunLoop {
// we have unaffirmed PoX anchor blocks that are not yet processed in the sortition history
debug!("Drive burnchain processing: possible PoX reorg from unprocessed anchor block(s) (sortition tip: {}, heaviest: {}, canonical: {})", &sortition_tip_affirmation_map, &heaviest_affirmation_map, &canonical_affirmation_map);
globals.coord().announce_new_burn_block();
globals.coord().announce_new_stacks_block();
}
}
} else {