diff --git a/stackslib/src/net/stackerdb/db.rs b/stackslib/src/net/stackerdb/db.rs index 6cdebb69d..907ce29cc 100644 --- a/stackslib/src/net/stackerdb/db.rs +++ b/stackslib/src/net/stackerdb/db.rs @@ -323,6 +323,8 @@ impl<'a> StackerDBTx<'a> { } } + debug!("Reset slot {} of {}", slot_id, smart_contract); + // new slot, or existing slot with a different signer let qry = "INSERT OR REPLACE INTO chunks (stackerdb_id,signer,slot_id,version,write_time,data,data_hash,signature) VALUES (?1,?2,?3,?4,?5,?6,?7,?8)"; let mut stmt = self.sql_tx.prepare(&qry)?; diff --git a/stackslib/src/net/stackerdb/mod.rs b/stackslib/src/net/stackerdb/mod.rs index da3ffa455..2cf0e0ddf 100644 --- a/stackslib/src/net/stackerdb/mod.rs +++ b/stackslib/src/net/stackerdb/mod.rs @@ -388,6 +388,8 @@ pub struct StackerDBSync { /// whether or not we should immediately re-fetch chunks because we learned about new chunks /// from our peers when they replied to our chunk-pushes with new inventory state need_resync: bool, + /// whether or not the fetched inventory was determined to be stale + stale_inv: bool, /// Track stale neighbors pub(crate) stale_neighbors: HashSet, /// How many attempted connections have been made in the last pass (gets reset) @@ -466,7 +468,9 @@ impl PeerNetwork { Err(e) => { debug!( "{:?}: failed to get chunk versions for {}: {:?}", - self.local_peer, contract_id, &e + self.get_local_peer(), + contract_id, + &e ); // most likely indicates that this DB doesn't exist @@ -475,6 +479,14 @@ impl PeerNetwork { }; let num_outbound_replicas = self.count_outbound_stackerdb_replicas(contract_id) as u32; + + debug!( + "{:?}: inventory for {} has {} outbound replicas; versions are {:?}", + self.get_local_peer(), + contract_id, + num_outbound_replicas, + &slot_versions + ); StacksMessageType::StackerDBChunkInv(StackerDBChunkInvData { slot_versions, num_outbound_replicas, diff --git a/stackslib/src/net/stackerdb/sync.rs b/stackslib/src/net/stackerdb/sync.rs index c3e61acbc..3eb0d86ae 100644 --- a/stackslib/src/net/stackerdb/sync.rs +++ b/stackslib/src/net/stackerdb/sync.rs @@ -71,6 +71,7 @@ impl StackerDBSync { total_pushed: 0, last_run_ts: 0, need_resync: false, + stale_inv: false, stale_neighbors: HashSet::new(), num_connections: 0, num_attempted_connections: 0, @@ -210,6 +211,7 @@ impl StackerDBSync { self.write_freq = config.write_freq; self.need_resync = false; + self.stale_inv = false; self.last_run_ts = get_epoch_time_secs(); self.state = StackerDBSyncState::ConnectBegin; @@ -253,7 +255,7 @@ impl StackerDBSync { .get_slot_write_timestamps(&self.smart_contract_id)?; if local_slot_versions.len() != local_write_timestamps.len() { - let msg = format!("Local slot versions ({}) out of sync with DB slot versions ({}); abandoning sync and trying again", local_slot_versions.len(), local_write_timestamps.len()); + let msg = format!("Local slot versions ({}) out of sync with DB slot versions ({}) for {}; abandoning sync and trying again", local_slot_versions.len(), local_write_timestamps.len(), &self.smart_contract_id); warn!("{}", &msg); return Err(net_error::Transient(msg)); } @@ -267,12 +269,13 @@ impl StackerDBSync { let write_ts = local_write_timestamps[i]; if write_ts + self.write_freq > now { debug!( - "{:?}: Chunk {} was written too frequently ({} + {} >= {}), so will not fetch chunk", + "{:?}: Chunk {} was written too frequently ({} + {} >= {}) in {}, so will not fetch chunk", network.get_local_peer(), i, write_ts, self.write_freq, - now + now, + &self.smart_contract_id, ); continue; } @@ -340,10 +343,11 @@ impl StackerDBSync { schedule.reverse(); debug!( - "{:?}: Will request up to {} chunks for {}", + "{:?}: Will request up to {} chunks for {}. Schedule: {:?}", network.get_local_peer(), &schedule.len(), &self.smart_contract_id, + &schedule ); Ok(schedule) } @@ -507,12 +511,13 @@ impl StackerDBSync { if *old_version < new_inv.slot_versions[old_slot_id] { // remote peer indicated that it has a newer version of this chunk. debug!( - "{:?}: peer {:?} has a newer version of slot {} ({} < {})", + "{:?}: peer {:?} has a newer version of slot {} ({} < {}) in {}", _network.get_local_peer(), &naddr, old_slot_id, old_version, - new_inv.slot_versions[old_slot_id] + new_inv.slot_versions[old_slot_id], + &self.smart_contract_id, ); resync = true; break; @@ -621,9 +626,10 @@ impl StackerDBSync { self.replicas = replicas; } debug!( - "{:?}: connect_begin: establish StackerDB sessions to {} neighbors", + "{:?}: connect_begin: establish StackerDB sessions to {} neighbors (out of {} p2p peers)", network.get_local_peer(), - self.replicas.len() + self.replicas.len(), + network.get_num_p2p_convos() ); if self.replicas.len() == 0 { // nothing to do @@ -820,9 +826,10 @@ impl StackerDBSync { } StacksMessageType::Nack(data) => { debug!( - "{:?}: remote peer {:?} NACK'ed our StackerDBGetChunksInv us with code {}", + "{:?}: remote peer {:?} NACK'ed our StackerDBGetChunksInv us (on {}) with code {}", &network.get_local_peer(), &naddr, + &self.smart_contract_id, data.error_code ); self.connected_replicas.remove(&naddr); @@ -838,9 +845,10 @@ impl StackerDBSync { } }; debug!( - "{:?}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}", + "{:?}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}: {:?}", network.get_local_peer(), - &naddr + &naddr, + &chunk_inv_opt ); if let Some(chunk_inv) = chunk_inv_opt { @@ -956,14 +964,17 @@ impl StackerDBSync { StacksMessageType::StackerDBChunk(data) => data, StacksMessageType::Nack(data) => { debug!( - "{:?}: remote peer {:?} NACK'ed our StackerDBGetChunk with code {}", + "{:?}: remote peer {:?} NACK'ed our StackerDBGetChunk (on {}) with code {}", network.get_local_peer(), &naddr, + &self.smart_contract_id, data.error_code ); - self.connected_replicas.remove(&naddr); if data.error_code == NackErrorCodes::StaleView { self.stale_neighbors.insert(naddr); + } else if data.error_code == NackErrorCodes::StaleVersion { + // try again immediately, without throttling + self.stale_inv = true; } continue; } @@ -1068,7 +1079,6 @@ impl StackerDBSync { &selected_neighbor, &e ); - self.connected_replicas.remove(&selected_neighbor); continue; } @@ -1107,7 +1117,6 @@ impl StackerDBSync { &naddr, data.error_code ); - self.connected_replicas.remove(&naddr); if data.error_code == NackErrorCodes::StaleView { self.stale_neighbors.insert(naddr); } @@ -1199,8 +1208,11 @@ impl StackerDBSync { let done = self.connect_begin(network)?; if done { self.state = StackerDBSyncState::ConnectFinish; - blocked = false; + } else { + // no replicas; try again + self.state = StackerDBSyncState::Finished; } + blocked = false; } StackerDBSyncState::ConnectFinish => { let done = self.connect_try_finish(network)?; @@ -1248,6 +1260,11 @@ impl StackerDBSync { { // someone pushed newer chunk data to us, and getting chunks is // enabled, so immediately go request them + debug!( + "{:?}: immediately retry StackerDB GetChunks on {} due to PushChunk NACK", + network.get_local_peer(), + &self.smart_contract_id + ); self.recalculate_chunk_request_schedule(network)?; self.state = StackerDBSyncState::GetChunks; } else { @@ -1259,8 +1276,19 @@ impl StackerDBSync { } } StackerDBSyncState::Finished => { + let stale_inv = self.stale_inv; + let result = self.reset(Some(network), config); self.state = StackerDBSyncState::ConnectBegin; + + if stale_inv { + debug!( + "{:?}: immediately retry StackerDB sync on {} due to stale inventory", + network.get_local_peer(), + &self.smart_contract_id + ); + self.wakeup(); + } return Ok(Some(result)); } };