Merge pull request #1774 from blockstack/fix/marf-1557

Fix/marf 1557
This commit is contained in:
Jude Nelson
2020-07-27 20:28:40 +00:00
committed by GitHub
6 changed files with 24 additions and 25 deletions

View File

@@ -586,6 +586,11 @@ impl <'a> SortitionDBTx <'a> {
/// Mark an existing snapshot's stacks block as accepted at a particular burn chain tip, and calculate and store its arrival index.
/// If this Stacks block extends the canonical stacks chain tip, then also update the memoized canonical
/// stacks chain tip metadata on the burn chain tip.
// TODO: this method's inner call to get_indexed() occurs within a MARF transaction, which
// means it will clone() the underlying TrieRAM. Until this is rectified, care should be taken
// to ensure that no keys are inserted until after this method is called. This should already
// be the case, since the only time keys are inserted into the sortition DB MARF is when the
// next snapshot is processed (whereas this method is called when a Stacks epoch is processed).
fn set_stacks_block_accepted_at_tip(&mut self, burn_tip: &BlockSnapshot, burn_header_hash: &BurnchainHeaderHash,
parent_stacks_block_hash: &BlockHeaderHash, stacks_block_hash: &BlockHeaderHash, stacks_block_height: u64) -> Result<(), db_error> {
let arrival_index = SortitionDB::get_max_arrival_index(self)?;
@@ -619,12 +624,13 @@ impl <'a> SortitionDBTx <'a> {
let height_opt = match SortitionDB::get_accepted_stacks_block_pointer(self, &burn_tip.burn_header_hash, parent_stacks_block_hash)? {
// this block builds on a block accepted _after_ this burn chain tip was processed?
Some(accepted_header) => Some(accepted_header.height),
None =>
None => {
match self.get_indexed(&burn_tip.sortition_id, &parent_key)? {
// this block builds on a block accepted _before_ this burn chain tip was processed?
Some(height_str) => Some(height_str.parse::<u64>().expect(&format!("BUG: MARF stacks block key '{}' does not map to a u64", parent_key))),
None => None
}
}
};
match height_opt {
Some(height) => {
@@ -1911,7 +1917,6 @@ impl <'a> SortitionHandleTx <'a> {
self.put_indexed_begin(&parent_snapshot.sortition_id, &snapshot.sortition_id)?;
let root_hash = self.put_indexed_all(&keys, &values)?;
self.indexed_commit()?;
self.context.chain_tip = snapshot.sortition_id.clone();
Ok(root_hash)
}

View File

@@ -716,8 +716,6 @@ impl StacksChainState {
.map_err(Error::DBError)?;
let first_root_hash = headers_tx.put_indexed_all(&vec![], &vec![])
.map_err(Error::DBError)?;
headers_tx.indexed_commit()
.map_err(Error::DBError)?;
test_debug!("Boot code headers index_commit {}-{}", &parent_hash, &first_index_hash);
let first_tip_info = StacksHeaderInfo::genesis_block_header_info(first_root_hash);
@@ -1096,8 +1094,6 @@ impl StacksChainState {
.map_err(Error::DBError)?;
let root_hash = headers_tx.put_indexed_all(&indexed_keys, &indexed_values)
.map_err(Error::DBError)?;
headers_tx.indexed_commit()
.map_err(Error::DBError)?;
test_debug!("Headers index_commit {}-{}", &parent_hash, &new_tip.index_block_hash(new_burn_block));
let new_tip_info = StacksHeaderInfo {

View File

@@ -2358,6 +2358,10 @@ mod test {
#[test]
fn test_marf_unconfirmed() {
if fs::metadata("/tmp/test_marf_unconfirmed").is_ok() {
fs::remove_file("/tmp/test_marf_unconfirmed").unwrap();
}
let f = TrieFileStorage::<StacksBlockId>::open_unconfirmed("/tmp/test_marf_unconfirmed").unwrap();
let mut marf = MARF::<StacksBlockId>::from_storage(f);

View File

@@ -705,21 +705,17 @@ impl <T: MarfTrieId> TrieFileStorage <T> {
}
pub fn reopen_readonly(&self) -> Result<TrieFileStorage<T>, Error> {
if let Some((ref block_bhh, _)) = self.last_extended {
error!("MARF storage already opened to in-progress block {}", block_bhh);
return Err(Error::InProgressError);
}
let db = Connection::open_with_flags(&self.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
db.busy_handler(Some(tx_busy_handler))?;
trace!("Make read-only view of TrieFileStorage: {}", &self.db_path);
// TODO: borrow self.last_extended and self.block_hash_cache; don't copy them
let ret = TrieFileStorage {
db_path: self.db_path.clone(),
db: db,
last_extended: None,
last_extended: self.last_extended.clone(),
cur_block: self.cur_block.clone(),
cur_block_id: self.cur_block_id.clone(),

View File

@@ -2405,7 +2405,7 @@ impl PeerNetwork {
}
};
test_debug!("{:?}: Process BlocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.available.len());
debug!("{:?}: Process BlocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.available.len());
for (consensus_hash, burn_header_hash) in new_blocks.available.iter() {
let block_sortition_height = match self.handle_unsolicited_inv_update(sortdb, event_id, &outbound_neighbor_key, consensus_hash, burn_header_hash, false) {
@@ -2436,7 +2436,7 @@ impl PeerNetwork {
}
};
test_debug!("{:?}: Process MicroblocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_mblocks.available.len());
debug!("{:?}: Process MicroblocksAvailable from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_mblocks.available.len());
for (consensus_hash, burn_header_hash) in new_mblocks.available.iter() {
let mblock_sortition_height = match self.handle_unsolicited_inv_update(sortdb, event_id, &outbound_neighbor_key, consensus_hash, burn_header_hash, true) {
@@ -2467,7 +2467,7 @@ impl PeerNetwork {
}
};
test_debug!("{:?}: Process BlocksData from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.blocks.len());
debug!("{:?}: Process BlocksData from {:?} with {} entries", &self.local_peer, outbound_neighbor_key, new_blocks.blocks.len());
for (burn_header_hash, block) in new_blocks.blocks.iter() {
let sortid = SortitionId::stubbed(burn_header_hash);
@@ -2494,6 +2494,7 @@ impl PeerNetwork {
/// Handle unsolicited messages propagated up to us from our ongoing ConversationP2Ps.
/// Return messages that we couldn't handle here, but key them by neighbor, not event.
/// Drop invalid messages.
fn handle_unsolicited_messages(&mut self, sortdb: &SortitionDB, mut unsolicited: HashMap<usize, Vec<StacksMessage>>) -> Result<HashMap<NeighborKey, Vec<StacksMessage>>, net_error> {
let mut unhandled : HashMap<NeighborKey, Vec<StacksMessage>> = HashMap::new();
for (event_id, messages) in unsolicited.drain() {

View File

@@ -626,6 +626,10 @@ impl<'a, C: Clone, T: MarfTrieId> IndexDBTx<'a, C, T> {
}
/// Get a value from the fork index
/// NOTE: until the TrieFileStorage implementation of reopen_readonly() is made zero-copy --
/// namely, made so it doesn't just naively clone the underlying TrieRAM when reopening
/// read-only, the caller should make sure to only use the get_indexed() _before_ writing any
/// MARF key/value pairs. Doing so afterwards will clone all uncommitted trie state.
pub fn get_indexed(&self, header_hash: &T, key: &str) -> Result<Option<String>, Error> {
get_indexed(self.tx(), &self.index, header_hash, key)
}
@@ -660,20 +664,13 @@ impl<'a, C: Clone, T: MarfTrieId> IndexDBTx<'a, C, T> {
Ok(root_hash)
}
/// Commit the indexed data
pub fn indexed_commit(&mut self) -> Result<(), Error> {
if self.block_linkage.is_some() {
self.index.commit().map_err(Error::IndexError)?;
self.block_linkage = None;
}
Ok(())
}
/// Commit the tx
pub fn commit(mut self) -> Result<(), Error> {
let tx = self._tx.take();
test_debug!("Indexed-commit: storage");
tx.unwrap().commit().map_err(Error::SqliteError)?;
if self.block_linkage.is_some() {
test_debug!("Indexed-commit: MARF index");
self.index.commit().map_err(Error::IndexError)?;
self.block_linkage = None;
}