fix: immediately retry sync if a getchunk/putchunk fails due to stale inventory data

This commit is contained in:
Jude Nelson
2024-08-12 14:24:29 -04:00
parent 7c37f89dce
commit c443f82fbf
3 changed files with 59 additions and 17 deletions

View File

@@ -323,6 +323,8 @@ impl<'a> StackerDBTx<'a> {
}
}
debug!("Reset slot {} of {}", slot_id, smart_contract);
// new slot, or existing slot with a different signer
let qry = "INSERT OR REPLACE INTO chunks (stackerdb_id,signer,slot_id,version,write_time,data,data_hash,signature) VALUES (?1,?2,?3,?4,?5,?6,?7,?8)";
let mut stmt = self.sql_tx.prepare(&qry)?;

View File

@@ -388,6 +388,8 @@ pub struct StackerDBSync<NC: NeighborComms> {
/// whether or not we should immediately re-fetch chunks because we learned about new chunks
/// from our peers when they replied to our chunk-pushes with new inventory state
need_resync: bool,
/// whether or not the fetched inventory was determined to be stale
stale_inv: bool,
/// Track stale neighbors
pub(crate) stale_neighbors: HashSet<NeighborAddress>,
/// How many attempted connections have been made in the last pass (gets reset)
@@ -466,7 +468,9 @@ impl PeerNetwork {
Err(e) => {
debug!(
"{:?}: failed to get chunk versions for {}: {:?}",
self.local_peer, contract_id, &e
self.get_local_peer(),
contract_id,
&e
);
// most likely indicates that this DB doesn't exist
@@ -475,6 +479,14 @@ impl PeerNetwork {
};
let num_outbound_replicas = self.count_outbound_stackerdb_replicas(contract_id) as u32;
debug!(
"{:?}: inventory for {} has {} outbound replicas; versions are {:?}",
self.get_local_peer(),
contract_id,
num_outbound_replicas,
&slot_versions
);
StacksMessageType::StackerDBChunkInv(StackerDBChunkInvData {
slot_versions,
num_outbound_replicas,

View File

@@ -71,6 +71,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
total_pushed: 0,
last_run_ts: 0,
need_resync: false,
stale_inv: false,
stale_neighbors: HashSet::new(),
num_connections: 0,
num_attempted_connections: 0,
@@ -210,6 +211,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
self.write_freq = config.write_freq;
self.need_resync = false;
self.stale_inv = false;
self.last_run_ts = get_epoch_time_secs();
self.state = StackerDBSyncState::ConnectBegin;
@@ -253,7 +255,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
.get_slot_write_timestamps(&self.smart_contract_id)?;
if local_slot_versions.len() != local_write_timestamps.len() {
let msg = format!("Local slot versions ({}) out of sync with DB slot versions ({}); abandoning sync and trying again", local_slot_versions.len(), local_write_timestamps.len());
let msg = format!("Local slot versions ({}) out of sync with DB slot versions ({}) for {}; abandoning sync and trying again", local_slot_versions.len(), local_write_timestamps.len(), &self.smart_contract_id);
warn!("{}", &msg);
return Err(net_error::Transient(msg));
}
@@ -267,12 +269,13 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
let write_ts = local_write_timestamps[i];
if write_ts + self.write_freq > now {
debug!(
"{:?}: Chunk {} was written too frequently ({} + {} >= {}), so will not fetch chunk",
"{:?}: Chunk {} was written too frequently ({} + {} >= {}) in {}, so will not fetch chunk",
network.get_local_peer(),
i,
write_ts,
self.write_freq,
now
now,
&self.smart_contract_id,
);
continue;
}
@@ -340,10 +343,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
schedule.reverse();
debug!(
"{:?}: Will request up to {} chunks for {}",
"{:?}: Will request up to {} chunks for {}. Schedule: {:?}",
network.get_local_peer(),
&schedule.len(),
&self.smart_contract_id,
&schedule
);
Ok(schedule)
}
@@ -507,12 +511,13 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
if *old_version < new_inv.slot_versions[old_slot_id] {
// remote peer indicated that it has a newer version of this chunk.
debug!(
"{:?}: peer {:?} has a newer version of slot {} ({} < {})",
"{:?}: peer {:?} has a newer version of slot {} ({} < {}) in {}",
_network.get_local_peer(),
&naddr,
old_slot_id,
old_version,
new_inv.slot_versions[old_slot_id]
new_inv.slot_versions[old_slot_id],
&self.smart_contract_id,
);
resync = true;
break;
@@ -621,9 +626,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
self.replicas = replicas;
}
debug!(
"{:?}: connect_begin: establish StackerDB sessions to {} neighbors",
"{:?}: connect_begin: establish StackerDB sessions to {} neighbors (out of {} p2p peers)",
network.get_local_peer(),
self.replicas.len()
self.replicas.len(),
network.get_num_p2p_convos()
);
if self.replicas.len() == 0 {
// nothing to do
@@ -820,9 +826,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
}
StacksMessageType::Nack(data) => {
debug!(
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunksInv us with code {}",
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunksInv us (on {}) with code {}",
&network.get_local_peer(),
&naddr,
&self.smart_contract_id,
data.error_code
);
self.connected_replicas.remove(&naddr);
@@ -838,9 +845,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
}
};
debug!(
"{:?}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}",
"{:?}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}: {:?}",
network.get_local_peer(),
&naddr
&naddr,
&chunk_inv_opt
);
if let Some(chunk_inv) = chunk_inv_opt {
@@ -956,14 +964,17 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
StacksMessageType::StackerDBChunk(data) => data,
StacksMessageType::Nack(data) => {
debug!(
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunk with code {}",
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunk (on {}) with code {}",
network.get_local_peer(),
&naddr,
&self.smart_contract_id,
data.error_code
);
self.connected_replicas.remove(&naddr);
if data.error_code == NackErrorCodes::StaleView {
self.stale_neighbors.insert(naddr);
} else if data.error_code == NackErrorCodes::StaleVersion {
// try again immediately, without throttling
self.stale_inv = true;
}
continue;
}
@@ -1068,7 +1079,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
&selected_neighbor,
&e
);
self.connected_replicas.remove(&selected_neighbor);
continue;
}
@@ -1107,7 +1117,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
&naddr,
data.error_code
);
self.connected_replicas.remove(&naddr);
if data.error_code == NackErrorCodes::StaleView {
self.stale_neighbors.insert(naddr);
}
@@ -1199,8 +1208,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
let done = self.connect_begin(network)?;
if done {
self.state = StackerDBSyncState::ConnectFinish;
blocked = false;
} else {
// no replicas; try again
self.state = StackerDBSyncState::Finished;
}
blocked = false;
}
StackerDBSyncState::ConnectFinish => {
let done = self.connect_try_finish(network)?;
@@ -1248,6 +1260,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
{
// someone pushed newer chunk data to us, and getting chunks is
// enabled, so immediately go request them
debug!(
"{:?}: immediately retry StackerDB GetChunks on {} due to PushChunk NACK",
network.get_local_peer(),
&self.smart_contract_id
);
self.recalculate_chunk_request_schedule(network)?;
self.state = StackerDBSyncState::GetChunks;
} else {
@@ -1259,8 +1276,19 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
}
}
StackerDBSyncState::Finished => {
let stale_inv = self.stale_inv;
let result = self.reset(Some(network), config);
self.state = StackerDBSyncState::ConnectBegin;
if stale_inv {
debug!(
"{:?}: immediately retry StackerDB sync on {} due to stale inventory",
network.get_local_peer(),
&self.smart_contract_id
);
self.wakeup();
}
return Ok(Some(result));
}
};