allow tests to run in same cargo test process

This commit is contained in:
Aaron Blankstein
2020-08-03 14:50:34 -05:00
parent 4b71cb5d0d
commit cb9679989d
5 changed files with 49 additions and 12 deletions

View File

@@ -442,9 +442,7 @@ impl Burnchain {
let db_path = self.get_db_path();
let burnchain_db_path = self.get_burnchaindb_path();
info!("Paths: {} {}", db_path, burnchain_db_path);
let sortitiondb = SortitionDB::connect(&db_path, first_block_height, &first_block_header_hash, first_block_header_timestamp, readwrite)?;
let burnchaindb = BurnchainDB::connect(&burnchain_db_path, first_block_height, &first_block_header_hash, first_block_header_timestamp, readwrite)?;
Ok((sortitiondb, burnchaindb))

View File

@@ -7,7 +7,7 @@ use std::time::{
};
use std::sync::{
Arc, RwLock,
atomic::{Ordering, AtomicU64}
atomic::{Ordering, AtomicU64, AtomicBool}
};
use crossbeam_channel::{select, bounded, Sender, Receiver, Select, TrySendError};
@@ -103,8 +103,19 @@ struct CoordinatorReceivers {
event_burn_block: Receiver<()>,
}
// Singleton for ChainsCoordinator
// Singletons for ChainsCoordinator communication
//
// these channels allow any thread to notify the ChainsCoordinator
// instance that a new staging block is ready or a new bitcoin
// block has arrived
//
// using a singleton for this pretty dramatically simplifies state
// management in the stacks-node, bitcoin indexer, and relayer, because they
// don't need to pass around instances of the channels. however,
// this _does_ step on the cargo test framework in silly ways, so any
// tests which instantiate a coordinator need to call
// CoordinatorCommunication::stop_chains_coordinator()
// when they are done.
lazy_static! {
// ChainsCoordinator takes two kinds of signals:
// new stacks block & new burn block
@@ -117,8 +128,9 @@ lazy_static! {
static ref STACKS_BLOCKS_PROCESSED: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
// how many sortitions have been processed by this Coordinator thread since startup?
static ref SORTITIONS_PROCESSED: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
//
// receive channels for the coordinator
static ref COORDINATOR_RECEIVERS: RwLock<Option<CoordinatorReceivers>> = RwLock::new(None);
static ref STOP: RwLock<AtomicBool> = RwLock::new(AtomicBool::new(false));
}
pub struct CoordinatorCommunication;
@@ -142,6 +154,8 @@ impl CoordinatorCommunication {
(stacks_block_receiver, burn_block_receiver)
};
STOP.write().unwrap().store(false, Ordering::SeqCst);
let mut receiver_storage = COORDINATOR_RECEIVERS.write().unwrap();
receiver_storage.replace(CoordinatorReceivers {
event_burn_block, event_stacks_block
@@ -190,6 +204,10 @@ impl CoordinatorCommunication {
SORTITIONS_PROCESSED.load(Ordering::SeqCst)
}
pub fn stop_chains_coordinator() {
STOP.write().unwrap().store(true, Ordering::SeqCst);
}
/// wait for `current` to be surpassed, or timeout
/// returns `false` if timeout is reached
/// returns `true` if sortitions processed is passed
@@ -221,6 +239,17 @@ impl CoordinatorCommunication {
}
}
// Destroy the singleton communication channels
impl <'a, T: BlockEventDispatcher> Drop for ChainsCoordinator<'a, T> {
fn drop(&mut self) {
info!("Dropping chain coordinator instance");
COORDINATOR_CHANNELS.write().unwrap().take()
.expect("FAIL: ChainsCoordinator cleaning up channels, but send channels non-existant");
STACKS_BLOCKS_PROCESSED.store(0, Ordering::SeqCst);
SORTITIONS_PROCESSED.store(0, Ordering::SeqCst);
}
}
impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
pub fn run<F>(state_path: &str, burnchain: &str, stacks_mainnet: bool, stacks_chain_id: u32,
initial_balances: Option<Vec<(PrincipalData, u64)>>,
@@ -262,12 +291,16 @@ impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
// timeout so that we handle Ctrl-C a little gracefully
let ready_oper = match event_receiver.select_timeout(Duration::from_millis(500)) {
Ok(op) => op,
Err(_) => continue,
Err(_) => if STOP.read().unwrap().load(Ordering::SeqCst) {
return
} else {
continue
}
};
match ready_oper.index() {
i if i == event_stacks_block => {
info!("Received new stacks block notice");
debug!("Received new stacks block notice");
// pop operation off of receiver
ready_oper.recv(&receivers.event_stacks_block).unwrap();
if let Err(e) = inst.process_ready_blocks() {
@@ -276,7 +309,7 @@ impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
},
i if i == event_burn_block => {
// pop operation off of receiver
info!("Received new burn block notice");
debug!("Received new burn block notice");
ready_oper.recv(&receivers.event_burn_block).unwrap();
if let Err(e) = inst.handle_new_burnchain_block() {
warn!("Error processing new burn block: {:?}", e);
@@ -392,7 +425,7 @@ impl <'a, T: BlockEventDispatcher> ChainsCoordinator <'a, T> {
.expect(&format!("FAIL: could not find data for the canonical sortition {}", canonical_sortition_tip))
.get_canonical_stacks_block_id();
self.canonical_chain_tip = Some(new_canonical_stacks_block);
info!("Bump blocks processed");
debug!("Bump blocks processed");
STACKS_BLOCKS_PROCESSED.fetch_add(1, Ordering::SeqCst);
increment_stx_blocks_processed_counter();
let block_hash = block_receipt.header.anchored_header.block_hash();

View File

@@ -274,7 +274,7 @@ impl <'a> StacksMicroblockBuilder <'a> {
impl <'a> Drop for StacksMicroblockBuilder<'a> {
fn drop(&mut self) {
info!("Drop StacksMicroblockBuilder");
debug!("Drop StacksMicroblockBuilder");
self.clarity_tx.take().expect("Attempted to reclose closed microblock builder")
.rollback_block()
}

View File

@@ -190,7 +190,6 @@ impl RunLoop {
}
block_height = next_height;
}
}
}

View File

@@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Instant, Duration};
use stacks::util::hash::bytes_to_hex;
use stacks::util::hash::Hash160;
use stacks::chainstate::coordinator::CoordinatorCommunication;
fn neon_integration_test_conf() -> (Config, StacksAddress) {
let mut conf = super::new_test_conf();
@@ -207,6 +208,8 @@ fn bitcoind_integration_test() {
eprintln!("Response: {:#?}", res);
assert_eq!(u128::from_str_radix(&res.balance[2..], 16).unwrap(), 0);
assert_eq!(res.nonce, 1);
CoordinatorCommunication::stop_chains_coordinator();
}
#[test]
@@ -394,6 +397,8 @@ fn microblock_integration_test() {
eprintln!("{:#?}", res);
assert_eq!(res.nonce, 2);
assert_eq!(u128::from_str_radix(&res.balance[2..], 16).unwrap(), 96300);
CoordinatorCommunication::stop_chains_coordinator();
}
#[test]
@@ -544,4 +549,6 @@ fn size_check_integration_test() {
assert_eq!(anchor_block_txs, 2);
assert_eq!(micro_block_txs, 1);
CoordinatorCommunication::stop_chains_coordinator();
}