diff --git a/src/net/neighbors/comms.rs b/src/net/neighbors/comms.rs index 3887244b1..e0f9f487d 100644 --- a/src/net/neighbors/comms.rs +++ b/src/net/neighbors/comms.rs @@ -20,7 +20,8 @@ use crate::net::{ connection::{ConnectionOptions, ReplyHandleP2P}, db::{LocalPeer, PeerDB}, neighbors::{ - NeighborWalk, NeighborWalkDB, MAX_NEIGHBOR_BLOCK_DELAY, NEIGHBOR_MINIMUM_CONTACT_INTERVAL, + NeighborWalk, NeighborWalkDB, NeighborWalkResult, MAX_NEIGHBOR_BLOCK_DELAY, + NEIGHBOR_MINIMUM_CONTACT_INTERVAL, }, p2p::PeerNetwork, Error as net_error, HandshakeData, Neighbor, NeighborAddress, NeighborKey, PeerAddress, @@ -42,9 +43,9 @@ use stacks_common::types::chainstate::StacksPublicKey; use stacks_common::util::hash::Hash160; use stacks_common::util::log; -/// A trait for representing a set of connected neighbors, for the purposes of executing some P2P +/// A trait for representing session state for a set of connected neighbors, for the purposes of executing some P2P /// algorithm. -pub trait NeighborSet { +pub trait NeighborComms { /// Add a neighbor and its event ID as connecting fn add_connecting( &mut self, @@ -64,8 +65,29 @@ pub trait NeighborSet { fn pin_connection(&mut self, event_id: usize); /// Unpin a connection -- allow it to get pruned fn unpin_connection(&mut self, event_id: usize); + /// Get the collection of pinned connections + fn get_pinned_connections(&self) -> &HashSet; + /// Clear all pinned connections and return them. + /// List items are guaranteed to be unique + fn clear_pinned_connections(&mut self) -> HashSet; /// Is the connection pinned? fn is_pinned(&self, event_id: usize) -> bool; + /// Add an in-flight request to begin polling on + fn add_batch_request(&mut self, naddr: NeighborAddress, rh: ReplyHandleP2P); + /// Get the number of inflight requests + fn count_inflight(&self) -> usize; + /// Poll for any received messages. 
+ fn collect_replies( + &mut self, + network: &mut PeerNetwork, + ) -> Vec<(NeighborAddress, StacksMessage)>; + /// Take all dead neighbors + fn take_dead_neighbors(&mut self) -> HashSet; + /// Take all broken neighbors + fn take_broken_neighbors(&mut self) -> HashSet; + /// Cancel any ongoing requests. Any messages that had been enqueued from + /// `add_batch_request()` will not be delivered after this call completes. + fn cancel_inflight(&mut self); /// Send off a handshake to a remote peer. /// Fails if not connected. @@ -198,10 +220,15 @@ pub trait NeighborSet { /// Connect to a remote neighbor, and get back a reply handle which we can use to wait for a /// handshake response. If the neighbor is already connected, then just send a handshake. + /// + /// Normally, the caller would track the returned reply handle with a call to + /// `add_batch_request()`. However, this is omitted here for callers who want to do their own + /// polling. + /// /// Return Ok(Some(handle)) if we connected. /// Return Ok(None) if we're in the process of connecting, and should try again. /// Return Err(..) if we fail - fn neighbor_session_begin( + fn neighbor_session_begin_only( &mut self, network: &mut PeerNetwork, neighbor_addr: &NK, @@ -256,9 +283,36 @@ pub trait NeighborSet { } } + /// Begin a session with a remote neighbor, connecting to it first if necessary. + /// If successful, a Handshake message will be sent, and the returned HandshakeAccept or + /// HandshakeReject can be obtained with a follow-up call to `collect_replies()` + /// + /// Return Ok(true) if we connected. + /// Return Ok(false) if we're in the process of connecting, and should try again.
+ fn neighbor_session_begin( + &mut self, + network: &mut PeerNetwork, + neighbor_addr: &NeighborAddress, + neighbor_pubkh: &Hash160, + ) -> Result { + let handle_opt = + self.neighbor_session_begin_only(network, neighbor_addr, neighbor_pubkh)?; + if let Some(handle) = handle_opt { + self.add_batch_request(neighbor_addr.clone(), handle); + return Ok(true); + } + + // still trying + Ok(false) + } + /// Send a message to a connected neighbor. /// Fails if the neighbor is not connected. - fn neighbor_send( + /// + /// If successful, the caller usually calls `add_batch_request()`. This + /// is not carried out here because the caller may instead want to do a blocking wait + /// with the given reply handle (or do its own batching). + fn neighbor_send_only( network: &mut PeerNetwork, neighbor_addr: &NK, msg_payload: StacksMessageType, @@ -268,6 +322,21 @@ pub trait NeighborSet { network.send_message(&nk, msg, network.get_connection_opts().timeout) } + /// Send a message to a connected neighbor. + /// If successful, the reply handle is then tracked via a follow-up call to + /// `add_batch_request()`. + /// Fails if the neighbor is not connected. + fn neighbor_send( + &mut self, + network: &mut PeerNetwork, + neighbor_addr: &NeighborAddress, + msg_payload: StacksMessageType, + ) -> Result<(), net_error> { + let handle = Self::neighbor_send_only(network, neighbor_addr, msg_payload)?; + self.add_batch_request(neighbor_addr.clone(), handle); + Ok(()) + } + /// Try to receive a message from a peer handle. /// On success, consume the reply handle and return the StacksMessage. 
/// On error, either return the reply handle so we can try again, or return an error if we @@ -328,6 +397,148 @@ pub trait NeighborSet { None => Err(net_error::PeerNotConnected), } } + + /// Reset all comms + fn reset(&mut self) { + let _ = self.take_broken_neighbors(); + let _ = self.take_dead_neighbors(); + self.cancel_inflight(); + self.clear_pinned_connections(); + } +} + +/// Transport-level API for peer network state machines. +/// Prod implementation of NeighborComms. +pub struct PeerNetworkComms { + /// Set of PeerNetwork event IDs that this comms session is tracking (so they won't get pruned) + events: HashSet, + /// Map of neighbors we're currently trying to connect to (binds their addresses to their event IDs) + connecting: HashMap, + /// Set of neighbors that died during our comms session + dead_connections: HashSet, + /// Set of neighbors who misbehaved during our comms session + broken_connections: HashSet, + /// Ongoing batch of requests. Will be `None` if there are no inflight requests.
+ ongoing_batch_request: Option, +} + +impl PeerNetworkComms { + pub fn new() -> PeerNetworkComms { + PeerNetworkComms { + events: HashSet::new(), + connecting: HashMap::new(), + dead_connections: HashSet::new(), + broken_connections: HashSet::new(), + ongoing_batch_request: None, + } + } +} + +impl NeighborComms for PeerNetworkComms { + fn add_connecting( + &mut self, + network: &PeerNetwork, + nk: &NK, + event_id: usize, + ) { + self.connecting + .insert(nk.to_neighbor_key(network), event_id); + self.pin_connection(event_id); + } + + fn get_connecting(&self, network: &PeerNetwork, nk: &NK) -> Option { + self.connecting + .get(&nk.to_neighbor_key(network)) + .map(|event_ref| *event_ref) + } + + fn remove_connecting(&mut self, network: &PeerNetwork, nk: &NK) { + let event_id_opt = self.connecting.remove(&nk.to_neighbor_key(network)); + if let Some(event_id) = event_id_opt { + self.unpin_connection(event_id); + } + } + + fn add_dead(&mut self, network: &PeerNetwork, nk: &NK) { + self.dead_connections.insert(nk.to_neighbor_key(network)); + } + + fn add_broken(&mut self, network: &PeerNetwork, nk: &NK) { + self.broken_connections.insert(nk.to_neighbor_key(network)); + } + + fn pin_connection(&mut self, event_id: usize) { + self.events.insert(event_id); + } + + fn unpin_connection(&mut self, event_id: usize) { + self.events.remove(&event_id); + } + + fn get_pinned_connections(&self) -> &HashSet { + &self.events + } + + fn clear_pinned_connections(&mut self) -> HashSet { + let events = mem::replace(&mut self.events, HashSet::new()); + events + } + + fn is_pinned(&self, event_id: usize) -> bool { + self.events.contains(&event_id) + } + + fn add_batch_request(&mut self, naddr: NeighborAddress, rh: ReplyHandleP2P) { + if let Some(ref mut batch) = self.ongoing_batch_request.as_mut() { + batch.add(naddr, rh); + } else { + let mut batch = NeighborCommsRequest::new(); + batch.add(naddr, rh); + self.ongoing_batch_request = Some(batch); + } + } + + fn count_inflight(&self) -> 
usize { + self.ongoing_batch_request + .as_ref() + .map(|batch| batch.count_inflight()) + .unwrap_or(0) + } + + fn collect_replies( + &mut self, + network: &mut PeerNetwork, + ) -> Vec<(NeighborAddress, StacksMessage)> { + let mut ret = vec![]; + let mut clear = false; + let mut ongoing_batch_request = self.ongoing_batch_request.take(); + if let Some(batch) = ongoing_batch_request.as_mut() { + ret.extend(batch.new_replies(self, network)); + if batch.count_inflight() == 0 { + clear = true; + } + } + if clear { + self.ongoing_batch_request = None; + } else { + self.ongoing_batch_request = ongoing_batch_request; + } + ret + } + + fn cancel_inflight(&mut self) { + self.ongoing_batch_request = None; + } + + fn take_dead_neighbors(&mut self) -> HashSet { + let dead = mem::replace(&mut self.dead_connections, HashSet::new()); + dead + } + + fn take_broken_neighbors(&mut self) -> HashSet { + let broken = mem::replace(&mut self.broken_connections, HashSet::new()); + broken + } } /// This is a helper trait to ensure that a given struct can be turned into a NeighborKey for the @@ -360,20 +571,20 @@ impl ToNeighborKey for NeighborAddress { /// This struct represents a batch of in-flight requests to a set of peers, identified by a /// neighbor key (or something that converts to it) #[derive(Debug)] -pub struct NeighborSetRequest { +pub struct NeighborCommsRequest { state: HashMap, } /// This struct represents everything we need to iterate through a set of ongoing requests, in /// order to pull out completed replies. 
-pub struct NeighborSetMessageIterator<'a, NS: NeighborSet> { +pub struct NeighborCommsMessageIterator<'a, NS: NeighborComms> { network: &'a mut PeerNetwork, state: &'a mut HashMap, neighbor_set: &'a mut NS, } /// This is an iterator over completed requests -impl Iterator for NeighborSetMessageIterator<'_, NS> { +impl Iterator for NeighborCommsMessageIterator<'_, NS> { type Item = (NeighborAddress, StacksMessage); fn next(&mut self) -> Option { @@ -408,7 +619,7 @@ impl Iterator for NeighborSetMessageIterator<'_, NS> { } }; - if NeighborSetRequest::is_message_stale(&message, stable_block_height) { + if NeighborCommsRequest::is_message_stale(&message, stable_block_height) { debug!( "{:?}: Remote neighbor {:?} is still bootstrapping (at block {})", &self.network.get_local_peer(), @@ -425,9 +636,9 @@ impl Iterator for NeighborSetMessageIterator<'_, NS> { } } -impl NeighborSetRequest { - pub fn new() -> NeighborSetRequest { - NeighborSetRequest { +impl NeighborCommsRequest { + pub fn new() -> NeighborCommsRequest { + NeighborCommsRequest { state: HashMap::new(), } } @@ -443,12 +654,12 @@ impl NeighborSetRequest { } /// Iterate over all in-flight requests - pub fn new_replies<'a, NS: NeighborSet>( + pub fn new_replies<'a, NS: NeighborComms>( &'a mut self, neighbor_set: &'a mut NS, network: &'a mut PeerNetwork, - ) -> NeighborSetMessageIterator { - NeighborSetMessageIterator { + ) -> NeighborCommsMessageIterator { + NeighborCommsMessageIterator { network, state: &mut self.state, neighbor_set, @@ -460,76 +671,3 @@ impl NeighborSetRequest { self.state.len() } } - -/// Transport-level API for the neighbor walk -impl NeighborWalk { - /// Try and get the next reply from the single neighbor we're talking to - pub(crate) fn try_get_reply( - &mut self, - network: &mut PeerNetwork, - ) -> Result, net_error> { - let mut req_opt = self.ongoing_request.take(); - let cur_neighbor_addr = mem::replace(&mut self.cur_neighbor.addr, NeighborKey::empty()); - let msg_opt = 
self.poll_next_reply(network, &cur_neighbor_addr, &mut req_opt)?; - self.ongoing_request = req_opt; - self.cur_neighbor.addr = cur_neighbor_addr; - - if let Some(message) = msg_opt.as_ref() { - if NeighborSetRequest::is_message_stale( - &message, - network.get_chain_view().burn_stable_block_height, - ) { - test_debug!( - "{:?}: neighbor {:?} is still bootstrapping (on block {})", - network.get_local_peer(), - &self.cur_neighbor.addr, - message.preamble.burn_block_height - ); - return Err(net_error::StaleNeighbor); - } - } - - Ok(msg_opt) - } - - /// Add a request to the ongoing batch request, instantiating it if need be - pub(crate) fn add_batch_request(&mut self, naddr: NeighborAddress, rh: ReplyHandleP2P) { - if let Some(ref mut batch) = self.ongoing_batch_request.as_mut() { - batch.add(naddr, rh); - } else { - let mut batch = NeighborSetRequest::new(); - batch.add(naddr, rh); - self.ongoing_batch_request = Some(batch); - } - } - - /// Count up the number of batched requests in-flight - pub(crate) fn count_inflight(&self) -> usize { - self.ongoing_batch_request - .as_ref() - .map(|batch| batch.count_inflight()) - .unwrap_or(0) - } - - /// Collect all newly-arrived replies from the ongoing network batch request - pub(crate) fn collect_replies( - &mut self, - network: &mut PeerNetwork, - ) -> Vec<(NeighborAddress, StacksMessage)> { - let mut ret = vec![]; - let mut clear = false; - let mut ongoing_batch_request = self.ongoing_batch_request.take(); - if let Some(batch) = ongoing_batch_request.as_mut() { - ret.extend(batch.new_replies(self, network)); - if batch.count_inflight() == 0 { - clear = true; - } - } - if clear { - self.ongoing_batch_request = None; - } else { - self.ongoing_batch_request = ongoing_batch_request; - } - ret - } -}