mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-28 19:55:20 +08:00
make coord channel singleton more conventional, factor it away from the coordinator impl
This commit is contained in:
@@ -619,7 +619,7 @@ impl Burnchain {
|
||||
|
||||
let _blockstack_txs = burnchain_db.store_new_burnchain_block(&block)?;
|
||||
|
||||
CoordinatorCommunication::announce_burn_block();
|
||||
CoordinatorCommunication::announce_new_burn_block();
|
||||
|
||||
let header = block.header();
|
||||
|
||||
|
||||
271
src/chainstate/coordinator/comm.rs
Normal file
271
src/chainstate/coordinator/comm.rs
Normal file
@@ -0,0 +1,271 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::{
|
||||
thread, process
|
||||
};
|
||||
use std::time::{
|
||||
Duration, Instant
|
||||
};
|
||||
use std::sync::{
|
||||
Arc, RwLock,
|
||||
atomic::{Ordering, AtomicU64, AtomicBool}
|
||||
};
|
||||
|
||||
use crossbeam_channel::{select, bounded, Sender, Receiver, Select, TrySendError};
|
||||
|
||||
use core;
|
||||
use burnchains::{
|
||||
BurnchainHeaderHash, Error as BurnchainError,
|
||||
Burnchain, BurnchainBlockHeader,
|
||||
db::{
|
||||
BurnchainDB, BurnchainBlockData
|
||||
}
|
||||
};
|
||||
use chainstate::burn::{BlockHeaderHash, BlockSnapshot};
|
||||
use chainstate::burn::db::sortdb::{SortitionDB, PoxId, SortitionId};
|
||||
use chainstate::stacks::{
|
||||
StacksBlock, StacksBlockId, TransactionPayload,
|
||||
Error as ChainstateError, events::StacksTransactionReceipt,
|
||||
};
|
||||
use chainstate::stacks::db::{
|
||||
StacksHeaderInfo, StacksChainState, ClarityTx
|
||||
};
|
||||
use monitoring::{
|
||||
increment_stx_blocks_processed_counter,
|
||||
};
|
||||
use vm::{
|
||||
costs::ExecutionCost,
|
||||
types::PrincipalData
|
||||
};
|
||||
use util::db::{
|
||||
Error as DBError
|
||||
};
|
||||
|
||||
/// Trait for use by the ChainsCoordinator
|
||||
///
|
||||
pub trait CoordinatorNotices {
|
||||
fn notify_stacks_block_processed(&mut self);
|
||||
fn notify_sortition_processed(&mut self);
|
||||
}
|
||||
|
||||
pub struct ArcCounterCoordinatorNotices {
|
||||
pub stacks_blocks_processed: Arc<AtomicU64>,
|
||||
pub sortitions_processed: Arc<AtomicU64>
|
||||
}
|
||||
|
||||
impl CoordinatorNotices for () {
|
||||
fn notify_stacks_block_processed(&mut self) {}
|
||||
fn notify_sortition_processed(&mut self) {}
|
||||
}
|
||||
|
||||
impl CoordinatorNotices for ArcCounterCoordinatorNotices {
|
||||
fn notify_stacks_block_processed(&mut self) {
|
||||
self.stacks_blocks_processed.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
fn notify_sortition_processed(&mut self) {
|
||||
self.sortitions_processed.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// Structure used for communication _with_ a running
|
||||
/// ChainsCoordinator
|
||||
#[derive(Clone)]
|
||||
pub struct CoordinatorChannels {
|
||||
// ChainsCoordinator takes two kinds of signals:
|
||||
// new stacks block & new burn block
|
||||
// These signals can be coalesced -- the coordinator doesn't need
|
||||
// handles _all_ new blocks whenever it processes an event
|
||||
// because of this, we can avoid trying to set large bounds on these
|
||||
// event channels by using a coalescing thread.
|
||||
new_stacks_block_channel: Sender<()>,
|
||||
new_burn_block_channel: Sender<()>,
|
||||
/// how many stacks blocks have been processed by this Coordinator thread since startup?
|
||||
stacks_blocks_processed: Arc<AtomicU64>,
|
||||
/// how many sortitions have been processed by this Coordinator thread since startup?
|
||||
sortitions_processed: Arc<AtomicU64>,
|
||||
stop: Sender<()>
|
||||
}
|
||||
|
||||
/// Structure used by the Coordinator's run-loop
|
||||
/// to receive signals
|
||||
pub struct CoordinatorReceivers {
|
||||
pub event_stacks_block: Receiver<()>,
|
||||
pub event_burn_block: Receiver<()>,
|
||||
pub stop: Receiver<()>,
|
||||
pub stacks_blocks_processed: Arc<AtomicU64>,
|
||||
pub sortitions_processed: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
// Singletons for ChainsCoordinator communication
|
||||
//
|
||||
// these channels allow any thread to notify the ChainsCoordinator
|
||||
// instance that a new staging block is ready or a new bitcoin
|
||||
// block has arrived
|
||||
//
|
||||
// using a singleton for this pretty dramatically simplifies state
|
||||
// management in the stacks-node, bitcoin indexer, and relayer, because they
|
||||
// don't need to pass around instances of the channels. however,
|
||||
// this _does_ step on the cargo test framework in silly ways, so any
|
||||
// tests which instantiate a coordinator need to call
|
||||
// CoordinatorCommunication::stop_chains_coordinator()
|
||||
// when they are done.
|
||||
lazy_static! {
|
||||
static ref COORDINATOR_CHANNELS: RwLock<Option<CoordinatorChannels>> = RwLock::new(None);
|
||||
}
|
||||
|
||||
/// Static struct used to hold all the static methods
|
||||
/// for communication with the singleton
|
||||
pub struct CoordinatorCommunication;
|
||||
|
||||
impl CoordinatorChannels {
|
||||
fn handle_result(r: Result<(), TrySendError<()>>) {
|
||||
match r {
|
||||
// don't need to do anything if the channel is full -- the coordinator
|
||||
// will check for the new block when it processes the next block anyways
|
||||
Ok(_) | Err(TrySendError::Full(_)) => {},
|
||||
Err(TrySendError::Disconnected(_)) => {
|
||||
warn!("ChainsCoordinator hung up, exiting...");
|
||||
process::exit(-1);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn announce_new_stacks_block(&self) {
|
||||
CoordinatorChannels::handle_result(
|
||||
self.new_stacks_block_channel.try_send(()));
|
||||
}
|
||||
|
||||
pub fn announce_new_burn_block(&self) {
|
||||
CoordinatorChannels::handle_result(
|
||||
self.new_burn_block_channel.try_send(()));
|
||||
}
|
||||
|
||||
pub fn stop_chains_coordinator(&self) {
|
||||
CoordinatorChannels::handle_result(
|
||||
self.stop.try_send(()));
|
||||
}
|
||||
|
||||
pub fn get_stacks_blocks_processed(&self) -> u64 {
|
||||
self.stacks_blocks_processed.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn get_sortitions_processed(&self) -> u64 {
|
||||
self.sortitions_processed.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn wait_for_sortitions_processed(&self, current: u64, timeout_millis: u64) -> bool {
|
||||
let start = Instant::now();
|
||||
while self.get_sortitions_processed() <= current {
|
||||
if start.elapsed() > Duration::from_millis(timeout_millis) {
|
||||
return false;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
std::sync::atomic::spin_loop_hint();
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
pub fn wait_for_stacks_blocks_processed(&self, current: u64, timeout_millis: u64) -> bool {
|
||||
let start = Instant::now();
|
||||
while self.get_stacks_blocks_processed() <= current {
|
||||
if start.elapsed() > Duration::from_millis(timeout_millis) {
|
||||
return false;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
std::sync::atomic::spin_loop_hint();
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl CoordinatorCommunication {
|
||||
pub fn cleanup_singleton() {
|
||||
info!("Dropping coordinator channel instance");
|
||||
COORDINATOR_CHANNELS.write().unwrap().take()
|
||||
.expect("FAIL: ChainsCoordinator cleaning up channels, but send channels non-existant");
|
||||
}
|
||||
|
||||
pub fn shared() -> CoordinatorChannels {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().cloned()
|
||||
.expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
}
|
||||
|
||||
pub fn announce_new_stacks_block() {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.announce_new_stacks_block()
|
||||
}
|
||||
|
||||
pub fn announce_new_burn_block() {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.announce_new_burn_block()
|
||||
}
|
||||
|
||||
pub fn stop_chains_coordinator() {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.stop_chains_coordinator()
|
||||
}
|
||||
|
||||
pub fn get_stacks_blocks_processed() -> u64 {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.get_stacks_blocks_processed()
|
||||
}
|
||||
|
||||
pub fn get_sortitions_processed() -> u64 {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.get_sortitions_processed()
|
||||
}
|
||||
|
||||
/// wait for `current` to be surpassed, or timeout
|
||||
/// returns `false` if timeout is reached
|
||||
/// returns `true` if sortitions processed is passed
|
||||
pub fn wait_for_sortitions_processed(current: u64, timeout_millis: u64) -> bool {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.wait_for_sortitions_processed(current, timeout_millis)
|
||||
}
|
||||
|
||||
/// wait for `current` to be surpassed, or timeout
|
||||
/// returns `false` if timeout is reached
|
||||
/// returns `true` if sortitions processed is passed
|
||||
pub fn wait_for_stacks_blocks_processed(current: u64, timeout_millis: u64) -> bool {
|
||||
COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref().expect("FAIL: attempted to obtain chains coordinator channels, but instance not constructed.")
|
||||
.wait_for_stacks_blocks_processed(current, timeout_millis)
|
||||
}
|
||||
|
||||
|
||||
pub fn instantiate_singleton() -> CoordinatorReceivers {
|
||||
let mut channel_storage = COORDINATOR_CHANNELS.write().unwrap();
|
||||
if channel_storage.is_some() {
|
||||
panic!("FAIL: attempted to start chains coordinator, but instance already constructed.");
|
||||
}
|
||||
|
||||
let (stacks_block_sender, stacks_block_receiver) = bounded(1);
|
||||
let (burn_block_sender, burn_block_receiver) = bounded(1);
|
||||
let (stop_sender, stop_receiver) = bounded(1);
|
||||
let stacks_blocks_processed = Arc::new(AtomicU64::new(0));
|
||||
let sortitions_processed = Arc::new(AtomicU64::new(0));
|
||||
|
||||
channel_storage.replace(CoordinatorChannels {
|
||||
new_stacks_block_channel: stacks_block_sender,
|
||||
new_burn_block_channel: burn_block_sender,
|
||||
stacks_blocks_processed: stacks_blocks_processed.clone(),
|
||||
sortitions_processed: sortitions_processed.clone(),
|
||||
stop: stop_sender,
|
||||
});
|
||||
|
||||
CoordinatorReceivers {
|
||||
event_stacks_block: stacks_block_receiver,
|
||||
event_burn_block: burn_block_receiver,
|
||||
stop: stop_receiver,
|
||||
stacks_blocks_processed,
|
||||
sortitions_processed
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
#[warn(unused_imports)]
|
||||
use std::collections::VecDeque;
|
||||
use std::{
|
||||
thread, process
|
||||
@@ -40,9 +41,17 @@ use util::db::{
|
||||
Error as DBError
|
||||
};
|
||||
|
||||
mod comm;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub use self::comm::CoordinatorCommunication;
|
||||
|
||||
use chainstate::coordinator::comm::{
|
||||
CoordinatorNotices, CoordinatorReceivers, ArcCounterCoordinatorNotices
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct RewardCycleInfo {
|
||||
/// what was the elected PoX anchor, if any?
|
||||
@@ -56,7 +65,7 @@ pub trait BlockEventDispatcher {
|
||||
receipts: Vec<StacksTransactionReceipt>, parent: &StacksBlockId);
|
||||
}
|
||||
|
||||
pub struct ChainsCoordinator<'a, T: BlockEventDispatcher> {
|
||||
pub struct ChainsCoordinator <'a, T: BlockEventDispatcher, N: CoordinatorNotices> {
|
||||
canonical_sortition_tip: Option<SortitionId>,
|
||||
canonical_chain_tip: Option<StacksBlockId>,
|
||||
canonical_pox_id: Option<PoxId>,
|
||||
@@ -64,7 +73,8 @@ pub struct ChainsCoordinator<'a, T: BlockEventDispatcher> {
|
||||
chain_state_db: StacksChainState,
|
||||
sortition_db: SortitionDB,
|
||||
burnchain: Burnchain,
|
||||
dispatcher: Option<&'a T>
|
||||
dispatcher: Option<&'a T>,
|
||||
notifier: N,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -96,155 +106,90 @@ impl From<DBError> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
struct CoordinatorChannels {
|
||||
new_stacks_block_channel: Sender<()>,
|
||||
new_burn_block_channel: Sender<()>,
|
||||
}
|
||||
impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T, ArcCounterCoordinatorNotices> {
|
||||
pub fn run<F>(state_path: &str, burnchain: &str, stacks_mainnet: bool, stacks_chain_id: u32,
|
||||
initial_balances: Option<Vec<(PrincipalData, u64)>>,
|
||||
block_limit: ExecutionCost, dispatcher: &T, comms: CoordinatorReceivers,
|
||||
boot_block_exec: F)
|
||||
where F: FnOnce(&mut ClarityTx), T: BlockEventDispatcher {
|
||||
|
||||
struct CoordinatorReceivers {
|
||||
event_stacks_block: Receiver<()>,
|
||||
event_burn_block: Receiver<()>,
|
||||
}
|
||||
let CoordinatorReceivers {
|
||||
event_stacks_block: stacks_block_channel,
|
||||
event_burn_block: burn_block_channel,
|
||||
stop: stop_channel,
|
||||
stacks_blocks_processed, sortitions_processed } = comms;
|
||||
|
||||
// Singletons for ChainsCoordinator communication
|
||||
//
|
||||
// these channels allow any thread to notify the ChainsCoordinator
|
||||
// instance that a new staging block is ready or a new bitcoin
|
||||
// block has arrived
|
||||
//
|
||||
// using a singleton for this pretty dramatically simplifies state
|
||||
// management in the stacks-node, bitcoin indexer, and relayer, because they
|
||||
// don't need to pass around instances of the channels. however,
|
||||
// this _does_ step on the cargo test framework in silly ways, so any
|
||||
// tests which instantiate a coordinator need to call
|
||||
// CoordinatorCommunication::stop_chains_coordinator()
|
||||
// when they are done.
|
||||
lazy_static! {
|
||||
// ChainsCoordinator takes two kinds of signals:
|
||||
// new stacks block & new burn block
|
||||
// These signals can be coalesced -- the coordinator doesn't need
|
||||
// handles _all_ new blocks whenever it processes an event
|
||||
// because of this, we can avoid trying to set large bounds on these
|
||||
// event channels by using a coalescing thread.
|
||||
static ref COORDINATOR_CHANNELS: RwLock<Option<CoordinatorChannels>> = RwLock::new(None);
|
||||
// how many stacks blocks have been processed by this Coordinator thread since startup?
|
||||
static ref STACKS_BLOCKS_PROCESSED: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
|
||||
// how many sortitions have been processed by this Coordinator thread since startup?
|
||||
static ref SORTITIONS_PROCESSED: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
|
||||
// receive channels for the coordinator
|
||||
static ref COORDINATOR_RECEIVERS: RwLock<Option<CoordinatorReceivers>> = RwLock::new(None);
|
||||
static ref STOP: RwLock<AtomicBool> = RwLock::new(AtomicBool::new(false));
|
||||
}
|
||||
let mut event_receiver = Select::new();
|
||||
let event_stacks_block = event_receiver.recv(&stacks_block_channel);
|
||||
let event_burn_block = event_receiver.recv(&burn_block_channel);
|
||||
let event_stop = event_receiver.recv(&stop_channel);
|
||||
|
||||
pub struct CoordinatorCommunication;
|
||||
let burnchain = Burnchain::new(&format!("{}/burnchain/db/", state_path), "bitcoin", burnchain).unwrap();
|
||||
|
||||
impl CoordinatorCommunication {
|
||||
pub fn instantiate() {
|
||||
let (event_stacks_block, event_burn_block) = {
|
||||
let mut channel_storage = COORDINATOR_CHANNELS.write().unwrap();
|
||||
if channel_storage.is_some() {
|
||||
panic!("FAIL: attempted to start chains coordinator, but instance already constructed.");
|
||||
}
|
||||
let sortition_db = SortitionDB::open(&burnchain.get_db_path(), true).unwrap();
|
||||
let burnchain_blocks_db = BurnchainDB::open(&burnchain.get_burnchaindb_path(), false).unwrap();
|
||||
let chain_state_db = StacksChainState::open_and_exec(
|
||||
stacks_mainnet, stacks_chain_id, &format!("{}/chainstate/", state_path),
|
||||
initial_balances,
|
||||
boot_block_exec,
|
||||
block_limit)
|
||||
.unwrap();
|
||||
|
||||
let (stacks_block_sender, stacks_block_receiver) = bounded(1);
|
||||
let (burn_block_sender, burn_block_receiver) = bounded(1);
|
||||
let canonical_sortition_tip = SortitionDB::get_canonical_sortition_tip(sortition_db.conn()).unwrap();
|
||||
|
||||
channel_storage.replace(CoordinatorChannels {
|
||||
new_stacks_block_channel: stacks_block_sender,
|
||||
new_burn_block_channel: burn_block_sender
|
||||
});
|
||||
let arc_notices = ArcCounterCoordinatorNotices { stacks_blocks_processed, sortitions_processed };
|
||||
|
||||
(stacks_block_receiver, burn_block_receiver)
|
||||
let mut inst = ChainsCoordinator {
|
||||
canonical_chain_tip: None,
|
||||
canonical_sortition_tip: Some(canonical_sortition_tip),
|
||||
canonical_pox_id: None,
|
||||
burnchain_blocks_db,
|
||||
chain_state_db,
|
||||
sortition_db,
|
||||
burnchain,
|
||||
dispatcher: Some(dispatcher),
|
||||
notifier: arc_notices
|
||||
};
|
||||
|
||||
STOP.write().unwrap().store(false, Ordering::SeqCst);
|
||||
loop {
|
||||
// timeout so that we handle Ctrl-C a little gracefully
|
||||
let ready_oper = match event_receiver.select_timeout(Duration::from_millis(500)) {
|
||||
Ok(op) => op,
|
||||
Err(_) => continue
|
||||
};
|
||||
|
||||
let mut receiver_storage = COORDINATOR_RECEIVERS.write().unwrap();
|
||||
receiver_storage.replace(CoordinatorReceivers {
|
||||
event_burn_block, event_stacks_block
|
||||
});
|
||||
}
|
||||
|
||||
pub fn announce_new_stacks_block() {
|
||||
let result = COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref()
|
||||
.expect("FAIL: attempted to announce new stacks block to chains coordinator, but instance not constructed.")
|
||||
.new_stacks_block_channel
|
||||
.try_send(());
|
||||
match result {
|
||||
// don't need to do anything if the channel is full -- the coordinator
|
||||
// will check for the new block when it processes the next block anyways
|
||||
Ok(_) | Err(TrySendError::Full(_)) => {},
|
||||
Err(TrySendError::Disconnected(_)) => {
|
||||
warn!("ChainsCoordinator hung up, exiting...");
|
||||
process::exit(-1);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn announce_burn_block() {
|
||||
let result = COORDINATOR_CHANNELS.read().unwrap()
|
||||
.as_ref()
|
||||
.expect("FAIL: attempted to announce new stacks block to chains coordinator, but instance not constructed.")
|
||||
.new_burn_block_channel
|
||||
.try_send(());
|
||||
match result {
|
||||
// don't need to do anything if the channel is full -- the coordinator
|
||||
// will check for the new block when it processes the next block anyways
|
||||
Ok(_) | Err(TrySendError::Full(_)) => {},
|
||||
Err(TrySendError::Disconnected(_)) => {
|
||||
warn!("ChainsCoordinator hung up, exiting...");
|
||||
process::exit(-1);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_stacks_blocks_processed() -> u64 {
|
||||
STACKS_BLOCKS_PROCESSED.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn get_sortitions_processed() -> u64 {
|
||||
SORTITIONS_PROCESSED.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn stop_chains_coordinator() {
|
||||
STOP.write().unwrap().store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// wait for `current` to be surpassed, or timeout
|
||||
/// returns `false` if timeout is reached
|
||||
/// returns `true` if sortitions processed is passed
|
||||
pub fn wait_for_sortitions_processed(current: u64, timeout_millis: u64) -> bool {
|
||||
let start = Instant::now();
|
||||
while SORTITIONS_PROCESSED.load(Ordering::SeqCst) <= current {
|
||||
if start.elapsed() > Duration::from_millis(timeout_millis) {
|
||||
return false;
|
||||
match ready_oper.index() {
|
||||
i if i == event_stacks_block => {
|
||||
debug!("Received new stacks block notice");
|
||||
// pop operation off of receiver
|
||||
ready_oper.recv(&stacks_block_channel).unwrap();
|
||||
if let Err(e) = inst.process_ready_blocks() {
|
||||
warn!("Error processing new stacks block: {:?}", e);
|
||||
}
|
||||
},
|
||||
i if i == event_burn_block => {
|
||||
// pop operation off of receiver
|
||||
debug!("Received new burn block notice");
|
||||
ready_oper.recv(&burn_block_channel).unwrap();
|
||||
if let Err(e) = inst.handle_new_burnchain_block() {
|
||||
warn!("Error processing new burn block: {:?}", e);
|
||||
}
|
||||
},
|
||||
i if i == event_stop => {
|
||||
CoordinatorCommunication::cleanup_singleton();
|
||||
return
|
||||
},
|
||||
_ => {
|
||||
unreachable!("Ready channel for non-registered channel");
|
||||
},
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
std::sync::atomic::spin_loop_hint();
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
/// wait for `current` to be surpassed, or timeout
|
||||
/// returns `false` if timeout is reached
|
||||
/// returns `true` if sortitions processed is passed
|
||||
pub fn wait_for_stacks_blocks_processed(current: u64, timeout_millis: u64) -> bool {
|
||||
let start = Instant::now();
|
||||
while STACKS_BLOCKS_PROCESSED.load(Ordering::SeqCst) <= current {
|
||||
if start.elapsed() > Duration::from_millis(timeout_millis) {
|
||||
return false;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
std::sync::atomic::spin_loop_hint();
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
|
||||
impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T, ()> {
|
||||
#[cfg(test)]
|
||||
pub fn test_new(burnchain: &Burnchain, path: &str) -> ChainsCoordinator<'a, T> {
|
||||
pub fn test_new(burnchain: &Burnchain, path: &str) -> ChainsCoordinator<'a, T, ()> {
|
||||
let burnchain = burnchain.clone();
|
||||
|
||||
let sortition_db = SortitionDB::open(&burnchain.get_db_path(), true).unwrap();
|
||||
@@ -261,86 +206,13 @@ impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
|
||||
chain_state_db,
|
||||
sortition_db,
|
||||
burnchain,
|
||||
dispatcher: None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run<F>(state_path: &str, burnchain: &str, stacks_mainnet: bool, stacks_chain_id: u32,
|
||||
initial_balances: Option<Vec<(PrincipalData, u64)>>,
|
||||
block_limit: ExecutionCost, dispatcher: &T,
|
||||
boot_block_exec: F)
|
||||
where F: FnOnce(&mut ClarityTx), T: BlockEventDispatcher {
|
||||
let receivers = COORDINATOR_RECEIVERS.write().unwrap().take()
|
||||
.expect("FAIL: run() called before receiver channels set up, or ChainsCoordinator already running");
|
||||
|
||||
let mut event_receiver = Select::new();
|
||||
let event_stacks_block = event_receiver.recv(&receivers.event_stacks_block);
|
||||
let event_burn_block = event_receiver.recv(&receivers.event_burn_block);
|
||||
|
||||
let burnchain = Burnchain::new(&format!("{}/burnchain/db/", state_path), "bitcoin", burnchain).unwrap();
|
||||
|
||||
let sortition_db = SortitionDB::open(&burnchain.get_db_path(), true).unwrap();
|
||||
let burnchain_blocks_db = BurnchainDB::open(&burnchain.get_burnchaindb_path(), false).unwrap();
|
||||
let chain_state_db = StacksChainState::open_and_exec(
|
||||
stacks_mainnet, stacks_chain_id, &format!("{}/chainstate/", state_path),
|
||||
initial_balances,
|
||||
boot_block_exec,
|
||||
block_limit)
|
||||
.unwrap();
|
||||
|
||||
let canonical_sortition_tip = SortitionDB::get_canonical_sortition_tip(sortition_db.conn()).unwrap();
|
||||
|
||||
let mut inst = ChainsCoordinator {
|
||||
canonical_chain_tip: None,
|
||||
canonical_sortition_tip: Some(canonical_sortition_tip),
|
||||
canonical_pox_id: None,
|
||||
burnchain_blocks_db,
|
||||
chain_state_db,
|
||||
sortition_db,
|
||||
burnchain,
|
||||
dispatcher: Some(dispatcher)
|
||||
};
|
||||
|
||||
loop {
|
||||
// timeout so that we handle Ctrl-C a little gracefully
|
||||
let ready_oper = match event_receiver.select_timeout(Duration::from_millis(500)) {
|
||||
Ok(op) => op,
|
||||
Err(_) => if STOP.read().unwrap().load(Ordering::SeqCst) {
|
||||
info!("Dropping coordinator channel instance");
|
||||
COORDINATOR_CHANNELS.write().unwrap().take()
|
||||
.expect("FAIL: ChainsCoordinator cleaning up channels, but send channels non-existant");
|
||||
STACKS_BLOCKS_PROCESSED.store(0, Ordering::SeqCst);
|
||||
SORTITIONS_PROCESSED.store(0, Ordering::SeqCst);
|
||||
return
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
match ready_oper.index() {
|
||||
i if i == event_stacks_block => {
|
||||
debug!("Received new stacks block notice");
|
||||
// pop operation off of receiver
|
||||
ready_oper.recv(&receivers.event_stacks_block).unwrap();
|
||||
if let Err(e) = inst.process_ready_blocks() {
|
||||
warn!("Error processing new stacks block: {:?}", e);
|
||||
}
|
||||
},
|
||||
i if i == event_burn_block => {
|
||||
// pop operation off of receiver
|
||||
debug!("Received new burn block notice");
|
||||
ready_oper.recv(&receivers.event_burn_block).unwrap();
|
||||
if let Err(e) = inst.handle_new_burnchain_block() {
|
||||
warn!("Error processing new burn block: {:?}", e);
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
unreachable!("Ready channel for non-registered channel");
|
||||
},
|
||||
}
|
||||
dispatcher: None,
|
||||
notifier: ()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl <'a, T: BlockEventDispatcher, N: CoordinatorNotices> ChainsCoordinator <'a, T, N> {
|
||||
pub fn handle_new_burnchain_block(&mut self) -> Result<(), Error> {
|
||||
// Retrieve canonical burnchain chain tip from the BurnchainBlocksDB
|
||||
let canonical_burnchain_tip = self.burnchain_blocks_db.get_canonical_chain_tip()?;
|
||||
@@ -384,7 +256,7 @@ impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
|
||||
})?
|
||||
.0.sortition_id;
|
||||
|
||||
SORTITIONS_PROCESSED.fetch_add(1, Ordering::SeqCst);
|
||||
self.notifier.notify_sortition_processed();
|
||||
|
||||
debug!("Sortition processed: {}", &sortition_id);
|
||||
|
||||
@@ -446,7 +318,7 @@ impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
|
||||
.get_canonical_stacks_block_id();
|
||||
self.canonical_chain_tip = Some(new_canonical_stacks_block);
|
||||
debug!("Bump blocks processed");
|
||||
STACKS_BLOCKS_PROCESSED.fetch_add(1, Ordering::SeqCst);
|
||||
self.notifier.notify_stacks_block_processed();
|
||||
increment_stx_blocks_processed_counter();
|
||||
let block_hash = block_receipt.header.anchored_header.block_hash();
|
||||
|
||||
|
||||
@@ -142,7 +142,7 @@ impl BlockEventDispatcher for NullEventDispatcher {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_coordinator<'a>(path: &str) -> ChainsCoordinator<'a, NullEventDispatcher> {
|
||||
pub fn make_coordinator<'a>(path: &str) -> ChainsCoordinator<'a, NullEventDispatcher, ()> {
|
||||
ChainsCoordinator::test_new(&get_burnchain(path), path)
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ impl RunLoop {
|
||||
/// the nodes, taking turns on tenures.
|
||||
pub fn start(&mut self, _expected_num_rounds: u64) {
|
||||
|
||||
CoordinatorCommunication::instantiate();
|
||||
let coordinator_receivers = CoordinatorCommunication::instantiate_singleton();
|
||||
|
||||
// Initialize and start the burnchain.
|
||||
let mut burnchain = BitcoinRegtestController::new(self.config.clone());
|
||||
@@ -120,7 +120,8 @@ impl RunLoop {
|
||||
thread::spawn(move || {
|
||||
ChainsCoordinator::run(&workdir, "regtest", mainnet, chainid,
|
||||
Some(initial_balances),
|
||||
block_limit, &coordinator_dispatcher, |_| {});
|
||||
block_limit, &coordinator_dispatcher,
|
||||
coordinator_receivers, |_| {});
|
||||
});
|
||||
|
||||
let mut burnchain_tip = burnchain.resync(None);
|
||||
|
||||
Reference in New Issue
Block a user