mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-06-15 09:07:47 +08:00
feat: StackerDBConfig now includes the list of signers, and the PeerNetwork will wake up a state machine if it receives pushed data
This commit is contained in:
@@ -115,6 +115,7 @@
|
||||
pub mod tests;
|
||||
|
||||
pub mod bits;
|
||||
pub mod config;
|
||||
pub mod db;
|
||||
pub mod sync;
|
||||
|
||||
@@ -143,6 +144,7 @@ use crate::net::neighbors::NeighborComms;
|
||||
|
||||
use crate::net::p2p::PeerNetwork;
|
||||
|
||||
use stacks_common::types::chainstate::StacksAddress;
|
||||
use stacks_common::util::get_epoch_time_secs;
|
||||
|
||||
/// maximum chunk inventory size
|
||||
@@ -167,16 +169,16 @@ pub struct StackerDBSyncResult {
|
||||
pub struct StackerDBConfig {
|
||||
/// maximum chunk size
|
||||
pub chunk_size: u64,
|
||||
/// number of chunks in this DB. Cannot be bigger than STACERDB_INV_MAX.
|
||||
pub num_slots: u64,
|
||||
/// list of who writes and how many slots they have
|
||||
pub signers: Vec<(StacksAddress, u64)>,
|
||||
/// minimum wall-clock time between writes to the same slot.
|
||||
pub write_freq: u64,
|
||||
/// maximum number of times a slot may be written to during a reward cycle.
|
||||
pub max_writes: u32,
|
||||
/// hint for some initial peers that have replicas of this DB
|
||||
pub hint_peers: Vec<NeighborAddress>,
|
||||
pub hint_replicas: Vec<NeighborAddress>,
|
||||
/// hint for how many neighbors to connect to
|
||||
pub num_neighbors: usize,
|
||||
pub max_neighbors: usize,
|
||||
}
|
||||
|
||||
impl StackerDBConfig {
|
||||
@@ -186,11 +188,16 @@ impl StackerDBConfig {
|
||||
chunk_size: u64::MAX,
|
||||
write_freq: 0,
|
||||
max_writes: u32::MAX,
|
||||
hint_peers: vec![],
|
||||
num_neighbors: 8,
|
||||
num_slots: STACKERDB_INV_MAX.into(),
|
||||
hint_replicas: vec![],
|
||||
max_neighbors: 8,
|
||||
signers: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// How many slots are in this DB total?
|
||||
pub fn num_slots(&self) -> u64 {
|
||||
self.signers.iter().fold(0, |acc, s| acc + s.1)
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the set of replicated chunks in all stacker DBs that this node subscribes to.
|
||||
@@ -241,7 +248,7 @@ pub struct StackerDBSync<NC: NeighborComms> {
|
||||
pub smart_contract_id: ContractId,
|
||||
/// number of chunks in this DB
|
||||
pub num_slots: usize,
|
||||
/// how frequently we accept chunk writes
|
||||
/// how frequently we accept chunk writes, in seconds
|
||||
pub write_freq: u64,
|
||||
/// What versions of each chunk does each neighbor have?
|
||||
pub chunk_invs: HashMap<NeighborAddress, StackerDBChunkInvData>,
|
||||
@@ -260,9 +267,9 @@ pub struct StackerDBSync<NC: NeighborComms> {
|
||||
/// Downloaded chunks
|
||||
pub downloaded_chunks: HashMap<NeighborAddress, Vec<StackerDBChunkData>>,
|
||||
/// Replicas to contact
|
||||
pub(crate) replicas: Vec<NeighborAddress>,
|
||||
pub(crate) replicas: HashSet<NeighborAddress>,
|
||||
/// Replicas that have connected
|
||||
pub(crate) connected_replicas: Vec<NeighborAddress>,
|
||||
pub(crate) connected_replicas: HashSet<NeighborAddress>,
|
||||
/// Comms with neigbors
|
||||
pub(crate) comms: NC,
|
||||
/// Handle to StackerDBs
|
||||
@@ -275,6 +282,8 @@ pub struct StackerDBSync<NC: NeighborComms> {
|
||||
pub total_stored: u64,
|
||||
/// total chunks pushed
|
||||
pub total_pushed: u64,
|
||||
/// last time the state-transition function ran to completion
|
||||
last_run_ts: u64,
|
||||
}
|
||||
|
||||
impl StackerDBSyncResult {
|
||||
@@ -323,7 +332,12 @@ impl PeerNetwork {
|
||||
"Failed to run StackerDB state machine for {}: {:?}",
|
||||
&sc, &e
|
||||
);
|
||||
stacker_db_sync.reset(Some(self), config)?;
|
||||
if let Err(e) = stacker_db_sync.reset(Some(self), config) {
|
||||
info!(
|
||||
"Failed to reset StackerDB state machine for {}: {:?}",
|
||||
&sc, &e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -424,6 +438,10 @@ impl PeerNetwork {
|
||||
/// Handle unsolicited StackerDBPushChunk messages.
|
||||
/// Generate a reply handle for a StackerDBChunksInv to be sent to the remote peer, in which
|
||||
/// the inventory vector is updated with this chunk's data.
|
||||
///
|
||||
/// Note that this can happen *during* a StackerDB sync's execution, so be very careful about
|
||||
/// modifying a state machine's contents!
|
||||
///
|
||||
/// Return Ok(true) if we should store the chunk
|
||||
/// Return Ok(false) if we should drop it.
|
||||
pub fn handle_unsolicited_StackerDBPushChunk(
|
||||
@@ -489,6 +507,13 @@ impl PeerNetwork {
|
||||
// patch inventory -- we'll accept this chunk
|
||||
data.slot_versions[chunk_data.chunk_data.slot_id as usize] =
|
||||
chunk_data.chunk_data.slot_version;
|
||||
|
||||
// wake up the state machine -- force it to begin a new sync if it's asleep
|
||||
if let Some(stackerdb_syncs) = self.stacker_db_syncs.as_mut() {
|
||||
if let Some(stackerdb_sync) = stackerdb_syncs.get_mut(&chunk_data.contract_id) {
|
||||
stackerdb_sync.wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user