Merge branch 'fix/marf-1557' into next

This commit is contained in:
Jude Nelson
2020-08-12 11:59:27 -04:00
5 changed files with 24 additions and 25 deletions

View File

@@ -619,6 +619,11 @@ impl <'a> SortitionHandleTx <'a> {
/// and calculate and store its arrival index.
/// If this Stacks block extends the canonical stacks chain tip, then also update the memoized canonical
/// stacks chain tip metadata on the burn chain tip.
// TODO: this method's inner call to get_indexed() occurs within a MARF transaction, which
// means it will clone() the underlying TrieRAM. Until this is rectified, care should be taken
// to ensure that no keys are inserted until after this method is called. This should already
// be the case, since the only time keys are inserted into the sortition DB MARF is when the
// next snapshot is processed (whereas this method is called when a Stacks epoch is processed).
fn set_stacks_block_accepted_at_tip(&mut self, burn_tip: &BlockSnapshot, consensus_hash: &ConsensusHash,
parent_stacks_block_hash: &BlockHeaderHash, stacks_block_hash: &BlockHeaderHash, stacks_block_height: u64) -> Result<(), db_error> {
@@ -653,12 +658,13 @@ impl <'a> SortitionHandleTx <'a> {
let height_opt = match SortitionDB::get_accepted_stacks_block_pointer(self, &burn_tip.consensus_hash, parent_stacks_block_hash)? {
// this block builds on a block accepted _after_ this burn chain tip was processed?
Some(accepted_header) => Some(accepted_header.height),
None =>
None => {
match self.get_indexed(&burn_tip.sortition_id, &parent_key)? {
// this block builds on a block accepted _before_ this burn chain tip was processed?
Some(height_str) => Some(height_str.parse::<u64>().expect(&format!("BUG: MARF stacks block key '{}' does not map to a u64", parent_key))),
None => None
}
}
};
match height_opt {
Some(height) => {
@@ -2141,7 +2147,6 @@ impl <'a> SortitionHandleTx <'a> {
self.put_indexed_begin(&parent_snapshot.sortition_id, &snapshot.sortition_id)?;
let root_hash = self.put_indexed_all(&keys, &values)?;
self.indexed_commit()?;
self.context.chain_tip = snapshot.sortition_id.clone();
Ok(root_hash)
}

View File

@@ -674,9 +674,10 @@ impl StacksChainState {
let first_index_hash = StacksBlockHeader::make_index_block_hash(&FIRST_BURNCHAIN_CONSENSUS_HASH, &FIRST_STACKS_BLOCK_HASH);
test_debug!("Boot code headers index_put_begin {}-{}", &parent_hash, &first_index_hash);
headers_tx.put_indexed_begin(&parent_hash, &first_index_hash)?;
let first_root_hash = headers_tx.put_indexed_all(&vec![], &vec![])?;
headers_tx.indexed_commit()?;
test_debug!("Boot code headers index_commit {}-{}", &parent_hash, &first_index_hash);
let first_tip_info = StacksHeaderInfo::genesis_block_header_info(first_root_hash);
@@ -1057,9 +1058,8 @@ impl StacksChainState {
test_debug!("Headers index_put_begin {}-{}", &parent_hash, &new_tip.index_block_hash(new_consensus_hash));
headers_tx.put_indexed_begin(&parent_hash, &new_tip.index_block_hash(new_consensus_hash))?;
let root_hash = headers_tx.put_indexed_all(&indexed_keys, &indexed_values)?;
headers_tx.indexed_commit()?;
test_debug!("Headers index_commit {}-{}", &parent_hash, &new_tip.index_block_hash(new_consensus_hash));
test_debug!("Headers index_indexed_all finished {}-{}", &parent_hash, &new_tip.index_block_hash(new_consensus_hash));
let new_tip_info = StacksHeaderInfo {
anchored_header: new_tip.clone(),
microblock_tail: microblock_tail_opt,

View File

@@ -705,21 +705,17 @@ impl <T: MarfTrieId> TrieFileStorage <T> {
}
pub fn reopen_readonly(&self) -> Result<TrieFileStorage<T>, Error> {
if let Some((ref block_bhh, _)) = self.last_extended {
error!("MARF storage already opened to in-progress block {}", block_bhh);
return Err(Error::InProgressError);
}
let db = Connection::open_with_flags(&self.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
db.busy_handler(Some(tx_busy_handler))?;
trace!("Make read-only view of TrieFileStorage: {}", &self.db_path);
// TODO: borrow self.last_extended and self.block_hash_cache; don't copy them
let ret = TrieFileStorage {
db_path: self.db_path.clone(),
db: db,
last_extended: None,
last_extended: self.last_extended.clone(),
cur_block: self.cur_block.clone(),
cur_block_id: self.cur_block_id.clone(),

View File

@@ -2173,7 +2173,7 @@ impl PeerNetwork {
}
};
test_debug!("{:?}: Process BlocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.available.len());
debug!("{:?}: Process BlocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.available.len());
for (consensus_hash, _) in new_blocks.available.iter() {
let block_sortition_height = match self.handle_unsolicited_inv_update(sortdb, event_id, &outbound_neighbor_key, consensus_hash, false) {
@@ -2204,7 +2204,7 @@ impl PeerNetwork {
}
};
test_debug!("{:?}: Process MicroblocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_mblocks.available.len());
debug!("{:?}: Process MicroblocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_mblocks.available.len());
for (consensus_hash, _) in new_mblocks.available.iter() {
let mblock_sortition_height = match self.handle_unsolicited_inv_update(sortdb, event_id, &outbound_neighbor_key, consensus_hash, true) {
@@ -2235,7 +2235,7 @@ impl PeerNetwork {
}
};
test_debug!("{:?}: Process BlocksData from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.blocks.len());
debug!("{:?}: Process BlocksData from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.blocks.len());
for (burn_header_hash, block) in new_blocks.blocks.iter() {
// TODO(PoX): burn_header_hash to be replaced with a ConsensusHash, so we can just use
@@ -2264,6 +2264,7 @@ impl PeerNetwork {
/// Handle unsolicited messages propagated up to us from our ongoing ConversationP2Ps.
/// Return messages that we couldn't handle here, but key them by neighbor, not event.
/// Drop invalid messages.
fn handle_unsolicited_messages(&mut self, sortdb: &SortitionDB, mut unsolicited: HashMap<usize, Vec<StacksMessage>>) -> Result<HashMap<NeighborKey, Vec<StacksMessage>>, net_error> {
let mut unhandled : HashMap<NeighborKey, Vec<StacksMessage>> = HashMap::new();
for (event_id, messages) in unsolicited.drain() {

View File

@@ -626,6 +626,10 @@ impl<'a, C: Clone, T: MarfTrieId> IndexDBTx<'a, C, T> {
}
/// Get a value from the fork index
/// NOTE: until the TrieFileStorage implementation of reopen_readonly() is made zero-copy --
/// namely, made so it doesn't just naively clone the underlying TrieRAM when reopening
/// read-only, the caller should make sure to only use the get_indexed() _before_ writing any
/// MARF key/value pairs. Doing so afterwards will clone all uncommitted trie state.
pub fn get_indexed(&self, header_hash: &T, key: &str) -> Result<Option<String>, Error> {
get_indexed(self.tx(), &self.index, header_hash, key)
}
@@ -660,20 +664,13 @@ impl<'a, C: Clone, T: MarfTrieId> IndexDBTx<'a, C, T> {
Ok(root_hash)
}
/// Commit the indexed data
pub fn indexed_commit(&mut self) -> Result<(), Error> {
if self.block_linkage.is_some() {
self.index.commit().map_err(Error::IndexError)?;
self.block_linkage = None;
}
Ok(())
}
/// Commit the tx
pub fn commit(mut self) -> Result<(), Error> {
let tx = self._tx.take();
test_debug!("Indexed-commit: storage");
tx.unwrap().commit().map_err(Error::SqliteError)?;
if self.block_linkage.is_some() {
test_debug!("Indexed-commit: MARF index");
self.index.commit().map_err(Error::IndexError)?;
self.block_linkage = None;
}