refactor: fully extract and implement peer network comms into its own struct, which is now going to be a member of NeighborWalk

This commit is contained in:
Jude Nelson
2023-08-01 09:44:48 -04:00
parent 3d0c6d8e87
commit 4ceef5caa9

View File

@@ -20,7 +20,8 @@ use crate::net::{
connection::{ConnectionOptions, ReplyHandleP2P},
db::{LocalPeer, PeerDB},
neighbors::{
NeighborWalk, NeighborWalkDB, MAX_NEIGHBOR_BLOCK_DELAY, NEIGHBOR_MINIMUM_CONTACT_INTERVAL,
NeighborWalk, NeighborWalkDB, NeighborWalkResult, MAX_NEIGHBOR_BLOCK_DELAY,
NEIGHBOR_MINIMUM_CONTACT_INTERVAL,
},
p2p::PeerNetwork,
Error as net_error, HandshakeData, Neighbor, NeighborAddress, NeighborKey, PeerAddress,
@@ -42,9 +43,9 @@ use stacks_common::types::chainstate::StacksPublicKey;
use stacks_common::util::hash::Hash160;
use stacks_common::util::log;
/// A trait for representing a set of connected neighbors, for the purposes of executing some P2P
/// A trait for representing session state for a set of connected neighbors, for the purposes of executing some P2P
/// algorithm.
pub trait NeighborSet {
pub trait NeighborComms {
/// Add a neighbor and its event ID as connecting
fn add_connecting<NK: ToNeighborKey>(
&mut self,
@@ -64,8 +65,29 @@ pub trait NeighborSet {
fn pin_connection(&mut self, event_id: usize);
/// Unpin a connection -- allow it to get pruned
fn unpin_connection(&mut self, event_id: usize);
/// Get the collection of pinned connections
fn get_pinned_connections(&self) -> &HashSet<usize>;
/// Clear all pinned connections and return them.
/// List items are guaranteed to be unique
fn clear_pinned_connections(&mut self) -> HashSet<usize>;
/// Is the connection pinned?
fn is_pinned(&self, event_id: usize) -> bool;
/// Add an in-flight request to begin polling on
fn add_batch_request(&mut self, naddr: NeighborAddress, rh: ReplyHandleP2P);
/// Get the number of inflight requests
fn count_inflight(&self) -> usize;
/// Poll for any received messages.
fn collect_replies(
&mut self,
network: &mut PeerNetwork,
) -> Vec<(NeighborAddress, StacksMessage)>;
/// Take all dead neighbors
fn take_dead_neighbors(&mut self) -> HashSet<NeighborKey>;
/// Take all broken neighbors
fn take_broken_neighbors(&mut self) -> HashSet<NeighborKey>;
/// Cancel any ongoing requests. Any messages that had been enqueued from
/// `add_batch_request()` will not be delivered after this call completes.
fn cancel_inflight(&mut self);
/// Send off a handshake to a remote peer.
/// Fails if not connected.
@@ -198,10 +220,15 @@ pub trait NeighborSet {
/// Connect to a remote neighbor, and get back a reply handle which we can use to wait for a
/// handshake response. If the neighbor is already connected, then just send a handshake.
///
/// Normally, the caller would track the returned reply handle with a call to
/// `add_batch_request()`. However, this is ommitted here for callers who want to do their own
/// polling.
///
/// Return Ok(Some(handle)) if we connected.
/// Return Ok(None) if we're in the process of connecting, and should try again.
/// Return Err(..) if we fail
fn neighbor_session_begin<NK: ToNeighborKey>(
fn neighbor_session_begin_only<NK: ToNeighborKey>(
&mut self,
network: &mut PeerNetwork,
neighbor_addr: &NK,
@@ -256,9 +283,36 @@ pub trait NeighborSet {
}
}
/// Connect to a remote neighbor, optionally connecting to it first.
/// If successful, a Handshake message will be sent, and the returned HandshakeAccept or
/// HandshakeReject can be obtained with a follow up call to `collect_replies()`
///
/// Return Ok(true) if we connected.
/// Return Ok(false) if we're in the process of connecting, and should try again.
fn neighbor_session_begin(
&mut self,
network: &mut PeerNetwork,
neighbor_addr: &NeighborAddress,
neighbor_pubkh: &Hash160,
) -> Result<bool, net_error> {
let handle_opt =
self.neighbor_session_begin_only(network, neighbor_addr, neighbor_pubkh)?;
if let Some(handle) = handle_opt {
self.add_batch_request(neighbor_addr.clone(), handle);
return Ok(true);
}
// still trying
Ok(false)
}
/// Send a message to a connected neighbor.
/// Fails if the neighbor is not connected.
fn neighbor_send<NK: ToNeighborKey>(
///
/// If successful, the caller usually calls `add_batch_request()`. This
/// is not carried out here because the caller may instead want to do a blocking wait
/// with the given reply handle (or do its own batching).
fn neighbor_send_only<NK: ToNeighborKey>(
network: &mut PeerNetwork,
neighbor_addr: &NK,
msg_payload: StacksMessageType,
@@ -268,6 +322,21 @@ pub trait NeighborSet {
network.send_message(&nk, msg, network.get_connection_opts().timeout)
}
/// Send a message to a connected neighbor.
/// If successful, the reply handle is then tracked via a follow-up call to
/// `add_batch_request()`.
/// Fails if the neighbor is not connected.
fn neighbor_send(
&mut self,
network: &mut PeerNetwork,
neighbor_addr: &NeighborAddress,
msg_payload: StacksMessageType,
) -> Result<(), net_error> {
let handle = Self::neighbor_send_only(network, neighbor_addr, msg_payload)?;
self.add_batch_request(neighbor_addr.clone(), handle);
Ok(())
}
/// Try to receive a message from a peer handle.
/// On success, consume the reply handle and return the StacksMessage.
/// On error, either return the reply handle so we can try again, or return an error if we
@@ -328,6 +397,148 @@ pub trait NeighborSet {
None => Err(net_error::PeerNotConnected),
}
}
/// Reset all comms
fn reset(&mut self) {
let _ = self.take_broken_neighbors();
let _ = self.take_dead_neighbors();
self.cancel_inflight();
self.clear_pinned_connections();
}
}
/// Transport-level API for peer network state machines.
/// Prod implementation of NeighborComms.
pub struct PeerNetworkComms {
/// Set of PeerNetwork event IDs that this walk is tracking (so they won't get pruned)
events: HashSet<usize>,
/// Map of neighbors we're currently trying to connect to (binds their addresses to their event IDs)
connecting: HashMap<NeighborKey, usize>,
/// Set of neighbors that died during our comms session
dead_connections: HashSet<NeighborKey>,
/// Set of neighbors who misbehaved during our comms session
broken_connections: HashSet<NeighborKey>,
/// Ongoing batch of requests. Will be `None` if there are no inflight requests.
ongoing_batch_request: Option<NeighborCommsRequest>,
}
impl PeerNetworkComms {
pub fn new() -> PeerNetworkComms {
PeerNetworkComms {
events: HashSet::new(),
connecting: HashMap::new(),
dead_connections: HashSet::new(),
broken_connections: HashSet::new(),
ongoing_batch_request: None,
}
}
}
impl NeighborComms for PeerNetworkComms {
fn add_connecting<NK: ToNeighborKey>(
&mut self,
network: &PeerNetwork,
nk: &NK,
event_id: usize,
) {
self.connecting
.insert(nk.to_neighbor_key(network), event_id);
self.pin_connection(event_id);
}
fn get_connecting<NK: ToNeighborKey>(&self, network: &PeerNetwork, nk: &NK) -> Option<usize> {
self.connecting
.get(&nk.to_neighbor_key(network))
.map(|event_ref| *event_ref)
}
fn remove_connecting<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK) {
let event_id_opt = self.connecting.remove(&nk.to_neighbor_key(network));
if let Some(event_id) = event_id_opt {
self.unpin_connection(event_id);
}
}
fn add_dead<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK) {
self.dead_connections.insert(nk.to_neighbor_key(network));
}
fn add_broken<NK: ToNeighborKey>(&mut self, network: &PeerNetwork, nk: &NK) {
self.broken_connections.insert(nk.to_neighbor_key(network));
}
fn pin_connection(&mut self, event_id: usize) {
self.events.insert(event_id);
}
fn unpin_connection(&mut self, event_id: usize) {
self.events.remove(&event_id);
}
fn get_pinned_connections(&self) -> &HashSet<usize> {
&self.events
}
fn clear_pinned_connections(&mut self) -> HashSet<usize> {
let events = mem::replace(&mut self.events, HashSet::new());
events
}
fn is_pinned(&self, event_id: usize) -> bool {
self.events.contains(&event_id)
}
fn add_batch_request(&mut self, naddr: NeighborAddress, rh: ReplyHandleP2P) {
if let Some(ref mut batch) = self.ongoing_batch_request.as_mut() {
batch.add(naddr, rh);
} else {
let mut batch = NeighborCommsRequest::new();
batch.add(naddr, rh);
self.ongoing_batch_request = Some(batch);
}
}
fn count_inflight(&self) -> usize {
self.ongoing_batch_request
.as_ref()
.map(|batch| batch.count_inflight())
.unwrap_or(0)
}
fn collect_replies(
&mut self,
network: &mut PeerNetwork,
) -> Vec<(NeighborAddress, StacksMessage)> {
let mut ret = vec![];
let mut clear = false;
let mut ongoing_batch_request = self.ongoing_batch_request.take();
if let Some(batch) = ongoing_batch_request.as_mut() {
ret.extend(batch.new_replies(self, network));
if batch.count_inflight() == 0 {
clear = true;
}
}
if clear {
self.ongoing_batch_request = None;
} else {
self.ongoing_batch_request = ongoing_batch_request;
}
ret
}
fn cancel_inflight(&mut self) {
self.ongoing_batch_request = None;
}
fn take_dead_neighbors(&mut self) -> HashSet<NeighborKey> {
let dead = mem::replace(&mut self.dead_connections, HashSet::new());
dead
}
fn take_broken_neighbors(&mut self) -> HashSet<NeighborKey> {
let broken = mem::replace(&mut self.broken_connections, HashSet::new());
broken
}
}
/// This is a helper trait to ensure that a given struct can be turned into a NeighborKey for the
@@ -360,20 +571,20 @@ impl ToNeighborKey for NeighborAddress {
/// This struct represents a batch of in-flight requests to a set of peers, identified by a
/// neighbor key (or something that converts to it)
#[derive(Debug)]
pub struct NeighborSetRequest {
pub struct NeighborCommsRequest {
state: HashMap<NeighborAddress, ReplyHandleP2P>,
}
/// This struct represents everything we need to iterate through a set of ongoing requests, in
/// order to pull out completed replies.
pub struct NeighborSetMessageIterator<'a, NS: NeighborSet> {
pub struct NeighborCommsMessageIterator<'a, NS: NeighborComms> {
network: &'a mut PeerNetwork,
state: &'a mut HashMap<NeighborAddress, ReplyHandleP2P>,
neighbor_set: &'a mut NS,
}
/// This is an iterator over completed requests
impl<NS: NeighborSet> Iterator for NeighborSetMessageIterator<'_, NS> {
impl<NS: NeighborComms> Iterator for NeighborCommsMessageIterator<'_, NS> {
type Item = (NeighborAddress, StacksMessage);
fn next(&mut self) -> Option<Self::Item> {
@@ -408,7 +619,7 @@ impl<NS: NeighborSet> Iterator for NeighborSetMessageIterator<'_, NS> {
}
};
if NeighborSetRequest::is_message_stale(&message, stable_block_height) {
if NeighborCommsRequest::is_message_stale(&message, stable_block_height) {
debug!(
"{:?}: Remote neighbor {:?} is still bootstrapping (at block {})",
&self.network.get_local_peer(),
@@ -425,9 +636,9 @@ impl<NS: NeighborSet> Iterator for NeighborSetMessageIterator<'_, NS> {
}
}
impl NeighborSetRequest {
pub fn new() -> NeighborSetRequest {
NeighborSetRequest {
impl NeighborCommsRequest {
pub fn new() -> NeighborCommsRequest {
NeighborCommsRequest {
state: HashMap::new(),
}
}
@@ -443,12 +654,12 @@ impl NeighborSetRequest {
}
/// Iterate over all in-flight requests
pub fn new_replies<'a, NS: NeighborSet>(
pub fn new_replies<'a, NS: NeighborComms>(
&'a mut self,
neighbor_set: &'a mut NS,
network: &'a mut PeerNetwork,
) -> NeighborSetMessageIterator<NS> {
NeighborSetMessageIterator {
) -> NeighborCommsMessageIterator<NS> {
NeighborCommsMessageIterator {
network,
state: &mut self.state,
neighbor_set,
@@ -460,76 +671,3 @@ impl NeighborSetRequest {
self.state.len()
}
}
/// Transport-level API for the neighbor walk
impl<DB: NeighborWalkDB> NeighborWalk<DB> {
/// Try and get the next reply from the single neighbor we're talking to
pub(crate) fn try_get_reply(
&mut self,
network: &mut PeerNetwork,
) -> Result<Option<StacksMessage>, net_error> {
let mut req_opt = self.ongoing_request.take();
let cur_neighbor_addr = mem::replace(&mut self.cur_neighbor.addr, NeighborKey::empty());
let msg_opt = self.poll_next_reply(network, &cur_neighbor_addr, &mut req_opt)?;
self.ongoing_request = req_opt;
self.cur_neighbor.addr = cur_neighbor_addr;
if let Some(message) = msg_opt.as_ref() {
if NeighborSetRequest::is_message_stale(
&message,
network.get_chain_view().burn_stable_block_height,
) {
test_debug!(
"{:?}: neighbor {:?} is still bootstrapping (on block {})",
network.get_local_peer(),
&self.cur_neighbor.addr,
message.preamble.burn_block_height
);
return Err(net_error::StaleNeighbor);
}
}
Ok(msg_opt)
}
/// Add a request to the ongoing batch request, instantiating it if need be
pub(crate) fn add_batch_request(&mut self, naddr: NeighborAddress, rh: ReplyHandleP2P) {
if let Some(ref mut batch) = self.ongoing_batch_request.as_mut() {
batch.add(naddr, rh);
} else {
let mut batch = NeighborSetRequest::new();
batch.add(naddr, rh);
self.ongoing_batch_request = Some(batch);
}
}
/// Count up the number of batched requests in-flight
pub(crate) fn count_inflight(&self) -> usize {
self.ongoing_batch_request
.as_ref()
.map(|batch| batch.count_inflight())
.unwrap_or(0)
}
/// Collect all newly-arrived replies from the ongoing network batch request
pub(crate) fn collect_replies(
&mut self,
network: &mut PeerNetwork,
) -> Vec<(NeighborAddress, StacksMessage)> {
let mut ret = vec![];
let mut clear = false;
let mut ongoing_batch_request = self.ongoing_batch_request.take();
if let Some(batch) = ongoing_batch_request.as_mut() {
ret.extend(batch.new_replies(self, network));
if batch.count_inflight() == 0 {
clear = true;
}
}
if clear {
self.ongoing_batch_request = None;
} else {
self.ongoing_batch_request = ongoing_batch_request;
}
ret
}
}