Merge branch 'develop' into feat/3595

This commit is contained in:
Igor Sylvester
2023-03-07 12:14:42 -06:00
committed by GitHub
14 changed files with 1004 additions and 239 deletions

View File

@@ -258,6 +258,10 @@ this version of the software on it.
- The `blockstack-core` binary has been renamed to `stacks-inspect`.
This binary provides CLI tools for chain and mempool inspection.
### Fixed
- The AtlasDB previously could lose `AttachmentInstance` data during shutdown
or crashes (#3082). This release resolves that.
## [2.05.0.1.0]
### Added

View File

@@ -12,6 +12,7 @@ You can find information on joining online community forums (Discord, mailing li
[How Can I Contribute?](#how-can-i-contribute)
* [Development Workflow](#development-workflow)
* [Contributing Conventions](#contributing-conventions)
* [Developer Setup](#recommended-developer-setup)
[Style](#style)
* [Git Commit Messages](#git-commit-messages)
@@ -105,6 +106,35 @@ Each module should include an `Error` enumeration in its `mod.rs` that encodes
errors specific to the module. All error code paths in the module should return
an `Err` type with one of the module's errors.
## Recommended developer setup
### Recommended githooks
It is helpful to set up the pre-commit git hook set up, so that Rust formatting issues are caught before
you push your code. Follow these instruction to set it up:
1. Rename `.git/hooks/pre-commit.sample` to `.git/hooks/pre-commit`
2. Change the content of `.git/hooks/pre-commit` to be the following
```bash
#!/bin/sh
HAS_ISSUES=0
for file in $(git diff --name-only --staged); do
FMT_RESULT="$(cargo fmt -- $file --check --config group_imports=StdExternalCrate 2>/dev/null || true)"
if [ "$FMT_RESULT" != "" ]; then
HAS_ISSUES=1
fi
done
if [ $HAS_ISSUES -eq 1 ]
then
echo 'rustfmt failed: run "cargo fmt --all -- --config group_imports=StdExternalCrate"'
fi
exit $HAS_ISSUES
```
3. Make it executable by running `chmod +x .git/hooks/pre-commit`
That's it! Now your pre-commit hook should be configured on your local machine.
# Style
## Git Commit Messages
Aim to use descriptive git commit messages. We try to follow [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/).

View File

@@ -58,7 +58,7 @@ use crate::core::{StacksEpoch, StacksEpochId};
use crate::monitoring::{
increment_contract_calls_processed, increment_stx_blocks_processed_counter,
};
use crate::net::atlas::{AtlasConfig, AttachmentInstance};
use crate::net::atlas::{AtlasConfig, AtlasDB, AttachmentInstance};
use crate::util_lib::db::DBConn;
use crate::util_lib::db::DBTx;
use crate::util_lib::db::Error as DBError;
@@ -201,7 +201,7 @@ pub struct ChainsCoordinator<
chain_state_db: StacksChainState,
sortition_db: SortitionDB,
burnchain: Burnchain,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
atlas_db: Option<AtlasDB>,
dispatcher: Option<&'a T>,
cost_estimator: Option<&'a mut CE>,
fee_estimator: Option<&'a mut FE>,
@@ -319,7 +319,6 @@ impl<
config: ChainsCoordinatorConfig,
chain_state_db: StacksChainState,
burnchain: Burnchain,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
dispatcher: &'a mut T,
comms: CoordinatorReceivers,
atlas_config: AtlasConfig,
@@ -327,6 +326,7 @@ impl<
fee_estimator: Option<&mut FE>,
miner_status: Arc<Mutex<MinerStatus>>,
burnchain_indexer: B,
atlas_db: AtlasDB,
) where
T: BlockEventDispatcher,
{
@@ -356,13 +356,13 @@ impl<
chain_state_db,
sortition_db,
burnchain,
attachments_tx,
dispatcher: Some(dispatcher),
notifier: arc_notices,
reward_set_provider: OnChainRewardSetProvider(),
cost_estimator,
fee_estimator,
atlas_config,
atlas_db: Some(atlas_db),
config,
burnchain_indexer,
};
@@ -419,35 +419,36 @@ impl<
impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader>
ChainsCoordinator<'a, T, (), U, (), (), B>
{
/// Create a coordinator for testing, with some parameters defaulted to None
#[cfg(test)]
pub fn test_new(
burnchain: &Burnchain,
chain_id: u32,
path: &str,
reward_set_provider: U,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
indexer: B,
) -> ChainsCoordinator<'a, T, (), U, (), (), B> {
ChainsCoordinator::test_new_with_observer(
ChainsCoordinator::test_new_full(
burnchain,
chain_id,
path,
reward_set_provider,
attachments_tx,
None,
indexer,
None,
)
}
/// Create a coordinator for testing allowing for all configurable params
#[cfg(test)]
pub fn test_new_with_observer(
pub fn test_new_full(
burnchain: &Burnchain,
chain_id: u32,
path: &str,
reward_set_provider: U,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
dispatcher: Option<&'a T>,
burnchain_indexer: B,
atlas_config: Option<AtlasConfig>,
) -> ChainsCoordinator<'a, T, (), U, (), (), B> {
let burnchain = burnchain.clone();
@@ -472,6 +473,10 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader
let canonical_sortition_tip =
SortitionDB::get_canonical_sortition_tip(sortition_db.conn()).unwrap();
let atlas_config = atlas_config.unwrap_or(AtlasConfig::default(false));
let atlas_db =
AtlasDB::connect(atlas_config.clone(), &format!("{}/atlas", path), true).unwrap();
ChainsCoordinator {
canonical_sortition_tip: Some(canonical_sortition_tip),
burnchain_blocks_db,
@@ -483,8 +488,8 @@ impl<'a, T: BlockEventDispatcher, U: RewardSetProvider, B: BurnchainHeaderReader
fee_estimator: None,
reward_set_provider,
notifier: (),
attachments_tx,
atlas_config: AtlasConfig::default(false),
atlas_config,
atlas_db: Some(atlas_db),
config: ChainsCoordinatorConfig::new(),
burnchain_indexer,
}
@@ -2557,7 +2562,8 @@ impl<
/// Process any Atlas attachment events and forward them to the Atlas subsystem
fn process_atlas_attachment_events(
&self,
atlas_db: Option<&mut AtlasDB>,
atlas_config: &AtlasConfig,
block_receipt: &StacksEpochReceipt,
canonical_stacks_tip_height: u64,
) {
@@ -2567,7 +2573,7 @@ impl<
if let TransactionPayload::ContractCall(ref contract_call) = transaction.payload {
let contract_id = contract_call.to_clarity_contract_id();
increment_contract_calls_processed();
if self.atlas_config.contracts.contains(&contract_id) {
if atlas_config.contracts.contains(&contract_id) {
for event in receipt.events.iter() {
if let StacksTransactionEvent::SmartContractEvent(ref event_data) =
event
@@ -2591,15 +2597,26 @@ impl<
}
if !attachments_instances.is_empty() {
info!(
"Atlas: {} attachment instances emitted from events",
attachments_instances.len()
"Atlas: New attachment instances emitted by block";
"attachments_count" => attachments_instances.len(),
"index_block_hash" => %block_receipt.header.index_block_hash(),
"stacks_height" => block_receipt.header.stacks_block_height,
);
match self.attachments_tx.send(attachments_instances) {
Ok(_) => {}
Err(e) => {
error!("Atlas: error dispatching attachments {}", e);
if let Some(atlas_db) = atlas_db {
for new_attachment in attachments_instances.into_iter() {
if let Err(e) = atlas_db.queue_attachment_instance(&new_attachment) {
warn!(
"Atlas: Error writing attachment instance to DB";
"err" => ?e,
"index_block_hash" => %new_attachment.index_block_hash,
"contract_id" => %new_attachment.contract_id,
"attachment_index" => %new_attachment.attachment_index,
);
}
}
};
} else {
warn!("Atlas: attempted to write attachments, but stacks-node not configured with Atlas DB");
}
}
}
@@ -2897,7 +2914,9 @@ impl<
self.notifier.notify_stacks_block_processed();
increment_stx_blocks_processed_counter();
self.process_atlas_attachment_events(
Self::process_atlas_attachment_events(
self.atlas_db.as_mut(),
&self.atlas_config,
&block_receipt,
new_canonical_block_snapshot.canonical_stacks_tip_height,
);

View File

@@ -51,6 +51,7 @@ use crate::core;
use crate::core::*;
use crate::monitoring::increment_stx_blocks_processed_counter;
use crate::util_lib::boot::boot_code_addr;
use crate::util_lib::strings::StacksString;
use crate::vm::errors::Error as InterpreterError;
use clarity::vm::{
costs::{ExecutionCost, LimitedCostTracker},
@@ -88,6 +89,14 @@ lazy_static! {
pub static ref STACKS_BLOCK_HEADERS: Arc<AtomicU64> = Arc::new(AtomicU64::new(1));
}
fn test_path(name: &str) -> String {
format!(
"/tmp/stacks-node-tests/coordinator-tests/{}/{}",
get_epoch_time_secs(),
name
)
}
pub fn next_block_hash() -> BlockHeaderHash {
let cur = STACKS_BLOCK_HEADERS.fetch_add(1, Ordering::SeqCst);
let mut bytes = vec![];
@@ -452,7 +461,6 @@ pub fn make_coordinator<'a>(
burnchain: Option<Burnchain>,
) -> ChainsCoordinator<'a, NullEventDispatcher, (), OnChainRewardSetProvider, (), (), BitcoinIndexer>
{
let (tx, _) = sync_channel(100000);
let burnchain = burnchain.unwrap_or_else(|| get_burnchain(path, None));
let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir);
ChainsCoordinator::test_new(
@@ -460,11 +468,29 @@ pub fn make_coordinator<'a>(
0x80000000,
path,
OnChainRewardSetProvider(),
tx,
indexer,
)
}
pub fn make_coordinator_atlas<'a>(
path: &str,
burnchain: Option<Burnchain>,
atlas_config: Option<AtlasConfig>,
) -> ChainsCoordinator<'a, NullEventDispatcher, (), OnChainRewardSetProvider, (), (), BitcoinIndexer>
{
let burnchain = burnchain.unwrap_or_else(|| get_burnchain(path, None));
let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir);
ChainsCoordinator::test_new_full(
&burnchain,
0x80000000,
path,
OnChainRewardSetProvider(),
None,
indexer,
atlas_config,
)
}
struct StubbedRewardSetProvider(Vec<PoxAddress>);
impl RewardSetProvider for StubbedRewardSetProvider {
@@ -491,7 +517,6 @@ fn make_reward_set_coordinator<'a>(
pox_consts: Option<PoxConstants>,
) -> ChainsCoordinator<'a, NullEventDispatcher, (), StubbedRewardSetProvider, (), (), BitcoinIndexer>
{
let (tx, _) = sync_channel(100000);
let burnchain = get_burnchain(path, None);
let indexer = BitcoinIndexer::new_unit_test(&burnchain.working_dir);
ChainsCoordinator::test_new(
@@ -499,7 +524,6 @@ fn make_reward_set_coordinator<'a>(
0x80000000,
path,
StubbedRewardSetProvider(addrs),
tx,
indexer,
)
}
@@ -729,6 +753,7 @@ fn make_stacks_block_from_parent_sortition(
false,
(Txid([0; 32]), 0),
Some(parent_sortition),
&[],
)
}
@@ -795,12 +820,14 @@ fn make_stacks_block_with_recipients_and_sunset_burn(
post_sunset_burn,
(Txid([0; 32]), 0),
None,
&[],
)
}
/// build a stacks block with just the coinbase off of
/// parent_block, in the canonical sortition fork of SortitionDB.
/// parent_block _must_ be included in the StacksChainState
/// `txs`: transactions to try to include in block
fn make_stacks_block_with_input(
sort_db: &SortitionDB,
state: &mut StacksChainState,
@@ -816,6 +843,7 @@ fn make_stacks_block_with_input(
post_sunset_burn: bool,
input: (Txid, u32),
parents_sortition_opt: Option<BlockSnapshot>,
txs: &[StacksTransaction],
) -> (BlockstackOperationType, StacksBlock) {
let tx_auth = TransactionAuth::from_p2pkh(miner).unwrap();
@@ -888,6 +916,10 @@ fn make_stacks_block_with_input(
.try_mine_tx(&mut epoch_tx, &coinbase_op, ast_rules)
.unwrap();
for tx in txs {
builder.try_mine_tx(&mut epoch_tx, tx, ast_rules).unwrap();
}
let block = builder.mine_anchored_block(&mut epoch_tx);
builder.epoch_finish(epoch_tx);
@@ -940,7 +972,7 @@ fn make_stacks_block_with_input(
#[test]
fn missed_block_commits_2_05() {
let path = "/tmp/stacks-blockchain-missed_block_commits_2_05";
let path = &test_path("missed_block_commits_2_05");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
@@ -1047,6 +1079,7 @@ fn missed_block_commits_2_05() {
false,
last_input.as_ref().unwrap().clone(),
None,
&[],
);
// NOTE: intended for block block_height - 2
last_input = Some((
@@ -1100,6 +1133,7 @@ fn missed_block_commits_2_05() {
false,
last_input.as_ref().unwrap().clone(),
None,
&[],
)
};
@@ -1256,7 +1290,7 @@ fn missed_block_commits_2_05() {
/// in 2.1 due to the bad missed block-commit *not* counting towards the miner's sortition weight.
#[test]
fn missed_block_commits_2_1() {
let path = "/tmp/stacks-blockchain-missed_block_commits_2_1";
let path = &test_path("missed_block_commits_2_1");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
@@ -1368,6 +1402,7 @@ fn missed_block_commits_2_1() {
false,
last_input.as_ref().unwrap().clone(),
None,
&[],
);
// NOTE: intended for block block_height - 2
last_input = Some((
@@ -1423,6 +1458,7 @@ fn missed_block_commits_2_1() {
false,
last_input.as_ref().unwrap().clone(),
None,
&[],
)
};
@@ -1596,7 +1632,7 @@ fn missed_block_commits_2_1() {
/// the UTXO chain
#[test]
fn late_block_commits_2_1() {
let path = "/tmp/stacks-blockchain-late_block_commits_2_1";
let path = &test_path("late_block_commits_2_1");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
@@ -1705,6 +1741,7 @@ fn late_block_commits_2_1() {
false,
last_input.as_ref().unwrap().clone(),
None,
&[],
);
// NOTE: intended for block block_height - 3
last_input = Some((
@@ -1760,6 +1797,7 @@ fn late_block_commits_2_1() {
false,
last_input.as_ref().unwrap().clone(),
None,
&[],
)
};
@@ -1933,9 +1971,9 @@ fn late_block_commits_2_1() {
#[test]
fn test_simple_setup() {
let path = "/tmp/stacks-blockchain-simple-setup";
let path = &test_path("simple-setup");
// setup a second set of states that won't see the broadcasted blocks
let path_blinded = "/tmp/stacks-blockchain-simple-setup.blinded";
let path_blinded = &test_path("simple-setup.blinded");
let _r = std::fs::remove_dir_all(path);
let _r = std::fs::remove_dir_all(path_blinded);
@@ -2146,7 +2184,7 @@ fn test_simple_setup() {
#[test]
fn test_sortition_with_reward_set() {
let path = "/tmp/stacks-blockchain-simple-reward-set";
let path = &test_path("simple-reward-set");
let _r = std::fs::remove_dir_all(path);
let mut vrf_keys: Vec<_> = (0..150).map(|_| VRFPrivateKey::new()).collect();
@@ -2415,7 +2453,7 @@ fn test_sortition_with_reward_set() {
#[test]
fn test_sortition_with_burner_reward_set() {
let path = "/tmp/stacks-blockchain-burner-reward-set";
let path = &test_path("burner-reward-set");
let _r = std::fs::remove_dir_all(path);
let mut vrf_keys: Vec<_> = (0..150).map(|_| VRFPrivateKey::new()).collect();
@@ -2658,7 +2696,7 @@ fn test_sortition_with_burner_reward_set() {
#[test]
fn test_pox_btc_ops() {
let path = "/tmp/stacks-blockchain-pox-btc-ops";
let path = &test_path("pox-btc-ops");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
@@ -2935,7 +2973,7 @@ fn test_pox_btc_ops() {
#[test]
fn test_stx_transfer_btc_ops() {
let path = "/tmp/stacks-blockchain-stx_transfer-btc-ops";
let path = &test_path("stx_transfer-btc-ops");
let _r = std::fs::remove_dir_all(path);
let pox_v1_unlock_ht = u32::max_value();
@@ -3326,7 +3364,7 @@ fn get_delegation_info_pox_2(
// \ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ S30 -> S31 -> ...
#[test]
fn test_delegate_stx_btc_ops() {
let path = "/tmp/stacks-blockchain-delegate-stx-btc-ops";
let path = &test_path("delegate-stx-btc-ops");
let _r = std::fs::remove_dir_all(path);
let pox_v1_unlock_ht = 12;
@@ -3867,7 +3905,7 @@ fn test_initial_coinbase_reward_distributions() {
// panic when trying to re-create the costs-2 contract.
#[test]
fn test_epoch_switch_cost_contract_instantiation() {
let path = "/tmp/stacks-blockchain-epoch-switch-cost-contract-instantiation";
let path = &test_path("epoch-switch-cost-contract-instantiation");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
@@ -4066,7 +4104,7 @@ fn test_epoch_switch_cost_contract_instantiation() {
// the test would panic when trying to re-create the pox-2 contract.
#[test]
fn test_epoch_switch_pox_contract_instantiation() {
let path = "/tmp/stacks-blockchain-epoch-switch-pox-contract-instantiation";
let path = &test_path("epoch-switch-pox-contract-instantiation");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
@@ -4270,6 +4308,265 @@ fn test_epoch_switch_pox_contract_instantiation() {
}
}
#[test]
fn atlas_stop_start() {
let path = &test_path("atlas_stop_start");
let _r = std::fs::remove_dir_all(path);
let sunset_ht = 8000;
let pox_consts = Some(PoxConstants::new(6, 3, 3, 25, 5, 10, sunset_ht, 10));
let burnchain_conf = get_burnchain(path, pox_consts.clone());
// publish a simple contract used to generate atlas attachment instances
let atlas_contract_content = "
(define-data-var attachment-index uint u1)
(define-public (make-attach (zonefile-hash (buff 20)))
(let ((current-index (var-get attachment-index)))
(print {
attachment: {
hash: zonefile-hash,
attachment-index: current-index,
metadata: \"test-meta\"
}
})
(var-set attachment-index (+ u1 current-index))
(ok true)))";
let atlas_name: clarity::vm::ContractName = "atlas-test".into();
let vrf_keys: Vec<_> = (0..15).map(|_| VRFPrivateKey::new()).collect();
let committers: Vec<_> = (0..15).map(|_| StacksPrivateKey::new()).collect();
let signer_sk = StacksPrivateKey::new();
let signer_pk = p2pkh_from(&signer_sk);
let balance = 6_000_000_000 * (core::MICROSTACKS_PER_STACKS as u64);
let stacked_amt = 1_000_000_000 * (core::MICROSTACKS_PER_STACKS as u128);
let initial_balances = vec![(signer_pk.clone().into(), balance)];
let atlas_qci = QualifiedContractIdentifier::new(signer_pk.clone().into(), atlas_name.clone());
// include our simple contract in the atlas config
let mut atlas_config = AtlasConfig::default(false);
atlas_config.contracts.insert(atlas_qci.clone());
setup_states(
&[path],
&vrf_keys,
&committers,
pox_consts.clone(),
Some(initial_balances),
StacksEpochId::Epoch21,
);
let mut coord = make_coordinator_atlas(
path,
Some(burnchain_conf.clone()),
Some(atlas_config.clone()),
);
coord.handle_new_burnchain_block().unwrap();
let sort_db = get_sortition_db(path, pox_consts.clone());
let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap();
assert_eq!(tip.block_height, 1);
assert_eq!(tip.sortition, false);
let (_, ops) = sort_db
.get_sortition_result(&tip.sortition_id)
.unwrap()
.unwrap();
// we should have all the VRF registrations accepted
assert_eq!(ops.accepted_ops.len(), vrf_keys.len());
assert_eq!(ops.consumed_leader_keys.len(), 0);
// process sequential blocks, and their sortitions...
let mut stacks_blocks: Vec<(SortitionId, StacksBlock)> = vec![];
let mut contract_publish = StacksTransaction::new(
TransactionVersion::Testnet,
TransactionAuth::from_p2pkh(&signer_sk).unwrap(),
TransactionPayload::SmartContract(
TransactionSmartContract {
name: atlas_name.clone(),
code_body: StacksString::from_str(atlas_contract_content).unwrap(),
},
None,
),
);
contract_publish.chain_id = 0x80000000;
contract_publish.anchor_mode = TransactionAnchorMode::OnChainOnly;
contract_publish.auth.set_origin_nonce(0);
contract_publish.auth.set_tx_fee(100);
let mut signer = StacksTransactionSigner::new(&contract_publish);
signer.sign_origin(&signer_sk).unwrap();
let contract_publish = signer.get_tx().unwrap();
let make_attachments: Vec<StacksTransaction> = (0..5)
.map(|ix| {
(
ix,
StacksTransaction::new(
TransactionVersion::Testnet,
TransactionAuth::from_p2pkh(&signer_sk).unwrap(),
TransactionPayload::ContractCall(TransactionContractCall {
address: signer_pk.clone().into(),
contract_name: atlas_name.clone(),
function_name: "make-attach".into(),
function_args: vec![Value::buff_from(vec![ix; 20]).unwrap()],
}),
),
)
})
.map(|(ix, mut cc_tx)| {
cc_tx.chain_id = 0x80000000;
cc_tx.anchor_mode = TransactionAnchorMode::OnChainOnly;
cc_tx.auth.set_origin_nonce(ix as u64 + 1);
cc_tx.auth.set_tx_fee(100);
let mut signer = StacksTransactionSigner::new(&cc_tx);
signer.sign_origin(&signer_sk).unwrap();
signer.get_tx().unwrap()
})
.collect();
for ix in 0..3 {
let vrf_key = &vrf_keys[ix];
let miner = &committers[ix];
let mut burnchain = get_burnchain_db(path, pox_consts.clone());
let mut chainstate = get_chainstate(path);
let parent = if ix == 0 {
BlockHeaderHash([0; 32])
} else {
stacks_blocks[ix - 1].1.header.block_hash()
};
let burnchain_tip = burnchain.get_canonical_chain_tip().unwrap();
let b = get_burnchain(path, pox_consts.clone());
let next_mock_header = BurnchainBlockHeader {
block_height: burnchain_tip.block_height + 1,
block_hash: BurnchainHeaderHash([0; 32]),
parent_block_hash: burnchain_tip.block_hash,
num_txs: 0,
timestamp: 1,
};
let reward_cycle_info = coord.get_reward_cycle_info(&next_mock_header).unwrap();
let txs = if ix == 1 {
vec![contract_publish.clone()]
} else if ix == 2 {
make_attachments.clone()
} else {
vec![]
};
let (good_op, block) = if ix == 0 {
make_genesis_block_with_recipients(
&sort_db,
&mut chainstate,
&parent,
miner,
10000,
vrf_key,
ix as u32,
None,
)
} else {
make_stacks_block_with_input(
&sort_db,
&mut chainstate,
&b,
&parent,
burnchain_tip.block_height,
miner,
1000,
vrf_key,
ix as u32,
None,
0,
false,
(Txid([0; 32]), 0),
None,
&txs,
)
};
let expected_winner = good_op.txid();
let ops = vec![good_op];
let burnchain_tip = burnchain.get_canonical_chain_tip().unwrap();
produce_burn_block(
&b,
&mut burnchain,
&burnchain_tip.block_hash,
ops,
vec![].iter_mut(),
);
// handle the sortition
coord.handle_new_burnchain_block().unwrap();
let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()).unwrap();
assert_eq!(&tip.winning_block_txid, &expected_winner);
// load the block into staging
let block_hash = block.header.block_hash();
assert_eq!(&tip.winning_stacks_block_hash, &block_hash);
stacks_blocks.push((tip.sortition_id.clone(), block.clone()));
preprocess_block(&mut chainstate, &sort_db, &tip, block);
// handle the stacks block
coord.handle_new_stacks_block().unwrap();
let stacks_tip = SortitionDB::get_canonical_stacks_chain_tip_hash(sort_db.conn()).unwrap();
let burn_block_height = tip.block_height;
// check that the bns contract exists
let does_bns_contract_exist = chainstate
.with_read_only_clarity_tx(
&sort_db.index_conn(),
&StacksBlockId::new(&stacks_tip.0, &stacks_tip.1),
|conn| {
conn.with_clarity_db_readonly(|db| db.get_contract(&boot_code_id("bns", false)))
},
)
.unwrap();
assert!(does_bns_contract_exist.is_ok());
}
// okay, we've broadcasted some transactions, lets check that the atlas db has a queue
let atlas_queue = coord
.atlas_db
.as_ref()
.unwrap()
.queued_attachments()
.unwrap();
assert_eq!(
atlas_queue.len(),
make_attachments.len(),
"Should be as many queued attachments, as attachment txs submitted"
);
// now, we'll shut down all the coordinator connections and reopen them
// to ensure that the queue remains in place
let coord = (); // dispose of the coordinator, closing all its connections
let coord = make_coordinator_atlas(path, Some(burnchain_conf), Some(atlas_config));
let atlas_queue = coord
.atlas_db
.as_ref()
.unwrap()
.queued_attachments()
.unwrap();
assert_eq!(
atlas_queue.len(),
make_attachments.len(),
"Should be as many queued attachments, as attachment txs submitted"
);
}
fn get_total_stacked_info(
chainstate: &mut StacksChainState,
burn_dbconn: &dyn BurnStateDB,
@@ -4309,7 +4606,7 @@ fn get_total_stacked_info(
// sent should occur in the "pox.clar" contract.
#[test]
fn test_epoch_verify_active_pox_contract() {
let path = "/tmp/stacks-blockchain-verify-active-pox-contract";
let path = &test_path("verify-active-pox-contract");
let _r = std::fs::remove_dir_all(path);
let pox_v1_unlock_ht = 12;
@@ -4598,7 +4895,7 @@ fn test_epoch_verify_active_pox_contract() {
}
fn test_sortition_with_sunset() {
let path = "/tmp/stacks-blockchain-sortition-with-sunset";
let path = &test_path("sortition-with-sunset");
let _r = std::fs::remove_dir_all(path);
@@ -4903,7 +5200,7 @@ fn test_sortition_with_sunset() {
/// Epoch 2.1 activates at block 50 (n.b. reward cycles are 6 blocks long)
#[test]
fn test_sortition_with_sunset_and_epoch_switch() {
let path = "/tmp/stacks-blockchain-sortition-with-sunset-and-epoch-switch";
let path = &test_path("sortition-with-sunset-and-epoch-switch");
let _r = std::fs::remove_dir_all(path);
let rc_len = 6;
@@ -5251,10 +5548,9 @@ fn test_sortition_with_sunset_and_epoch_switch() {
/// (because its parent is block `0`, and nobody stacks in
/// this test, all block commits must burn)
fn test_pox_processable_block_in_different_pox_forks() {
let path = "/tmp/stacks-blockchain.test.pox_processable_block_in_different_pox_forks";
let path = &test_path("pox_processable_block_in_different_pox_forks");
// setup a second set of states that won't see the broadcasted blocks
let path_blinded =
"/tmp/stacks-blockchain.test.pox_processable_block_in_different_pox_forks.blinded";
let path_blinded = &test_path("pox_processable_block_in_different_pox_forks.blinded");
let _r = std::fs::remove_dir_all(path);
let _r = std::fs::remove_dir_all(path_blinded);
@@ -5551,9 +5847,9 @@ fn test_pox_processable_block_in_different_pox_forks() {
#[test]
fn test_pox_no_anchor_selected() {
let path = "/tmp/stacks-blockchain.test.pox_fork_no_anchor_selected";
let path = &test_path("pox_fork_no_anchor_selected");
// setup a second set of states that won't see the broadcasted blocks
let path_blinded = "/tmp/stacks-blockchain.test.pox_fork_no_anchor_selected.blinded";
let path_blinded = &test_path("pox_fork_no_anchor_selected.blinded");
let _r = std::fs::remove_dir_all(path);
let _r = std::fs::remove_dir_all(path_blinded);
@@ -5765,9 +6061,9 @@ fn test_pox_no_anchor_selected() {
#[test]
fn test_pox_fork_out_of_order() {
let path = "/tmp/stacks-blockchain.test.pox_fork_out_of_order";
let path = &test_path("pox_fork_out_of_order");
// setup a second set of states that won't see the broadcasted blocks
let path_blinded = "/tmp/stacks-blockchain.test.pox_fork_out_of_order.blinded";
let path_blinded = &test_path("pox_fork_out_of_order.blinded");
let _r = std::fs::remove_dir_all(path);
let _r = std::fs::remove_dir_all(path_blinded);
@@ -6179,7 +6475,7 @@ fn preprocess_block(
#[test]
fn test_check_chainstate_db_versions() {
let path = "/tmp/stacks-blockchain-check_chainstate_db_versions";
let path = &test_path("check_chainstate_db_versions");
let _ = std::fs::remove_dir_all(path);
let sortdb_path = format!("{}/sortdb", &path);

View File

@@ -14,7 +14,31 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//!
//! The `AtlasDB` stores `Attachment` and `AttachmentInstance` objects.
//! `AttachmentInstance` objects indicate what corresponding `Attachment`
//! data a node is interesting in fetching and storing.
//!
//! In the `AtlasDB`, `AttachmentInstance` objects have a status field
//! and an availability field. The status field indicates whether or
//! not the attachment instance has been checked by the
//! `AttachmentDownloader`. The `AttachmentDownloader` does not
//! immediately check entries before insertion in the database:
//! Atlas event processing and downloader logic proceed in
//! different threads.
//!
//! Once the `AttachmentDownloader` checks an attachment instance, it
//! either marks the instance as available (if the content data is
//! already stored on the node) or it adds the attachment instance
//! to its download queue.
//!
use rusqlite::types::FromSql;
use rusqlite::types::FromSqlError;
use rusqlite::types::ToSql;
use rusqlite::types::ToSqlOutput;
use rusqlite::types::ValueRef;
use rusqlite::OptionalExtension;
use rusqlite::Row;
use rusqlite::Transaction;
use rusqlite::{Connection, OpenFlags, NO_PARAMS};
@@ -47,7 +71,16 @@ use crate::types::chainstate::StacksBlockId;
use super::{AtlasConfig, Attachment, AttachmentInstance};
pub const ATLASDB_VERSION: &'static str = "1";
pub const ATLASDB_VERSION: &'static str = "2";
/// The maximum number of atlas attachment instances that should be
/// checked at once (this is used to limit the return size of
/// `queued_attachments`). Because these checks will sometimes surface
/// existing attachment data associated with those instances, the
/// memory impact of these checks is not limited to the
/// AttachmentInstance size (which is small), but can include the
/// Attachment as well (which is larger).
pub const MAX_PROCESS_PER_ROUND: u32 = 1_000;
const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[
r#"
@@ -73,8 +106,41 @@ const ATLASDB_INITIAL_SCHEMA: &'static [&'static str] = &[
"CREATE TABLE db_config(version TEXT NOT NULL);",
];
const ATLASDB_INDEXES: &'static [&'static str] =
&["CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);"];
const ATLASDB_SCHEMA_2: &'static [&'static str] = &[
// We have to allow status to be null, because SQLite won't let us add
// a not null column without a default. The default defeats the point of
// having not-null here anyways, so we leave this field nullable.
r#"
ALTER TABLE attachment_instances
ADD status INTEGER
;"#,
// All of the attachment instances that previously existed in the database
// already were "checked", so set status to 2 (which corresponds to "checked").
r#"
UPDATE attachment_instances SET status = 2;
"#,
];
const ATLASDB_INDEXES: &'static [&'static str] = &[
"CREATE INDEX IF NOT EXISTS index_was_instantiated ON attachments(was_instantiated);",
"CREATE INDEX IF NOT EXISTS index_instance_status ON attachment_instances(status);",
];
/// Attachment instances pass through different states once written to the AtlasDB.
/// These instances are initially written as a new Stacks block is processed, and marked
/// as `Queued`. These queued instances contain all the data of the new attachment instance,
/// but they have not yet been checked against the AtlasDB to determine if there is extant
/// Attachment content/data associated with them. The network run loop (`p2p` thread) checks
/// for any queued attachment instances on each pass, and performs that check. Once the check
/// is completed, any checked instances are updated to `Checked`.
pub enum AttachmentInstanceStatus {
/// This variant indicates that the attachments instance has been written,
/// but the AtlasDownloader has not yet checked that the attachment matched
Queued,
/// This variant indicates that the attachments instance has been written,
/// and checked for whether or not an already existing attachment matched
Checked,
}
impl FromRow<Attachment> for Attachment {
fn from_row<'a>(row: &'a Row) -> Result<Attachment, db_error> {
@@ -117,6 +183,27 @@ impl FromRow<(u32, u32)> for (u32, u32) {
}
}
impl ToSql for AttachmentInstanceStatus {
fn to_sql(&self) -> Result<ToSqlOutput<'_>, rusqlite::Error> {
let integer_rep: i64 = match self {
AttachmentInstanceStatus::Queued => 1,
AttachmentInstanceStatus::Checked => 2,
};
Ok(integer_rep.into())
}
}
impl FromSql for AttachmentInstanceStatus {
fn column_result(value: ValueRef<'_>) -> Result<Self, FromSqlError> {
let integer_rep: i64 = value.as_i64()?;
match integer_rep {
1 => Ok(AttachmentInstanceStatus::Queued),
2 => Ok(AttachmentInstanceStatus::Checked),
x => Err(FromSqlError::OutOfRange(x)),
}
}
}
#[derive(Debug)]
pub struct AtlasDB {
pub atlas_config: AtlasConfig,
@@ -134,20 +221,32 @@ impl AtlasDB {
Ok(())
}
/// Get the database schema version, given a DB connection
fn get_schema_version(conn: &Connection) -> Result<String, db_error> {
let version = conn.query_row(
"SELECT MAX(version) from db_config",
rusqlite::NO_PARAMS,
|row| row.get(0),
)?;
Ok(version)
}
fn instantiate(&mut self) -> Result<(), db_error> {
let genesis_attachments = self.atlas_config.genesis_attachments.take();
let tx = self.tx_begin()?;
for row_text in ATLASDB_INITIAL_SCHEMA {
tx.execute_batch(row_text).map_err(db_error::SqliteError)?;
tx.execute_batch(row_text)?;
}
for row_text in ATLASDB_SCHEMA_2 {
tx.execute_batch(row_text)?;
}
tx.execute(
"INSERT INTO db_config (version) VALUES (?1)",
&[&ATLASDB_VERSION],
)
.map_err(db_error::SqliteError)?;
)?;
if let Some(attachments) = genesis_attachments {
let now = util::get_epoch_time_secs() as i64;
@@ -193,7 +292,7 @@ impl AtlasDB {
// If opened for read/write and it doesn't exist, instantiate it.
pub fn connect(
atlas_config: AtlasConfig,
path: &String,
path: &str,
readwrite: bool,
) -> Result<AtlasDB, db_error> {
let mut create_flag = false;
@@ -213,8 +312,17 @@ impl AtlasDB {
OpenFlags::SQLITE_OPEN_READ_ONLY
}
};
let conn = sqlite_open(path, open_flags, false)?;
Self::check_instantiate_db(atlas_config, conn, readwrite, create_flag)
}
/// Inner method for instantiating the db if necessary, updating the schema, or adding indexes
fn check_instantiate_db(
atlas_config: AtlasConfig,
conn: Connection,
readwrite: bool,
create_flag: bool,
) -> Result<AtlasDB, db_error> {
let mut db = AtlasDB {
atlas_config,
conn,
@@ -224,11 +332,65 @@ impl AtlasDB {
db.instantiate()?;
}
if readwrite {
db.check_schema_version_and_update()?;
db.add_indexes()?;
} else {
db.check_schema_version_or_error()?;
}
Ok(db)
}
fn check_schema_version_or_error(&mut self) -> Result<(), db_error> {
match Self::get_schema_version(self.conn()) {
Ok(version) => {
let expected_version = ATLASDB_VERSION.to_string();
if version == expected_version {
Ok(())
} else {
let version = version.parse().expect(
"Invalid schema version for AtlasDB: should be a parseable integer",
);
Err(db_error::OldSchema(version))
}
}
Err(e) => panic!("Error obtaining the version of the Atlas DB: {:?}", e),
}
}
fn apply_schema_2(db_conn: &Connection) -> Result<(), db_error> {
for row_text in ATLASDB_SCHEMA_2 {
db_conn.execute_batch(row_text)?;
}
db_conn.execute(
"INSERT OR REPLACE INTO db_config (version) VALUES (?1)",
&["2"],
)?;
Ok(())
}
fn check_schema_version_and_update(&mut self) -> Result<(), db_error> {
let tx = self.tx_begin()?;
match AtlasDB::get_schema_version(&tx) {
Ok(version) => {
let expected_version = ATLASDB_VERSION.to_string();
if version == expected_version {
return Ok(());
}
if version == "1" {
Self::apply_schema_2(&tx)?;
tx.commit()?;
Ok(())
} else {
panic!("The schema version of the Atlas DB is invalid.")
}
}
Err(e) => panic!("Error obtaining the version of the Atlas DB: {:?}", e),
}
}
// Open an atlas database in memory (used for testing)
#[cfg(test)]
pub fn connect_memory(atlas_config: AtlasConfig) -> Result<AtlasDB, db_error> {
@@ -243,6 +405,60 @@ impl AtlasDB {
Ok(db)
}
#[cfg(test)]
/// Only ever to be used in testing, open and instantiate a V1 atlasdb
pub fn connect_memory_db_v1(atlas_config: AtlasConfig) -> Result<AtlasDB, db_error> {
let conn = Connection::open_in_memory()?;
let mut db = AtlasDB {
atlas_config,
conn,
readwrite: true,
};
let genesis_attachments = db.atlas_config.genesis_attachments.take();
let tx = db.tx_begin()?;
for row_text in ATLASDB_INITIAL_SCHEMA {
tx.execute_batch(row_text)?;
}
tx.execute("INSERT INTO db_config (version) VALUES (?1)", &["1"])?;
if let Some(attachments) = genesis_attachments {
let now = util::get_epoch_time_secs() as i64;
for attachment in attachments {
tx.execute(
"INSERT INTO attachments (hash, content, was_instantiated, created_at) VALUES (?, ?, 1, ?)",
rusqlite::params![
&attachment.hash(),
&attachment.content,
&now,
],
)?;
}
}
tx.commit()?;
let tx = db.tx_begin()?;
for row_text in &ATLASDB_INDEXES[0..1] {
tx.execute_batch(row_text)?;
}
tx.commit()?;
Ok(db)
}
#[cfg(test)]
/// Only ever to be used in testing, connect to db, but using existing sqlconn
pub fn connect_with_sqlconn(
atlas_config: AtlasConfig,
conn: Connection,
) -> Result<AtlasDB, db_error> {
Self::check_instantiate_db(atlas_config, conn, true, false)
}
pub fn conn(&self) -> &Connection {
&self.conn
}
@@ -387,19 +603,13 @@ impl AtlasDB {
let tx = self.tx_begin()?;
tx.execute(
"INSERT OR REPLACE INTO attachments (hash, content, was_instantiated, created_at) VALUES (?, ?, 1, ?)",
&[
&attachment.hash() as &dyn ToSql,
&attachment.content as &dyn ToSql,
&now as &dyn ToSql,
],
)
.map_err(db_error::SqliteError)?;
rusqlite::params![&attachment.hash(), &attachment.content, &now],
)?;
tx.execute(
"UPDATE attachment_instances SET is_available = 1 WHERE content_hash = ?1",
&[&attachment.hash() as &dyn ToSql],
)
.map_err(db_error::SqliteError)?;
tx.commit().map_err(db_error::SqliteError)?;
"UPDATE attachment_instances SET is_available = 1 WHERE content_hash = ?1 AND status = ?2",
rusqlite::params![&attachment.hash(), &AttachmentInstanceStatus::Checked],
)?;
tx.commit()?;
Ok(())
}
@@ -434,19 +644,19 @@ impl AtlasDB {
pub fn find_unresolved_attachment_instances(
&mut self,
) -> Result<Vec<AttachmentInstance>, db_error> {
let qry = "SELECT * FROM attachment_instances WHERE is_available = 0".to_string();
let rows = query_rows::<AttachmentInstance, _>(&self.conn, &qry, NO_PARAMS)?;
let qry = "SELECT * FROM attachment_instances WHERE is_available = 0 AND status = ?";
let rows = query_rows(&self.conn, qry, &[&AttachmentInstanceStatus::Checked])?;
Ok(rows)
}
pub fn find_all_attachment_instances(
&mut self,
&self,
content_hash: &Hash160,
) -> Result<Vec<AttachmentInstance>, db_error> {
let hex_content_hash = to_hex(&content_hash.0[..]);
let qry = "SELECT * FROM attachment_instances WHERE content_hash = ?1".to_string();
let args = [&hex_content_hash as &dyn ToSql];
let rows = query_rows::<AttachmentInstance, _>(&self.conn, &qry, &args)?;
let qry = "SELECT * FROM attachment_instances WHERE content_hash = ?1 AND status = ?2";
let args = rusqlite::params![&hex_content_hash, &AttachmentInstanceStatus::Checked];
let rows = query_rows(&self.conn, qry, args)?;
Ok(rows)
}
@@ -462,31 +672,88 @@ impl AtlasDB {
Ok(row)
}
pub fn insert_uninstantiated_attachment_instance(
/// Queue a new attachment instance, status will be set to "queued",
/// and the is_available field set to false.
///
/// This is invoked after block processing by the coordinator thread (which
/// handles atlas event logic).
pub fn queue_attachment_instance(
&mut self,
attachment: &AttachmentInstance,
) -> Result<(), db_error> {
self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Queued, false)
}
/// Insert an attachment instance from an initial batch.
/// All such instances are marked "checked", and is_available = true
///
/// This is invoked by the AtlasDownloader when it first runs. The AtlasDownloader
/// is currently managed in the P2P thread.
pub fn insert_initial_attachment_instance(
&mut self,
attachment: &AttachmentInstance,
) -> Result<(), db_error> {
self.insert_attachment_instance(attachment, AttachmentInstanceStatus::Checked, true)
}
/// Return all the queued attachment instances, limited by `MAX_PROCESS_PER_ROUND`
pub fn queued_attachments(&self) -> Result<Vec<AttachmentInstance>, db_error> {
query_rows(
&self.conn,
"SELECT * FROM attachment_instances WHERE status = ?1 LIMIT ?2",
rusqlite::params![&AttachmentInstanceStatus::Queued, MAX_PROCESS_PER_ROUND],
)
}
/// Update a queued attachment to "checked", setting the `is_available` field.
pub fn mark_attachment_instance_checked(
&mut self,
attachment: &AttachmentInstance,
is_available: bool,
) -> Result<(), db_error> {
let hex_content_hash = to_hex(&attachment.content_hash.0[..]);
let hex_tx_id = attachment.tx_id.to_hex();
let tx = self.tx_begin()?;
self.conn.execute(
"UPDATE attachment_instances SET status = ?1, is_available = ?2
WHERE index_block_hash = ?3 AND contract_id = ?4 AND attachment_index = ?5",
rusqlite::params![
&AttachmentInstanceStatus::Checked,
&is_available,
&attachment.index_block_hash,
&attachment.contract_id.to_string(),
&attachment.attachment_index,
],
)?;
Ok(())
}
/// Insert an attachment instance.
fn insert_attachment_instance(
&mut self,
attachment: &AttachmentInstance,
status: AttachmentInstanceStatus,
is_available: bool,
) -> Result<(), db_error> {
let sql_tx = self.tx_begin()?;
let now = util::get_epoch_time_secs() as i64;
let res = tx.execute(
"INSERT OR REPLACE INTO attachment_instances (content_hash, created_at, index_block_hash, attachment_index, block_height, is_available, metadata, contract_id, tx_id) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
&[
&hex_content_hash as &dyn ToSql,
&now as &dyn ToSql,
&attachment.index_block_hash as &dyn ToSql,
&attachment.attachment_index as &dyn ToSql,
sql_tx.execute(
"INSERT OR REPLACE INTO attachment_instances (
content_hash, created_at, index_block_hash,
attachment_index, block_height, is_available,
metadata, contract_id, tx_id, status)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
&attachment.content_hash,
&now,
&attachment.index_block_hash,
&attachment.attachment_index,
&u64_to_sql(attachment.stacks_block_height)?,
&is_available as &dyn ToSql,
&attachment.metadata as &dyn ToSql,
&attachment.contract_id.to_string() as &dyn ToSql,
&hex_tx_id as &dyn ToSql,
]
);
res.map_err(db_error::SqliteError)?;
tx.commit().map_err(db_error::SqliteError)?;
&is_available,
&attachment.metadata,
&attachment.contract_id.to_string(),
&attachment.tx_id,
&status
],
)?;
sql_tx.commit()?;
Ok(())
}
}

View File

@@ -114,12 +114,7 @@ impl AttachmentsDownloader {
// Handle initial batch
if self.initial_batch.len() > 0 {
let mut batch = HashSet::new();
for attachment_instance in self.initial_batch.drain(..) {
batch.insert(attachment_instance);
}
let mut resolved =
self.enqueue_new_attachments(&mut batch, &mut network.atlasdb, true)?;
let mut resolved = self.enqueue_initial_attachments(&mut network.atlasdb)?;
resolved_attachments.append(&mut resolved);
}
@@ -236,64 +231,69 @@ impl AttachmentsDownloader {
Ok((resolved_attachments, events_to_deregister))
}
pub fn enqueue_new_attachments(
/// Given a list of `AttachmentInstance`, check if the content corresponding to that
/// instance is (1) already validated (2) inboxed or (3) unknown.
///
/// In the event of (1) or (2), `do_if_found` is invoked, and the attachment instance will
/// be returned (with the attachment data) in the result set. If the attachment was inboxed (case 2),
/// the attachment is marked as instantiated in the atlas db.
///
/// In the event of (3), `do_if_not_found` is invoked, and the attachment instance is added
/// to `self.priority_queue`.
///
/// The return value of this function is a vector of all the instances from `iterator` which
/// resolved to Attachment data, paired with that data.
fn check_attachment_instances<F, G>(
&mut self,
new_attachments: &mut HashSet<AttachmentInstance>,
atlasdb: &mut AtlasDB,
initial_batch: bool,
) -> Result<Vec<(AttachmentInstance, Attachment)>, DBError> {
if new_attachments.is_empty() {
return Ok(vec![]);
}
atlas_db: &mut AtlasDB,
iterator: Vec<AttachmentInstance>,
do_if_found: F,
do_if_not_found: G,
) -> Result<Vec<(AttachmentInstance, Attachment)>, DBError>
where
F: Fn(&mut AtlasDB, &AttachmentInstance) -> Result<(), DBError>,
G: Fn(&mut AtlasDB, &AttachmentInstance) -> Result<(), DBError>,
{
let mut attachments_batches: HashMap<StacksBlockId, AttachmentsBatch> = HashMap::new();
let mut resolved_attachments = vec![];
for attachment_instance in new_attachments.drain() {
// Are we dealing with an empty hash - allowed for undoing onchain binding
for attachment_instance in iterator {
if attachment_instance.content_hash == Hash160::empty() {
// todo(ludo) insert or update ?
atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, true)?;
// Are we dealing with an empty hash - allowed for undoing onchain binding
do_if_found(atlas_db, &attachment_instance)?;
debug!("Atlas: inserting and pairing new attachment instance with empty hash");
resolved_attachments.push((attachment_instance, Attachment::empty()));
continue;
}
// Do we already have a matching validated attachment
if let Ok(Some(entry)) = atlasdb.find_attachment(&attachment_instance.content_hash) {
atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, true)?;
} else if let Ok(Some(entry)) =
atlas_db.find_attachment(&attachment_instance.content_hash)
{
// Do we already have a matching validated attachment
do_if_found(atlas_db, &attachment_instance)?;
debug!(
"Atlas: inserting and pairing new attachment instance to existing attachment"
);
resolved_attachments.push((attachment_instance, entry));
continue;
}
// Do we already have a matching inboxed attachment
if let Ok(Some(attachment)) =
atlasdb.find_uninstantiated_attachment(&attachment_instance.content_hash)
} else if let Ok(Some(attachment)) =
atlas_db.find_uninstantiated_attachment(&attachment_instance.content_hash)
{
atlasdb.insert_instantiated_attachment(&attachment)?;
atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, true)?;
// Do we already have a matching inboxed attachment
atlas_db.insert_instantiated_attachment(&attachment)?;
do_if_found(atlas_db, &attachment_instance)?;
debug!("Atlas: inserting and pairing new attachment instance to inboxed attachment, now validated");
resolved_attachments.push((attachment_instance, attachment));
continue;
}
} else {
// This attachment refers to an unknown attachment.
// Let's append it to the batch being constructed in this routine.
match attachments_batches.entry(attachment_instance.index_block_hash) {
Entry::Occupied(entry) => {
entry.into_mut().track_attachment(&attachment_instance);
}
Entry::Vacant(v) => {
let mut batch = AttachmentsBatch::new();
batch.track_attachment(&attachment_instance);
v.insert(batch);
}
};
// This attachment in refering to an unknown attachment.
// Let's append it to the batch being constructed in this routine.
match attachments_batches.entry(attachment_instance.index_block_hash) {
Entry::Occupied(entry) => {
entry.into_mut().track_attachment(&attachment_instance);
}
Entry::Vacant(v) => {
let mut batch = AttachmentsBatch::new();
batch.track_attachment(&attachment_instance);
v.insert(batch);
}
};
if !initial_batch {
atlasdb.insert_uninstantiated_attachment_instance(&attachment_instance, false)?;
do_if_not_found(atlas_db, &attachment_instance)?;
}
}
@@ -303,6 +303,58 @@ impl AttachmentsDownloader {
Ok(resolved_attachments)
}
/// Check any queued attachment instances to see if we already have data for them,
/// returning a vector of (instance, attachment) pairs for any of the queued attachments
/// which already had the associated data
/// Marks any processed attachments as checked
///
/// This method is invoked in the thread managing the AttachmentDownloader. This is currently
/// the P2P thread.
pub fn check_queued_attachment_instances(
&mut self,
atlas_db: &mut AtlasDB,
) -> Result<Vec<(AttachmentInstance, Attachment)>, DBError> {
let new_attachments = atlas_db.queued_attachments()?;
self.check_attachment_instances(
atlas_db,
new_attachments,
|atlas_db, attachment_instance| {
atlas_db.mark_attachment_instance_checked(&attachment_instance, true)
},
|atlas_db, attachment_instance| {
atlas_db.mark_attachment_instance_checked(&attachment_instance, false)
},
)
}
/// Insert the initial attachments set. Only add the attachment instance if associated data
/// was found.
pub fn enqueue_initial_attachments(
&mut self,
atlas_db: &mut AtlasDB,
) -> Result<Vec<(AttachmentInstance, Attachment)>, DBError> {
if self.initial_batch.is_empty() {
return Ok(vec![]);
}
// we're draining the initial batch, so to avoid angering The Borrow Checker
// use mem replace to just take the whole vec.
let initial_batch = std::mem::replace(&mut self.initial_batch, vec![]);
self.check_attachment_instances(
atlas_db,
initial_batch,
|atlas_db, attachment_instance| {
atlas_db.insert_initial_attachment_instance(&attachment_instance)
},
|_atlas_db, _attachment_instance| {
// If attachment not found, don't insert attachment instance
Ok(())
},
)
}
}
#[derive(Debug)]

View File

@@ -80,6 +80,7 @@ impl AtlasConfig {
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
/// Attachments are the content associated with an AttachmentInstance
pub struct Attachment {
pub content: Vec<u8>,
}
@@ -99,6 +100,10 @@ impl Attachment {
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
/// An attachment instance is a reference to atlas data: a commitment
/// to track the content that is the inverse of `content_hash`.
/// Attachment instances are created by atlas events issued by contracts
/// specified in a node's `AtlasConfig`.
pub struct AttachmentInstance {
pub content_hash: Hash160,
pub attachment_index: u32,

View File

@@ -28,6 +28,7 @@ use crate::net::{
PeerHost, Requestable,
};
use crate::util_lib::boot::boot_code_id;
use crate::util_lib::db::u64_to_sql;
use crate::util_lib::strings::UrlString;
use clarity::vm::types::QualifiedContractIdentifier;
use stacks_common::types::chainstate::BlockHeaderHash;
@@ -774,6 +775,104 @@ fn test_keep_uninstantiated_attachments() {
);
}
#[test]
fn schema_2_migration() {
let atlas_config = AtlasConfig {
contracts: HashSet::new(),
attachments_max_size: 1024,
max_uninstantiated_attachments: 10,
uninstantiated_attachments_expire_after: 0,
unresolved_attachment_instances_expire_after: 10,
genesis_attachments: None,
};
let atlas_db = AtlasDB::connect_memory_db_v1(atlas_config.clone()).unwrap();
let conn = atlas_db.conn;
let attachments = [
AttachmentInstance {
// content_hash, index_block_hash, and txid must contain hex letters!
// because their fields are declared `STRING`, if you supply all numerals,
// sqlite assigns the field a REAL affinity (instead of TEXT)
content_hash: Hash160([0xa0; 20]),
attachment_index: 1,
stacks_block_height: 1,
index_block_hash: StacksBlockId([0xb1; 32]),
metadata: "".into(),
contract_id: QualifiedContractIdentifier::transient(),
tx_id: Txid([0x2f; 32]),
canonical_stacks_tip_height: None,
},
AttachmentInstance {
content_hash: Hash160([0x00; 20]),
attachment_index: 1,
stacks_block_height: 1,
index_block_hash: StacksBlockId([0x0a; 32]),
metadata: "".into(),
contract_id: QualifiedContractIdentifier::transient(),
tx_id: Txid([0x0b; 32]),
canonical_stacks_tip_height: None,
},
];
for attachment in attachments.iter() {
// need to manually insert data, because the insertion routine in the codebase
// sets `status` which doesn't exist in v1
conn.execute(
"INSERT OR REPLACE INTO attachment_instances (
content_hash, created_at, index_block_hash,
attachment_index, block_height, is_available,
metadata, contract_id, tx_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
rusqlite::params![
&attachment.content_hash,
&0,
&attachment.index_block_hash,
&attachment.attachment_index,
&u64_to_sql(attachment.stacks_block_height).unwrap(),
&true,
&attachment.metadata,
&attachment.contract_id.to_string(),
&attachment.tx_id,
],
)
.unwrap();
}
// perform the migration and unwrap() to assert that it runs okay
let atlas_db = AtlasDB::connect_with_sqlconn(atlas_config, conn).unwrap();
let mut attachments_fetched_a0 = atlas_db
.find_all_attachment_instances(&Hash160([0xa0; 20]))
.unwrap();
assert_eq!(
attachments_fetched_a0.len(),
1,
"Should have one attachment instance marked 'checked' with hash `0xa0a0a0..`"
);
let attachment_a0 = attachments_fetched_a0.pop().unwrap();
assert_eq!(&attachment_a0, &attachments[0]);
let mut attachments_fetched_00 = atlas_db
.find_all_attachment_instances(&Hash160([0x00; 20]))
.unwrap();
assert_eq!(
attachments_fetched_00.len(),
1,
"Should have one attachment instance marked 'checked' with hash `0x000000..`"
);
let attachment_00 = attachments_fetched_00.pop().unwrap();
assert_eq!(&attachment_00, &attachments[1]);
assert_eq!(
atlas_db.queued_attachments().unwrap().len(),
0,
"Should have no attachment instance marked 'queued'"
);
}
#[test]
fn test_evict_k_oldest_uninstantiated_attachments() {
let atlas_config = AtlasConfig {
@@ -1003,7 +1102,7 @@ fn test_evict_expired_unresolved_attachment_instances() {
};
let mut atlas_db = AtlasDB::connect_memory(atlas_config).unwrap();
// Insert some uninstanciated attachments
// Insert some uninstantiated attachments
let uninstantiated_attachment_instances = [
new_attachment_instance_from(&new_attachment_from("facade11"), 0, 1),
new_attachment_instance_from(&new_attachment_from("facade12"), 1, 1),
@@ -1016,7 +1115,10 @@ fn test_evict_expired_unresolved_attachment_instances() {
];
for attachment_instance in uninstantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, false)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, false)
.unwrap();
}
@@ -1029,7 +1131,10 @@ fn test_evict_expired_unresolved_attachment_instances() {
];
for attachment_instance in instantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, true)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, true)
.unwrap();
}
@@ -1043,7 +1148,10 @@ fn test_evict_expired_unresolved_attachment_instances() {
];
for attachment_instance in uninstantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, false)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, false)
.unwrap();
}
@@ -1092,7 +1200,7 @@ fn test_bit_vectors() {
let mut atlas_db = AtlasDB::connect_memory(atlas_config).unwrap();
// Insert some uninstanciated attachments
// Insert some uninstantiated attachments
let uninstantiated_attachment_instances = [
new_attachment_instance_from(&new_attachment_from("facade11"), 0, 1),
new_attachment_instance_from(&new_attachment_from("facade12"), 1, 1),
@@ -1101,7 +1209,10 @@ fn test_bit_vectors() {
];
for attachment_instance in uninstantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, false)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, false)
.unwrap();
}
let block_id_1 = uninstantiated_attachment_instances[0].index_block_hash;
@@ -1118,7 +1229,10 @@ fn test_bit_vectors() {
];
for attachment_instance in uninstantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, false)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, false)
.unwrap();
}
let bit_vector = atlas_db
@@ -1134,7 +1248,10 @@ fn test_bit_vectors() {
];
for attachment_instance in instantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, true)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, true)
.unwrap();
}
@@ -1160,7 +1277,10 @@ fn test_bit_vectors() {
let block_id_2 = instantiated_attachment_instances[0].index_block_hash;
for attachment_instance in instantiated_attachment_instances.iter() {
atlas_db
.insert_uninstantiated_attachment_instance(attachment_instance, true)
.queue_attachment_instance(attachment_instance)
.unwrap();
atlas_db
.mark_attachment_instance_checked(attachment_instance, true)
.unwrap();
}

View File

@@ -2788,17 +2788,15 @@ pub mod test {
)
.unwrap();
let (tx, _) = sync_channel(100000);
let indexer = BitcoinIndexer::new_unit_test(&config.burnchain.working_dir);
let mut coord = ChainsCoordinator::test_new_with_observer(
let mut coord = ChainsCoordinator::test_new_full(
&config.burnchain,
config.network_id,
&test_path,
OnChainRewardSetProvider(),
tx,
observer,
indexer,
None,
);
coord.handle_new_burnchain_block().unwrap();
@@ -2968,7 +2966,6 @@ pub mod test {
ibd,
100,
&RPCHandlerArgs::default(),
&mut HashSet::new(),
);
self.sortdb = Some(sortdb);
@@ -3010,7 +3007,6 @@ pub mod test {
ibd,
100,
&RPCHandlerArgs::default(),
&mut HashSet::new(),
);
self.sortdb = Some(sortdb);

View File

@@ -5340,7 +5340,6 @@ impl PeerNetwork {
ibd: bool,
poll_timeout: u64,
handler_args: &RPCHandlerArgs,
attachment_requests: &mut HashSet<AttachmentInstance>,
) -> Result<NetworkResult, net_error> {
debug!(">>>>>>>>>>>>>>>>>>>>>>> Begin Network Dispatch (poll for {}) >>>>>>>>>>>>>>>>>>>>>>>>>>>>", poll_timeout);
let mut poll_states = match self.network {
@@ -5393,7 +5392,7 @@ impl PeerNetwork {
// enqueue them.
PeerNetwork::with_attachments_downloader(self, |network, attachments_downloader| {
let mut known_attachments = attachments_downloader
.enqueue_new_attachments(attachment_requests, &mut network.atlasdb, false)
.check_queued_attachment_instances(&mut network.atlasdb)
.expect("FATAL: failed to store new attachments to the atlas DB");
network_result.attachments.append(&mut known_attachments);
Ok(())

View File

@@ -139,7 +139,7 @@
/// This file may be refactored in the future into a full-fledged module.
use std::cmp;
use std::collections::HashMap;
use std::collections::{HashSet, VecDeque};
use std::collections::VecDeque;
use std::convert::{TryFrom, TryInto};
use std::default::Default;
use std::mem;
@@ -184,7 +184,7 @@ use stacks::cost_estimates::UnitEstimator;
use stacks::cost_estimates::{CostEstimator, FeeEstimator};
use stacks::monitoring::{increment_stx_blocks_mined_counter, update_active_miners_count_gauge};
use stacks::net::{
atlas::{AtlasConfig, AtlasDB, AttachmentInstance},
atlas::{AtlasConfig, AtlasDB},
db::{LocalPeer, PeerDB},
dns::DNSClient,
dns::DNSResolver,
@@ -3524,8 +3524,6 @@ pub struct PeerThread {
globals: Globals,
/// how long to wait for network messages on each poll, in millis
poll_timeout: u64,
/// receiver for attachments discovered by the chains coordinator thread
attachments_rx: Receiver<HashSet<AttachmentInstance>>,
/// handle to the sortition DB (optional so we can take/replace it)
sortdb: Option<SortitionDB>,
/// handle to the chainstate DB (optional so we can take/replace it)
@@ -3577,11 +3575,7 @@ impl PeerThread {
/// Binds the addresses in the config (which may panic if the port is blocked).
/// This is so the node will crash "early" before any new threads start if there's going to be
/// a bind error anyway.
pub fn new(
runloop: &RunLoop,
mut net: PeerNetwork,
attachments_rx: Receiver<HashSet<AttachmentInstance>>,
) -> PeerThread {
pub fn new(runloop: &RunLoop, mut net: PeerNetwork) -> PeerThread {
let config = runloop.config().clone();
let mempool = Self::connect_mempool_db(&config);
let burn_db_path = config.get_burn_db_file_path();
@@ -3611,7 +3605,6 @@ impl PeerThread {
net: Some(net),
globals: runloop.get_globals(),
poll_timeout,
attachments_rx,
sortdb: Some(sortdb),
chainstate: Some(chainstate),
mempool: Some(mempool),
@@ -3693,17 +3686,6 @@ impl PeerThread {
self.poll_timeout
};
let mut expected_attachments = match self.attachments_rx.try_recv() {
Ok(expected_attachments) => {
debug!("Atlas: received attachments: {:?}", &expected_attachments);
expected_attachments
}
_ => {
debug!("Atlas: attachment channel is empty");
HashSet::new()
}
};
// move over unconfirmed state obtained from the relayer
self.with_chainstate(|p2p_thread, sortdb, chainstate, _mempool| {
let _ = Relayer::setup_unconfirmed_state_readonly(chainstate, sortdb);
@@ -3739,7 +3721,6 @@ impl PeerThread {
ibd,
poll_ms,
&handler_args,
&mut expected_attachments,
)
})
});
@@ -4133,9 +4114,6 @@ impl StacksNode {
globals: Globals,
// relay receiver endpoint for the p2p thread, so the relayer can feed it data to push
relay_recv: Receiver<RelayerDirective>,
// attachments receiver endpoint for the p2p thread, so the chains coordinator can feed it
// attachments it discovers
attachments_receiver: Receiver<HashSet<AttachmentInstance>>,
) -> StacksNode {
let config = runloop.config().clone();
let is_miner = runloop.is_miner();
@@ -4195,7 +4173,7 @@ impl StacksNode {
.expect("FATAL: failed to start relayer thread");
let p2p_event_dispatcher = runloop.get_event_dispatcher();
let p2p_thread = PeerThread::new(runloop, p2p_net, attachments_receiver);
let p2p_thread = PeerThread::new(runloop, p2p_net);
let p2p_thread_handle = thread::Builder::new()
.stack_size(BLOCK_PROCESSOR_STACK_SIZE)
.name(format!(

View File

@@ -1,7 +1,6 @@
use std::convert::TryFrom;
use std::default::Default;
use std::net::SocketAddr;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::{collections::HashSet, env};
use std::{thread, thread::JoinHandle, time};
@@ -102,7 +101,6 @@ pub struct Node {
last_sortitioned_block: Option<BurnchainTip>,
event_dispatcher: EventDispatcher,
nonce: u64,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
leader_key_registers: HashSet<Txid>,
block_commits: HashSet<Txid>,
}
@@ -178,7 +176,6 @@ fn spawn_peer(
exit_at_block_height: Option<u64>,
genesis_chainstate_hash: Sha256Sum,
poll_timeout: u64,
attachments_rx: Receiver<HashSet<AttachmentInstance>>,
config: Config,
) -> Result<JoinHandle<()>, NetError> {
this.bind(p2p_sock, rpc_sock).unwrap();
@@ -242,14 +239,6 @@ fn spawn_peer(
}
};
let mut expected_attachments = match attachments_rx.try_recv() {
Ok(expected_attachments) => expected_attachments,
_ => {
debug!("Atlas: attachment channel is empty");
HashSet::new()
}
};
let indexer = make_bitcoin_indexer(&config);
let net_result = this
@@ -263,7 +252,6 @@ fn spawn_peer(
false,
poll_timeout,
&handler_args,
&mut expected_attachments,
)
.unwrap();
if net_result.has_transactions() {
@@ -292,11 +280,7 @@ pub fn use_test_genesis_chainstate(config: &Config) -> bool {
impl Node {
/// Instantiate and initialize a new node, given a config
pub fn new(
config: Config,
boot_block_exec: Box<dyn FnOnce(&mut ClarityTx) -> ()>,
attachments_tx: SyncSender<HashSet<AttachmentInstance>>,
) -> Self {
pub fn new(config: Config, boot_block_exec: Box<dyn FnOnce(&mut ClarityTx) -> ()>) -> Self {
let use_test_genesis_data = if config.burnchain.mode == "mocknet" {
use_test_genesis_chainstate(&config)
} else {
@@ -390,7 +374,6 @@ impl Node {
burnchain_tip: None,
nonce: 0,
event_dispatcher,
attachments_tx,
leader_key_registers: HashSet::new(),
block_commits: HashSet::new(),
}
@@ -423,7 +406,6 @@ impl Node {
Err(_e) => panic!(),
};
let (attachments_tx, attachments_rx) = sync_channel(1);
let mut node = Node {
active_registered_key: None,
bootstraping_chain: false,
@@ -435,12 +417,11 @@ impl Node {
burnchain_tip: None,
nonce: 0,
event_dispatcher,
attachments_tx,
leader_key_registers: HashSet::new(),
block_commits: HashSet::new(),
};
node.spawn_peer_server(attachments_rx);
node.spawn_peer_server();
let pox_constants = burnchain_controller.sortdb_ref().pox_constants.clone();
loop {
@@ -464,7 +445,20 @@ impl Node {
node
}
pub fn spawn_peer_server(&mut self, attachments_rx: Receiver<HashSet<AttachmentInstance>>) {
fn make_atlas_config() -> AtlasConfig {
AtlasConfig::default(false)
}
pub fn make_atlas_db(&self) -> AtlasDB {
AtlasDB::connect(
Self::make_atlas_config(),
&self.config.get_atlas_db_file_path(),
true,
)
.unwrap()
}
pub fn spawn_peer_server(&mut self) {
// we can call _open_ here rather than _connect_, since connect is first called in
// make_genesis_block
let burnchain = self.config.get_burnchain();
@@ -550,9 +544,7 @@ impl Node {
}
tx.commit().unwrap();
}
let atlas_config = AtlasConfig::default(false);
let atlasdb =
AtlasDB::connect(atlas_config, &self.config.get_atlas_db_file_path(), true).unwrap();
let atlasdb = self.make_atlas_db();
let local_peer = match PeerDB::get_local_peer(peerdb.conn()) {
Ok(local_peer) => local_peer,
@@ -585,7 +577,6 @@ impl Node {
exit_at_block_height,
Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH).unwrap(),
1000,
attachments_rx,
self.config.clone(),
)
.unwrap();
@@ -841,6 +832,7 @@ impl Node {
consensus_hash: &ConsensusHash,
microblocks: Vec<StacksMicroblock>,
db: &mut SortitionDB,
atlas_db: &mut AtlasDB,
) -> ChainTip {
let _parent_consensus_hash = {
// look up parent consensus hash
@@ -898,7 +890,7 @@ impl Node {
BurnchainDB::connect(&burnchain.get_burnchaindb_path(), &burnchain, true)
.expect("FATAL: failed to connect to burnchain DB");
let atlas_config = AtlasConfig::default(false);
let atlas_config = Self::make_atlas_config();
let mut processed_blocks = vec![];
loop {
let mut process_blocks_at_tip = {
@@ -922,13 +914,19 @@ impl Node {
let attachments_instances =
self.get_attachment_instances(epoch_receipt, &atlas_config);
if !attachments_instances.is_empty() {
match self.attachments_tx.send(attachments_instances) {
Ok(_) => {}
Err(e) => {
error!("Error dispatching attachments {}", e);
panic!();
for new_attachment in attachments_instances.into_iter() {
if let Err(e) =
atlas_db.queue_attachment_instance(&new_attachment)
{
warn!(
"Atlas: Error writing attachment instance to DB";
"err" => ?e,
"index_block_hash" => %new_attachment.index_block_hash,
"contract_id" => %new_attachment.contract_id,
"attachment_index" => %new_attachment.attachment_index,
);
}
};
}
}
}
_ => {}

View File

@@ -4,10 +4,8 @@ use crate::{
BitcoinRegtestController, BurnchainController, ChainTip, Config, MocknetController, Node,
};
use stacks::chainstate::stacks::db::ClarityTx;
use stacks::net::atlas::AttachmentInstance;
use stacks::types::chainstate::BurnchainHeaderHash;
use std::collections::HashSet;
use std::sync::mpsc::{sync_channel, Receiver};
/// RunLoop is coordinating a simulated burnchain and some simulated nodes
/// taking turns in producing blocks.
@@ -15,7 +13,6 @@ pub struct RunLoop {
config: Config,
pub node: Node,
pub callbacks: RunLoopCallbacks,
attachments_rx: Option<Receiver<HashSet<AttachmentInstance>>>,
}
impl RunLoop {
@@ -28,16 +25,13 @@ impl RunLoop {
config: Config,
boot_exec: Box<dyn FnOnce(&mut ClarityTx) -> ()>,
) -> Self {
let (attachments_tx, attachments_rx) = sync_channel(1);
// Build node based on config
let node = Node::new(config.clone(), boot_exec, attachments_tx);
let node = Node::new(config.clone(), boot_exec);
Self {
config,
node,
callbacks: RunLoopCallbacks::new(),
attachments_rx: Some(attachments_rx),
}
}
@@ -74,8 +68,7 @@ impl RunLoop {
self.node.process_burnchain_state(&burnchain_tip); // todo(ludo): should return genesis?
let mut chain_tip = ChainTip::genesis(&BurnchainHeaderHash::zero(), 0, 0);
let attachments_rx = self.attachments_rx.take().unwrap();
self.node.spawn_peer_server(attachments_rx);
self.node.spawn_peer_server();
// Bootstrap the chain: node will start a new tenure,
// using the sortition hash from block #1 for generating a VRF.
@@ -128,12 +121,14 @@ impl RunLoop {
// Have the node process its own tenure.
// We should have some additional checks here, and ensure that the previous artifacts are legit.
let mut atlas_db = self.node.make_atlas_db();
chain_tip = self.node.process_tenure(
&artifacts_from_1st_tenure.anchored_block,
&last_sortitioned_block.block_snapshot.consensus_hash,
artifacts_from_1st_tenure.microblocks.clone(),
burnchain.sortdb_mut(),
&mut atlas_db,
);
self.callbacks.invoke_new_stacks_chain_state(
@@ -204,11 +199,14 @@ impl RunLoop {
Some(ref artifacts) => {
// Have the node process its tenure.
// We should have some additional checks here, and ensure that the previous artifacts are legit.
let mut atlas_db = self.node.make_atlas_db();
chain_tip = self.node.process_tenure(
&artifacts.anchored_block,
&last_sortitioned_block.block_snapshot.consensus_hash,
artifacts.microblocks.clone(),
burnchain.sortdb_mut(),
&mut atlas_db,
);
self.callbacks.invoke_new_stacks_chain_state(

View File

@@ -5,14 +5,12 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicU64;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::collections::HashSet;
use stacks::deps::ctrlc as termination;
use stacks::deps::ctrlc::SignalId;
@@ -28,7 +26,7 @@ use stacks::chainstate::coordinator::{
};
use stacks::chainstate::stacks::db::{ChainStateBootData, StacksChainState};
use stacks::core::StacksEpochId;
use stacks::net::atlas::{AtlasConfig, Attachment, AttachmentInstance, ATTACHMENTS_CHANNEL_SIZE};
use stacks::net::atlas::{AtlasConfig, AtlasDB, Attachment};
use stacks::util_lib::db::Error as db_error;
use stx_genesis::GenesisData;
@@ -502,7 +500,7 @@ impl RunLoop {
burnchain_config: &Burnchain,
coordinator_receivers: CoordinatorReceivers,
miner_status: Arc<Mutex<MinerStatus>>,
) -> (JoinHandle<()>, Receiver<HashSet<AttachmentInstance>>) {
) -> JoinHandle<()> {
let use_test_genesis_data = use_test_genesis_chainstate(&self.config);
// load up genesis Atlas attachments
@@ -521,7 +519,12 @@ impl RunLoop {
let moved_config = self.config.clone();
let moved_burnchain_config = burnchain_config.clone();
let mut coordinator_dispatcher = self.event_dispatcher.clone();
let (attachments_tx, attachments_rx) = sync_channel(ATTACHMENTS_CHANNEL_SIZE);
let atlas_db = AtlasDB::connect(
moved_atlas_config.clone(),
&self.config.get_atlas_db_file_path(),
true,
)
.expect("Failed to connect Atlas DB during startup");
let coordinator_indexer = make_bitcoin_indexer(&self.config);
let coordinator_thread_handle = thread::Builder::new()
@@ -549,7 +552,6 @@ impl RunLoop {
coord_config,
chain_state_db,
moved_burnchain_config,
attachments_tx,
&mut coordinator_dispatcher,
coordinator_receivers,
moved_atlas_config,
@@ -557,11 +559,12 @@ impl RunLoop {
fee_estimator.as_deref_mut(),
miner_status,
coordinator_indexer,
atlas_db,
);
})
.expect("FATAL: failed to start chains coordinator thread");
(coordinator_thread_handle, attachments_rx)
coordinator_thread_handle
}
/// Instantiate the PoX watchdog
@@ -979,7 +982,7 @@ impl RunLoop {
self.set_globals(globals.clone());
// have headers; boot up the chains coordinator and instantiate the chain state
let (coordinator_thread_handle, attachments_rx) = self.spawn_chains_coordinator(
let coordinator_thread_handle = self.spawn_chains_coordinator(
&burnchain_config,
coordinator_receivers,
globals.get_miner_status(),
@@ -1011,7 +1014,7 @@ impl RunLoop {
// Boot up the p2p network and relayer, and figure out how many sortitions we have so far
// (it could be non-zero if the node is resuming from chainstate)
let mut node = StacksNode::spawn(self, globals.clone(), relay_recv, attachments_rx);
let mut node = StacksNode::spawn(self, globals.clone(), relay_recv);
let liveness_thread = self.spawn_chain_liveness_thread(globals.clone());
// Wait for all pending sortitions to process