diff --git a/src/net/neighbors/walk.rs b/src/net/neighbors/walk.rs
new file mode 100644
index 000000000..f3f5622c0
--- /dev/null
+++ b/src/net/neighbors/walk.rs
@@ -0,0 +1,2342 @@
+// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
+// Copyright (C) 2020-2023 Stacks Open Internet Foundation
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+use crate::net::db::PeerDB;
+
+use crate::net::{
+ neighbors::{
+ NeighborSet, ToNeighborKey, MAX_NEIGHBOR_BLOCK_DELAY, NEIGHBOR_MINIMUM_CONTACT_INTERVAL,
+ },
+ Error as net_error, HandshakeAcceptData, HandshakeData, MessageSequence, Neighbor,
+ NeighborAddress, NeighborKey, PeerAddress, StackerDBHandshakeData, StacksMessage,
+ StacksMessageType, NUM_NEIGHBORS,
+};
+
+use crate::net::neighbors::NeighborSetRequest;
+
+use crate::net::connection::ConnectionOptions;
+use crate::net::connection::ReplyHandleP2P;
+
+use crate::net::db::LocalPeer;
+
+use crate::net::p2p::PeerNetwork;
+
+use crate::util_lib::db::DBConn;
+use crate::util_lib::db::DBTx;
+use crate::util_lib::db::Error as db_error;
+
+use stacks_common::util::secp256k1::Secp256k1PublicKey;
+
+use std::cmp;
+use std::mem;
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+use crate::burnchains::Address;
+use crate::burnchains::Burnchain;
+use crate::burnchains::BurnchainView;
+use crate::burnchains::PublicKey;
+
+use rand::prelude::*;
+use rand::thread_rng;
+use stacks_common::types::chainstate::StacksPublicKey;
+use stacks_common::util::get_epoch_time_secs;
+use stacks_common::util::hash::Hash160;
+use stacks_common::util::log;
+
+/// This struct records information from an inbound peer that has authenticated to this node. As
+/// new remote nodes connect, this node will remember this state for them so that the neighbor walk
+/// logic can try to ask them for neighbors. This enables a public peer to ask a NAT'ed peer for
+/// its neighbors.
+#[derive(Debug, PartialEq, Clone)]
+pub struct NeighborPingback {
+ pub ts: u64, // when we discovered this neighbor to ping back
+ pub peer_version: u32, // peer version of neighbor to ping back
+ pub network_id: u32, // network ID of neighbor to ping back
+ pub pubkey: StacksPublicKey, // public key of neighbor to ping back
+}
+
+/// Struct for capturing the results of a walk.
+/// -- reports newly-connected neighbors
+/// -- reports neighbors we had trouble talking to.
+/// The peer network will use this struct to clean out dead neighbors, and to keep the number of
+/// _outgoing_ connections limited to NUM_NEIGHBORS.
+#[derive(Clone, Debug)]
+pub struct NeighborWalkResult {
+ /// Newly-added node neighbors
+ pub new_connections: HashSet,
+ /// Dead connections discovered (so we can close their sockets)
+ pub dead_connections: HashSet,
+ /// Connections to misbehaving peers (so we can close their sockets and ban them)
+ pub broken_connections: HashSet,
+ /// Neighbors who got replaced in the PeerDB because they were offline, but mapped to a new
+ /// peer that was online and had the same slot locations
+ pub replaced_neighbors: HashSet,
+}
+
+impl NeighborWalkResult {
+ pub fn new() -> NeighborWalkResult {
+ NeighborWalkResult {
+ new_connections: HashSet::new(),
+ dead_connections: HashSet::new(),
+ broken_connections: HashSet::new(),
+ replaced_neighbors: HashSet::new(),
+ }
+ }
+
+ pub fn add_new(&mut self, nk: NeighborKey) -> () {
+ self.new_connections.insert(nk);
+ }
+
+ pub fn add_broken(&mut self, nk: NeighborKey) -> () {
+ self.broken_connections.insert(nk);
+ }
+
+ pub fn add_dead(&mut self, nk: NeighborKey) -> () {
+ self.dead_connections.insert(nk);
+ }
+
+ pub fn add_replaced(&mut self, nk: NeighborKey) -> () {
+ self.replaced_neighbors.insert(nk);
+ }
+
+ pub fn clear(&mut self) -> () {
+ self.new_connections.clear();
+ self.dead_connections.clear();
+ self.broken_connections.clear();
+ self.replaced_neighbors.clear();
+ }
+}
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+pub enum NeighborWalkState {
+ HandshakeBegin,
+ HandshakeFinish,
+ GetNeighborsBegin,
+ GetNeighborsFinish,
+ GetHandshakesBegin,
+ GetHandshakesFinish,
+ GetNeighborsNeighborsBegin,
+ GetNeighborsNeighborsFinish,
+ PingbackHandshakesBegin,
+ PingbackHandshakesFinish,
+ ReplacedNeighborsPingBegin,
+ ReplacedNeighborsPingFinish,
+ Finished,
+}
+
+// TODO: NeighborWalkState should be refactored so that its members simply contain the relevant
+// parts of this struct. This struct, as well as walk statistics kept in PeerNetwork, should live
+// entirely within this struct.
+/// A struct representing the ongoing state of the neighbor walk. The peer node continuously
+/// attempts to connect to peers in its frontier at random (including inbound peers) to discover
+/// other peers it can reach. The walk uses a variation of Metropolis-Hastings random graph walk
+/// in order to calculate a random subset of the total set of peers. The high-level steps are:
+///
+/// 1. Handshake with the current neighbor (if there is no current neighbor, then pick one at random)
+/// 2. Ask it for its neighbors
+/// 3. Handshake with each neighbor
+/// 4. Ask each neighbor for its neighbors
+/// 5. Calculate the ratio of in-degree to out-degree for each neighbor, and then flip a coin. If
+/// heads, then keep the current neighbor as-is. If tails, then
+#[derive(Debug)]
+pub struct NeighborWalk {
+ /// Current state of the walk
+ pub state: NeighborWalkState,
+ /// Set of PeerNetwork event IDs that this walk is tracking (so they won't get pruned)
+ pub events: HashSet,
+
+ /// Map of neighbors we're currently trying to connect to (binds their addresses to their event IDs)
+ connecting: HashMap,
+
+ /// Addresses of neighbors resolved by GetNeighborsBegin/GetNeighborsFinish
+ pending_neighbor_addrs: Option>,
+
+ /// Last neighbor visited
+ prev_neighbor: Option,
+ /// Current neighbor we're querying
+ pub(crate) cur_neighbor: Neighbor,
+ /// Next neighbor we're going to query
+ next_neighbor: Option,
+
+ /// Whether or not the next walk should start with an outbound peer or inbound peer
+ next_walk_outbound: bool,
+ /// Whether or not we have an outbound connection to cur_neighbor
+ walk_outbound: bool,
+
+ /// This is the value of cur_neighbor as returned by its HandshakeAccept.
+ /// It might be different than our value.
+ neighbor_from_handshake: NeighborKey,
+
+ /// current neighbor's frontier, built up when querying `cur_neighbor`'s neighbors
+ pub frontier: HashMap,
+ /// newly-discovered neighbors-of-neighbors of `cur_neighbor`
+ new_frontier: HashMap,
+
+ /// Ongoing single request
+ pub(crate) ongoing_request: Option,
+ /// Ongoing batch request
+ pub(crate) ongoing_batch_request: Option,
+
+ /// GetHandshakesBegin / GetHandshakesFinish: outstanding requests to handshake with our cur_neighbor's neighbors.
+ resolved_handshake_neighbors: HashMap,
+ handshake_neighbor_addrs: Vec,
+
+ /// GetNeighborsNeighborsBegin / GetNeighborsNeighborsFinish:
+ /// outstanding requests to get the neighbors of our cur_neighbor's neighbors
+ resolved_getneighbors_neighbors: HashMap>,
+
+ /// ReplacedNeighborsPingBegin / ReplacedNeighborsPingFinish:
+ /// outstanding requests to ping existing neighbors to be replaced in the frontier
+ neighbor_replacements: HashMap,
+ replaced_neighbors: HashMap,
+
+ /// PingbackHandshakesBegin / PingbackHandshakesFinish:
+ /// outstanding requests to new inbound peers
+ network_pingbacks: HashMap, // taken from the network at instantiation. Maps address to (peer version, network ID, timestamp)
+
+ /// neighbor walk result we build up incrementally
+ pub result: NeighborWalkResult,
+
+ /// time that we started/finished the last walk
+ walk_start_time: u64,
+ walk_end_time: u64,
+
+ /// walk random-restart parameters
+ pub(crate) walk_step_count: u64, // how many times we've taken a step
+ pub(crate) walk_min_duration: u64, // minimum steps we have to take before reset
+ pub(crate) walk_max_duration: u64, // maximum steps we have to take before reset
+ pub(crate) walk_reset_prob: f64, // probability that we do a reset once the minimum duration is met
+ pub(crate) walk_instantiation_time: u64,
+ pub(crate) walk_reset_interval: u64, // how long a walk can last, in wall-clock time
+ pub(crate) walk_state_time: u64, // when the walk entered this state
+ pub(crate) walk_state_timeout: u64, // how long the walk can remain in this state
+}
+
+/// Database I/O helpers
+impl NeighborWalk {
+ /// Get some initial fresh random neighbor(s) to crawl,
+ /// given the number of neighbors and current burn block height.
+ ///
+ /// Returns a list of one or more neighbors on success.
+ /// Returns NoSuchNeighbor if there are no known neighbors
+ /// Returns DBError if there's a problem reading the DB
+ fn get_random_neighbors(
+ network: &PeerNetwork,
+ num_neighbors: u64,
+ block_height: u64,
+ ) -> Result, net_error> {
+ let cur_epoch = network.get_current_epoch();
+ let neighbors = PeerDB::get_random_walk_neighbors(
+ &network.peerdb.conn(),
+ network.get_local_peer().network_id,
+ cur_epoch.network_epoch,
+ num_neighbors as u32,
+ block_height,
+ )
+ .map_err(net_error::DBError)?;
+
+ if neighbors.len() == 0 {
+ debug!(
+ "{:?}: No neighbors available in the peer DB!",
+ network.get_local_peer()
+ );
+ return Err(net_error::NoSuchNeighbor);
+ }
+ Ok(neighbors)
+ }
+
+ /// Get a random starting neighbor for the walk.
+ /// Older but still fresh neighbors will be preferred.
+ fn get_first_walk_neighbor(network: &PeerNetwork) -> Result {
+ // pick a random neighbor as a walking point.
+ // favor neighbors with older last-contact times
+ let mut next_neighbors = Self::get_random_neighbors(
+ network,
+ (NUM_NEIGHBORS as u64) * 2,
+ network.get_chain_view().burn_block_height,
+ )
+ .map_err(|e| {
+ debug!(
+ "{:?}: Failed to load initial walk neighbors: {:?}",
+ network.get_local_peer(),
+ &e
+ );
+ e
+ })?;
+
+ if next_neighbors.len() == 0 {
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ next_neighbors.sort_by(|n1, n2| n1.last_contact_time.cmp(&n2.last_contact_time));
+ let median_neighbor_idx = next_neighbors.len() / 2;
+ let random_neighbor_idx = if median_neighbor_idx > 0 {
+ thread_rng().gen::() % median_neighbor_idx
+ } else {
+ 0
+ };
+
+ Ok(next_neighbors[random_neighbor_idx].clone())
+ }
+
+ /// Find the neighbor addresses that we need to resolve to neighbors,
+ /// and find out the neighbor addresses that we already have fresh neighbor data for.
+ /// If we know of a neighbor, and contacted it recently, then consider it resolved _even if_
+ /// the reported NeighborAddress public key hash doesn't match our records.
+ fn lookup_stale_neighbors(
+ dbconn: &DBConn,
+ network_id: u32,
+ block_height: u64,
+ addrs: &Vec,
+ ) -> Result<(HashMap, Vec), net_error> {
+ let mut to_resolve = vec![];
+ let mut resolved: HashMap = HashMap::new();
+ for naddr in addrs {
+ let neighbor_opt =
+ Neighbor::from_neighbor_address(dbconn, network_id, block_height, naddr)?;
+
+ if let Some(neighbor) = neighbor_opt {
+ // already know about this neighbor, so look at its last contact time
+ if neighbor.last_contact_time + NEIGHBOR_MINIMUM_CONTACT_INTERVAL
+ < get_epoch_time_secs()
+ {
+ // stale
+ to_resolve.push((*naddr).clone());
+ } else {
+ // our copy is still fresh
+ resolved.insert(naddr.clone(), neighbor);
+ }
+ continue;
+ }
+
+ // need to resolve this one, but don't talk to it if we did so recently (even
+ // if we have stale information for it -- the remote node could be trying to trick
+ // us into DDoS'ing this node).
+ let peer_opt = PeerDB::get_peer(dbconn, network_id, &naddr.addrbytes, naddr.port)
+ .map_err(net_error::DBError)?;
+
+ if let Some(n) = peer_opt {
+ // we know about this neighbor, but its key didn't match the
+ // neighboraddress. Only try to re-connect with it if we haven't done
+ // so recently, so a rogue neighbor can't force us to DDoS another
+ // peer.
+ if n.last_contact_time + NEIGHBOR_MINIMUM_CONTACT_INTERVAL < get_epoch_time_secs() {
+ to_resolve.push((*naddr).clone());
+ } else {
+ // recently contacted
+ resolved.insert(naddr.clone(), n);
+ }
+ } else {
+ // okay, we really don't know about this neighbor
+ to_resolve.push((*naddr).clone());
+ }
+ }
+ Ok((resolved, to_resolve))
+ }
+
+ /// Select neighbors that are routable, and ignore ones that are not.
+ /// TODO: expand if we ever want to filter by unroutable network class or something
+ fn filter_sensible_neighbors(neighbors: Vec) -> Vec {
+ let mut ret = vec![];
+ for neighbor in neighbors.into_iter() {
+ if neighbor.addrbytes.is_anynet() {
+ continue;
+ }
+ ret.push(neighbor);
+ }
+ ret
+ }
+
+ /// Given a neighbor we tried to insert into the peer database, find one of the existing
+ /// neighbors it collided with. Return its slot in the peer db.
+ fn find_replaced_neighbor_slot(
+ conn: &DBConn,
+ nk: &NeighborKey,
+ ) -> Result, net_error> {
+ let mut slots = PeerDB::peer_slots(conn, nk.network_id, &nk.addrbytes, nk.port)
+ .map_err(net_error::DBError)?;
+
+ if slots.len() == 0 {
+ // not present
+ return Ok(None);
+ }
+
+ let mut rng = thread_rng();
+ slots.shuffle(&mut rng);
+
+ for slot in slots {
+ let peer_opt =
+ PeerDB::get_peer_at(conn, nk.network_id, slot).map_err(net_error::DBError)?;
+
+ match peer_opt {
+ None => {
+ continue;
+ }
+ Some(_) => {
+ return Ok(Some(slot));
+ }
+ }
+ }
+
+ Ok(None)
+ }
+
+ /// Add a neighbor or schedule it to be pinged since it's up for replacement.
+ /// Returns (was-new?, neighbor)
+ fn add_or_schedule_replace_neighbor<'a>(
+ &mut self,
+ tx: &mut DBTx<'a>,
+ block_height: u64,
+ naddr: &NeighborAddress,
+ peer_version: u32,
+ network_id: u32,
+ handshake: &HandshakeData,
+ db_data: Option<&StackerDBHandshakeData>,
+ ) -> Result<(bool, Neighbor), net_error> {
+ let mut neighbor_from_handshake =
+ Neighbor::from_handshake(tx, peer_version, network_id, handshake)?;
+
+ if Neighbor::from_neighbor_address(tx, network_id, block_height, naddr)?.is_some() {
+ test_debug!("already know about {:?}", naddr);
+ neighbor_from_handshake
+ .save_update(tx, db_data.map(|x| x.smart_contracts.as_slice()))?;
+
+ // seen this neighbor before
+ return Ok((false, neighbor_from_handshake));
+ }
+
+ debug!("new neighbor {:?}", &neighbor_from_handshake.addr);
+
+ // didn't know about this neighbor yet. Try to add it.
+ let added =
+ neighbor_from_handshake.save(tx, db_data.map(|x| x.smart_contracts.as_slice()))?;
+
+ if added {
+ // neighbor was new, and we had space to add it.
+ return Ok((true, neighbor_from_handshake));
+ }
+
+ // neighbor was new, but we don't have space to insert it.
+ // find and record a neighbor it would replace.
+ let replaced_neighbor_slot_opt =
+ NeighborWalk::find_replaced_neighbor_slot(tx, &neighbor_from_handshake.addr)?;
+ if let Some(slot) = replaced_neighbor_slot_opt {
+ // if this peer isn't allowed or denied, then consider
+ // replacing. Otherwise, keep the local configuration's preference.
+ if !neighbor_from_handshake.is_denied() && !neighbor_from_handshake.is_allowed() {
+ self.neighbor_replacements.insert(
+ NeighborAddress::from_neighbor(&neighbor_from_handshake),
+ neighbor_from_handshake.clone(),
+ );
+ self.replaced_neighbors.insert(
+ NeighborAddress::from_neighbor(&neighbor_from_handshake),
+ slot,
+ );
+ }
+ }
+
+ // neighbor was new
+ Ok((true, neighbor_from_handshake))
+ }
+}
+
+/// Constructors and state-machine mechanics
+impl NeighborWalk {
+ pub fn new(
+ neighbor: &Neighbor,
+ outbound: bool,
+ pingbacks: HashMap,
+ connection_opts: &ConnectionOptions,
+ ) -> NeighborWalk {
+ NeighborWalk {
+ state: NeighborWalkState::HandshakeBegin,
+ events: HashSet::new(),
+
+ connecting: HashMap::new(),
+ pending_neighbor_addrs: None,
+
+ prev_neighbor: None,
+ cur_neighbor: neighbor.clone(),
+ next_neighbor: None,
+ next_walk_outbound: true,
+ walk_outbound: outbound,
+ neighbor_from_handshake: NeighborKey::empty(),
+
+ frontier: HashMap::new(),
+ new_frontier: HashMap::new(),
+
+ ongoing_request: None,
+ ongoing_batch_request: None,
+
+ resolved_handshake_neighbors: HashMap::new(),
+ handshake_neighbor_addrs: vec![],
+
+ resolved_getneighbors_neighbors: HashMap::new(),
+
+ neighbor_replacements: HashMap::new(),
+ replaced_neighbors: HashMap::new(),
+
+ network_pingbacks: pingbacks,
+
+ result: NeighborWalkResult::new(),
+
+ walk_start_time: get_epoch_time_secs(),
+ walk_end_time: 0,
+
+ walk_step_count: 0,
+ walk_min_duration: connection_opts.walk_min_duration,
+ walk_max_duration: connection_opts.walk_max_duration,
+ walk_reset_prob: connection_opts.walk_reset_prob,
+ walk_instantiation_time: get_epoch_time_secs(),
+ walk_reset_interval: connection_opts.walk_reset_interval,
+ walk_state_time: get_epoch_time_secs(),
+ walk_state_timeout: connection_opts.walk_state_timeout,
+ }
+ }
+
+ /// Instantiate the neighbor walk from a neighbor routable from us.
+ /// Returns the walk on success.
+ /// Returns NoSuchNeighbor if there's no neighbors available
+ /// Returns DBError if we can't read the peer DB
+ pub(crate) fn instantiate_walk(network: &PeerNetwork) -> Result {
+ let first_neighbor = Self::get_first_walk_neighbor(network)?;
+ let w = NeighborWalk::new(
+ &first_neighbor,
+ true,
+ network.walk_pingbacks.clone(),
+ &network.get_connection_opts(),
+ );
+
+ debug!(
+ "{:?}: instantiated neighbor walk to outbound peer {:?}",
+ network.get_local_peer(),
+ &first_neighbor
+ );
+
+ Ok(w)
+ }
+
+ /// Instantiate the neighbor walk to an always-allowed node.
+ /// If we're in the initial block download, then this must also be a *bootstrap* peer.
+ /// Returns the neighbor walk on success
+ /// Returns NotFoundError if no always-allwed neighbors are in the DB.
+ /// Returns DBError if there's a problem querying the DB
+ pub(crate) fn instantiate_walk_to_always_allowed(
+ network: &PeerNetwork,
+ ibd: bool,
+ ) -> Result {
+ let mut allowed_peers = if ibd {
+ // only get bootstrap peers (will be randomized)
+ PeerDB::get_bootstrap_peers(
+ &network.peerdb.conn(),
+ network.get_local_peer().network_id,
+ )?
+ } else {
+ // can be any peer marked 'always-allowed' (will be randomized)
+ PeerDB::get_always_allowed_peers(
+ network.peerdb.conn(),
+ network.get_local_peer().network_id,
+ )?
+ };
+
+ let allowed_peer = if let Some(peer) = allowed_peers.pop() {
+ peer
+ } else {
+ // no allowed peers in DB. Try a different strategy
+ return Err(net_error::NotFoundError);
+ };
+
+ debug!(
+ "Will (re-)connect to always-allowed peer {:?}",
+ &allowed_peer.addr
+ );
+ let w = NeighborWalk::new(
+ &allowed_peer,
+ true,
+ network.walk_pingbacks.clone(),
+ &network.get_connection_opts(),
+ );
+
+ debug!(
+ "{:?}: instantiated neighbor walk to always-allowed peer {:?}",
+ network.get_local_peer(),
+ &allowed_peer
+ );
+ Ok(w)
+ }
+
+ /// Instantiate a neighbor walk, but use an inbound neighbor instead of a neighbor from our
+ /// peer DB. This helps a public node discover other public nodes, by asking a private node
+ /// for its neighbors (which can include other public nodes).
+ /// If an inbound connection is found, then return the walk to it.
+ /// Otherwise, return NoSuchNeighbor
+ pub(crate) fn instantiate_walk_from_inbound(
+ network: &PeerNetwork,
+ ) -> Result {
+ if network.get_num_p2p_convos() == 0 {
+ debug!(
+ "{:?}: failed to begin inbound neighbor walk: no one's connected to us",
+ network.get_local_peer()
+ );
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ // pick a random search index
+ let mut idx = thread_rng().gen::() % network.get_num_p2p_convos();
+
+ test_debug!(
+ "{:?}: try inbound neighbors -- sample out of {}. idx = {}",
+ network.get_local_peer(),
+ network.get_num_p2p_convos(),
+ idx
+ );
+
+ // find an inbound connection
+ for _ in 0..network.walk_pingbacks.len() + 1 {
+ let event_id = match network.peers.keys().skip(idx).next() {
+ Some(eid) => *eid,
+ None => {
+ idx = 0;
+ continue;
+ }
+ };
+ idx = (idx + 1) % network.get_num_p2p_convos();
+
+ let convo = network
+ .peers
+ .get(&event_id)
+ .expect("BUG: no conversation for event ID key");
+
+ if convo.is_outbound() || !convo.is_authenticated() {
+ test_debug!(
+ "{:?}: skip outbound and/or unauthenticated neighbor {}",
+ network.get_local_peer(),
+ &convo.to_neighbor_key()
+ );
+ continue;
+ }
+
+ // found!
+ let pubkey = convo
+ .get_public_key()
+ .expect("BUG: authenticated conversation without public key");
+
+ let nk = convo.to_neighbor_key();
+ let empty_neighbor = Neighbor::empty(&nk, &pubkey, 0);
+ let w = NeighborWalk::new(
+ &empty_neighbor,
+ false,
+ network.walk_pingbacks.clone(),
+ &network.get_connection_opts(),
+ );
+
+ debug!(
+ "{:?}: instantiated neighbor walk to inbound peer {}",
+ network.get_local_peer(),
+ &nk
+ );
+
+ return Ok(w);
+ }
+
+ // no inbound peers
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ /// Instantiate a neighbor walk, but go straight to the pingback logic (i.e. we don't have any
+ /// immediate neighbors). That is, try to connect and step to a node that connected to us.
+ /// The returned neighbor walk will be in the PingabckHandshakesBegin state.
+ ///
+ /// Returns the new walk, if we have any pingbacks to connect to.
+ /// Returns NoSuchNeighbor if there are no pingbacks to choose from
+ /// Return Denied if the chosen pingback peer is blocked
+ pub(crate) fn instantiate_walk_from_pingback(
+ network: &PeerNetwork,
+ ) -> Result {
+ if network.walk_pingbacks.len() == 0 {
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ // random search
+ let idx = thread_rng().gen::() % network.walk_pingbacks.len();
+
+ test_debug!(
+ "{:?}: try pingback candidates -- sample out of {}. idx = {}",
+ network.get_local_peer(),
+ network.walk_pingbacks.len(),
+ idx
+ );
+
+ let (addr, pingback_peer) = match network.walk_pingbacks.iter().skip(idx).next() {
+ Some((ref addr, ref pingback_peer)) => (addr.clone(), pingback_peer.clone()),
+ None => {
+ return Err(net_error::NoSuchNeighbor);
+ }
+ };
+
+ let nk = NeighborKey::from_neighbor_address(
+ pingback_peer.peer_version,
+ pingback_peer.network_id,
+ &addr,
+ );
+
+ // don't proceed if denied
+ if PeerDB::is_peer_denied(
+ &network.peerdb.conn(),
+ nk.network_id,
+ &nk.addrbytes,
+ nk.port,
+ )? {
+ debug!(
+ "{:?}: pingback neighbor {:?} is denied",
+ network.get_local_peer(),
+ &nk
+ );
+ return Err(net_error::Denied);
+ }
+
+ // (this will be ignored by the neighbor walk)
+ let empty_neighbor = Neighbor::empty(&nk, &pingback_peer.pubkey, 0);
+
+ let mut w = NeighborWalk::new(
+ &empty_neighbor,
+ false,
+ network.walk_pingbacks.clone(),
+ &network.get_connection_opts(),
+ );
+
+ debug!(
+ "{:?}: instantiated neighbor walk to {} for pingback only",
+ network.get_local_peer(),
+ &nk
+ );
+
+ w.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::PingbackHandshakesBegin,
+ )?;
+ Ok(w)
+ }
+
+ /// Reset the walk with a new neighbor.
+ /// Give back a report of the walk.
+ /// Resets neighbor pointer.
+ /// Clears out connections, but preserves state (frontier, result, etc.).
+ fn reset(
+ &mut self,
+ local_peer: &LocalPeer,
+ next_neighbor: Neighbor,
+ next_neighbor_outbound: bool,
+ ) -> NeighborWalkResult {
+ test_debug!(
+ "{:?}: Walk reset to {} neighbor {:?}",
+ local_peer,
+ if self.next_walk_outbound {
+ "outbound"
+ } else {
+ "inbound"
+ },
+ &next_neighbor.addr
+ );
+ self.state = NeighborWalkState::HandshakeBegin;
+ self.walk_state_time = get_epoch_time_secs();
+
+ if self.cur_neighbor != next_neighbor {
+ // moving on -- clear frontier
+ self.frontier.clear();
+ }
+
+ self.prev_neighbor = Some(self.cur_neighbor.clone());
+ self.cur_neighbor = next_neighbor;
+ self.walk_outbound = next_neighbor_outbound;
+ self.next_neighbor = None;
+
+ self.clear_connections(local_peer);
+ self.new_frontier.clear();
+
+ let result = self.result.clone();
+
+ self.walk_end_time = get_epoch_time_secs();
+
+ // leave self.frontier and self.result alone until the next walk.
+ // (makes it so that at the end of the walk, we can query the result and frontier, which
+ // get built up over successive passes of the state-machine)
+ result
+ }
+
+ /// Clear the walk's connection state
+ fn clear_connections(&mut self, _local_peer: &LocalPeer) -> () {
+ test_debug!("{:?}: Walk clear connections", _local_peer);
+ self.events.clear();
+ self.connecting.clear();
+ self.pending_neighbor_addrs = None;
+
+ self.ongoing_request = None;
+ self.ongoing_batch_request = None;
+
+ self.resolved_handshake_neighbors.clear();
+ self.handshake_neighbor_addrs.clear();
+
+ self.resolved_getneighbors_neighbors.clear();
+
+ self.neighbor_replacements.clear();
+ self.replaced_neighbors.clear();
+
+ self.network_pingbacks.clear();
+ }
+
+ /// Update the state of the walk.
+ /// If the code spent too much time in one state, then the walk will fail with StepTimeout
+ fn set_state(
+ &mut self,
+ _local_peer: &LocalPeer,
+ new_state: NeighborWalkState,
+ ) -> Result<(), net_error> {
+ if self.walk_state_time + self.walk_state_timeout < get_epoch_time_secs() {
+ return Err(net_error::StepTimeout);
+ }
+
+ test_debug!(
+ "{:?}: Advance walk state: {:?} --> {:?} (after {} seconds)",
+ _local_peer,
+ &self.state,
+ &new_state,
+ get_epoch_time_secs().saturating_sub(self.walk_state_time)
+ );
+ self.state = new_state;
+ self.walk_state_time = get_epoch_time_secs();
+ Ok(())
+ }
+
+ /// Begin handshaking with our current neighbor.
+ /// On success, return Ok(true) and transition to HandshakeFinish
+ /// On failure, return an Err(...)
+ /// If we're not yet connected, return Ok(false). The caller should try again.
+ pub fn handshake_begin(&mut self, network: &mut PeerNetwork) -> Result {
+ if self.ongoing_request.is_some() {
+ // in progress already
+ return Ok(true);
+ }
+
+ // if cur_neighbor is _us_, then grab a different neighbor and try again
+ if Hash160::from_node_public_key(&self.cur_neighbor.public_key)
+ == Hash160::from_node_public_key(&Secp256k1PublicKey::from_private(
+ &network.get_local_peer().private_key,
+ ))
+ {
+ test_debug!(
+ "{:?}: Walk stepped to ourselves. Will reset instead.",
+ network.get_local_peer()
+ );
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ // if cur_neighbor is our bind address, then grab a different neighbor and try
+ // again
+ if network.is_bound(&self.cur_neighbor.addr) {
+ debug!(
+ "{:?}: Walk stepped to our bind address ({:?}). Will reset instead.",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ // if cur_neighbor is an anynet address, then grab a different neighbor and try
+ // again
+ if self.cur_neighbor.addr.addrbytes.is_anynet() {
+ debug!(
+ "{:?}: Walk stepped to an any-network address ({:?}). Will reset instead.",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+ return Err(net_error::NoSuchNeighbor);
+ }
+
+ let cur_addr = self.cur_neighbor.addr.clone();
+ self.new_frontier.clear();
+ self.result.clear();
+
+ let cur_pubkh = Hash160::from_node_public_key(&self.cur_neighbor.public_key);
+ if let Some(handle) = self.neighbor_session_begin(network, &cur_addr, &cur_pubkh)? {
+ debug!(
+ "{:?}: Handshake sent to {:?}",
+ network.get_local_peer(),
+ &cur_addr
+ );
+ self.ongoing_request = Some(handle);
+ self.set_state(network.get_local_peer(), NeighborWalkState::HandshakeFinish)?;
+ Ok(true)
+ } else {
+ debug!(
+ "{:?}: No Handshake sent (dest was {:?}); still connecting",
+ network.get_local_peer(),
+ &cur_addr,
+ );
+ Ok(false)
+ }
+ }
+
+ /// Handle a HandshakeAcceptData.
+ /// Update the PeerDB information from the handshake data, as well as `self.cur_neighbor`, if
+ /// this neighbor was routable. If it's not routable (i.e. we walked to an inbound neighbor),
+ /// then do not update the DB.
+ /// Add this neighbor to our newly-calculated frontier either way.
+ /// Returns the updated `self.cur_neighbor` on success.
+ /// Returns Err(..) if we failed to validate the request or we have a DB error.
+ fn handle_handshake_accept(
+ &mut self,
+ network: &mut PeerNetwork,
+ message: &StacksMessage,
+ data: &HandshakeAcceptData,
+ db_data: Option<&StackerDBHandshakeData>,
+ ) -> Result {
+ // accepted! can proceed to ask for neighbors
+ // save knowledge to the peer DB if it was outbound
+ // (NOTE: an outbound neighbor should already be in
+ // the DB, since it's cur_neighbor)
+ if self.walk_outbound {
+ // connected to a routable neighbor, so update its entry in the DB.
+ let local_peer_str = format!("{:?}", network.get_local_peer());
+
+ let tx = network.peerdb.tx_begin()?;
+ let mut neighbor_from_handshake = Neighbor::from_handshake(
+ &tx,
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &data.handshake,
+ )?;
+
+ // if the neighbor accidentally gave us a private IP address, then
+ // just use the one we used to contact it. This can happen if the
+ // node is behind a load-balancer, or is doing port-forwarding,
+ // etc.
+ if neighbor_from_handshake.addr.addrbytes.is_in_private_range() {
+ debug!(
+ "{}: outbound neighbor gave private IP address {:?}; assuming it meant {:?}",
+ local_peer_str, &neighbor_from_handshake.addr, &self.cur_neighbor.addr
+ );
+ neighbor_from_handshake.addr.addrbytes = self.cur_neighbor.addr.addrbytes.clone();
+ neighbor_from_handshake.addr.port = self.cur_neighbor.addr.port;
+ }
+
+ let res = if neighbor_from_handshake.addr != self.cur_neighbor.addr {
+ // somehow, got a handshake from someone that _isn't_ cur_neighbor
+ debug!("{}: got unsolicited (or bootstrapping) HandshakeAccept from outbound {:?} (expected {:?})",
+ local_peer_str,
+ &neighbor_from_handshake.addr,
+ &self.cur_neighbor.addr);
+
+ Err(net_error::PeerNotConnected)
+ } else {
+ // this is indeed cur_neighbor
+ self.cur_neighbor.handshake_update(&tx, &data.handshake)?;
+ self.cur_neighbor
+ .save_update(&tx, db_data.map(|x| x.smart_contracts.as_slice()))?;
+
+ debug!(
+ "{}: Connected with {:?}",
+ local_peer_str, &self.cur_neighbor.addr
+ );
+ self.new_frontier
+ .insert(self.cur_neighbor.addr.clone(), self.cur_neighbor.clone());
+
+ // advance state!
+ Ok(self.cur_neighbor.clone())
+ };
+ tx.commit()?;
+ self.neighbor_from_handshake = neighbor_from_handshake.addr;
+ res
+ } else {
+ // connected to an unroutable neighbor, so
+ // don't save to DB (but do update frontier)
+ let neighbor_from_handshake = Neighbor::from_handshake(
+ &network.peerdb.conn(),
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &data.handshake,
+ )?;
+ debug!(
+ "{:?}: Connected with inbound non-frontier neighbor {:?}: {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr,
+ &neighbor_from_handshake.addr
+ );
+ self.neighbor_from_handshake = neighbor_from_handshake.addr;
+
+ Ok(self.cur_neighbor.clone())
+ }
+ }
+
+ /// Finish handshaking with our current neighbor, thereby ensuring that it is connected
+ /// Returns true if we finished talking to the neighbor
+ /// Returns false if not
+ pub fn handshake_try_finish(&mut self, network: &mut PeerNetwork) -> Result {
+ assert!(self.state == NeighborWalkState::HandshakeFinish);
+
+ let message = match self.try_get_reply(network)? {
+ Some(msg) => msg,
+ None => {
+ // try again later
+ return Ok(false);
+ }
+ };
+
+ match message.payload {
+ StacksMessageType::HandshakeAccept(ref data) => {
+ debug!(
+ "{:?}: received HandshakeAccept from {} {:?}: {:?}",
+ network.get_local_peer(),
+ if self.walk_outbound {
+ "outbound"
+ } else {
+ "inbound"
+ },
+ &message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port),
+ &data.handshake
+ );
+
+ self.handle_handshake_accept(network, &message, data, None)?;
+
+ // proceed to ask this neighbor for its neighbors.
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetNeighborsBegin,
+ )?;
+ Ok(true)
+ }
+ StacksMessageType::StackerDBHandshakeAccept(ref data, ref db_data) => {
+ debug!(
+ "{:?}: received StackerDBHandshakeAccept from {} {:?}: {:?}, {:?}",
+ network.get_local_peer(),
+ if self.walk_outbound {
+ "outbound"
+ } else {
+ "inbound"
+ },
+ &message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port),
+ &data.handshake,
+ db_data
+ );
+
+ self.handle_handshake_accept(network, &message, data, Some(db_data))?;
+
+ // proceed to ask this neighbor for its neighbors.
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetNeighborsBegin,
+ )?;
+ Ok(true)
+ }
+ StacksMessageType::HandshakeReject => {
+ // told to bugger off
+ Err(net_error::PeerNotConnected)
+ }
+ StacksMessageType::Nack(_) => {
+ // something's wrong on our end (we're using a new key that they don't yet
+ // know about, or something)
+ Err(net_error::PeerNotConnected)
+ }
+ _ => {
+ // invalid message
+ debug!(
+ "{:?}: Got out-of-sequence message from {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+ self.add_broken(network, &self.cur_neighbor.addr.clone());
+ Err(net_error::InvalidMessage)
+ }
+ }
+ }
+
+ /// Begin refreshing our knowledge of peer in/out degrees.
+ /// Ask self.cur_neighbor for its neighbors
+ pub fn getneighbors_begin(&mut self, network: &mut PeerNetwork) -> Result {
+ assert!(self.state == NeighborWalkState::GetNeighborsBegin);
+
+ if self.ongoing_request.is_some() {
+ // already in-flight
+ return Ok(true);
+ }
+ debug!(
+ "{:?}: send GetNeighbors to {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+
+ let handle = Self::neighbor_send(
+ network,
+ &self.cur_neighbor.addr,
+ StacksMessageType::GetNeighbors,
+ )?;
+
+ self.ongoing_request = Some(handle);
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetNeighborsFinish,
+ )?;
+ Ok(true)
+ }
+
+ /// Try to finish the getneighbors request to cur_neighbor
+ /// Returns the list of neighbors we need to resolve
+ /// Return None if we're not done yet, or haven't started yet.
+ pub fn getneighbors_try_finish(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::GetNeighborsFinish);
+
+ let block_height = network.get_chain_view().burn_block_height;
+ let message = match self.try_get_reply(network)? {
+ Some(msg) => msg,
+ None => {
+ // try again later
+ return Ok(false);
+ }
+ };
+
+ let mut neighbor_addrs = match message.payload {
+ StacksMessageType::Neighbors(ref data) => {
+ debug!(
+ "{:?}: Got Neighbors from {:?}: {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr,
+ data.neighbors
+ );
+ let neighbors = NeighborWalk::filter_sensible_neighbors(data.neighbors.clone());
+ let (mut found, to_resolve) = NeighborWalk::lookup_stale_neighbors(
+ network.peerdb.conn(),
+ message.preamble.network_id,
+ block_height,
+ &neighbors,
+ )?;
+
+ // add neighbors we already know about to the frontier of `cur_neighbor`
+ for (_naddr, neighbor) in found.drain() {
+ self.new_frontier
+ .insert(neighbor.addr.clone(), neighbor.clone());
+ self.frontier
+ .insert(neighbor.addr.clone(), neighbor.clone());
+ }
+
+ to_resolve
+ }
+ StacksMessageType::Nack(ref data) => {
+ debug!(
+ "{:?}: Neighbor {:?} NACK'ed GetNeighbors with code {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr,
+ data.error_code
+ );
+ self.add_broken(network, &self.cur_neighbor.addr.clone());
+ return Err(net_error::ConnectionBroken);
+ }
+ _ => {
+ // invalid message
+ debug!(
+ "{:?}: Got out-of-sequence message from {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+ self.add_broken(network, &self.cur_neighbor.addr.clone());
+ return Err(net_error::InvalidMessage);
+ }
+ };
+
+ // proceed to handshake with them.
+ // If this is an inbound neighbor, then try also to handshake with its advertized
+ // IP address.
+ if !self.walk_outbound {
+ test_debug!("{:?}: will try to handshake with inbound neighbor {:?}'s advertized address {:?} as well", network.get_local_peer(), &self.cur_neighbor.addr, &self.neighbor_from_handshake);
+ let cur_neighbor_pubkey_hash =
+ Hash160::from_node_public_key(&self.cur_neighbor.public_key);
+ neighbor_addrs.push(NeighborAddress::from_neighbor_key(
+ self.neighbor_from_handshake.clone(),
+ cur_neighbor_pubkey_hash,
+ ));
+ }
+
+ // prune the list to a reasonable size in case cur_neighbor gave us too many for our
+ // configuration
+ if neighbor_addrs.len() as u64 > network.get_connection_opts().max_neighbors_of_neighbor {
+ debug!(
+ "{:?}: will handshake with {} neighbors out of {} reported by {:?}",
+ network.get_local_peer(),
+ &network.get_connection_opts().max_neighbors_of_neighbor,
+ neighbor_addrs.len(),
+ &self.cur_neighbor.addr
+ );
+ neighbor_addrs.shuffle(&mut thread_rng());
+ neighbor_addrs
+ .truncate(network.get_connection_opts().max_neighbors_of_neighbor as usize);
+ }
+
+ test_debug!(
+ "{:?}: received Neighbors from {} {:?}: {:?}",
+ network.get_local_peer(),
+ if self.walk_outbound {
+ "outbound"
+ } else {
+ "inbound"
+ },
+ &self.cur_neighbor.addr,
+ &neighbor_addrs
+ );
+
+ // now go and try and connect to these neighbors
+ self.pending_neighbor_addrs = Some(neighbor_addrs);
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetHandshakesBegin,
+ )?;
+ Ok(true)
+ }
+
+ /// Begin getting the neighors of cur_neighbor's neighbors.
+ /// ReplyHandleP2Ps should be reply handles for Handshake requests.
+ pub fn neighbor_handshakes_begin(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::GetHandshakesBegin);
+
+ let my_pubkey_hash = Hash160::from_node_public_key(&Secp256k1PublicKey::from_private(
+ &network.get_local_peer().private_key,
+ ));
+ debug!(
+ "{:?}: my public key hash is {}",
+ network.get_local_peer(),
+ &my_pubkey_hash
+ );
+
+ let pending_neighbor_addrs = self
+ .pending_neighbor_addrs
+ .take()
+ .expect("FATAL: no result from GetNeighbors");
+
+ // got neighbors -- proceed to ask each one for *its* neighbors so we can
+ // estimate cur_neighbor's in-degree and grow our frontier.
+ debug!(
+ "{:?}: will try to connect to {} neighbors of {:?}",
+ network.get_local_peer(),
+ pending_neighbor_addrs.len(),
+ &self.cur_neighbor.addr
+ );
+
+ let mut still_pending = vec![];
+ for na in pending_neighbor_addrs.into_iter() {
+ // don't talk to myself if we're listed as a neighbor of this
+ // remote peer.
+ if na.public_key_hash == my_pubkey_hash {
+ test_debug!(
+ "{:?}: skip handshaking with myself",
+ network.get_local_peer()
+ );
+ continue;
+ }
+
+ // don't handshake with cur_neighbor if we already know its public IP
+ // address (we may not know this if the neighbor is inbound)
+ if na.addrbytes == self.cur_neighbor.addr.addrbytes
+ && na.port == self.cur_neighbor.addr.port
+ {
+ test_debug!(
+ "{:?}: skip handshaking with cur_neighbor {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+ continue;
+ }
+
+ let nk = na.to_neighbor_key(network);
+
+ // don't talk to a neighbor if it's unroutable anyway
+ if network.is_bound(&nk) || nk.addrbytes.is_anynet() {
+ test_debug!(
+ "{:?}: will not connect to bind / anynet address {:?}",
+ network.get_local_peer(),
+ &nk
+ );
+ continue;
+ }
+
+ // start a session with this neighbor
+ match self.neighbor_session_begin(network, &nk, &na.public_key_hash) {
+ Ok(Some(handle)) => {
+ debug!(
+ "{:?}: will Handshake with neighbor-of-neighbor {:?} ({})",
+ network.get_local_peer(),
+ &nk,
+ &na.public_key_hash
+ );
+ self.add_batch_request(na, handle);
+ }
+ Ok(None) => {
+ test_debug!(
+ "{:?}: already connecting to {:?}",
+ network.get_local_peer(),
+ &nk
+ );
+ still_pending.push(na);
+ continue;
+ }
+ Err(e) => {
+ debug!(
+ "{:?}: Failed to connect to {:?}: {:?}",
+ network.get_local_peer(),
+ &nk,
+ &e
+ );
+ continue;
+ }
+ }
+ }
+
+ if still_pending.len() > 0 {
+ // try again
+ self.pending_neighbor_addrs = Some(still_pending);
+ return Ok(false);
+ }
+
+ // everybody connected! next state
+ test_debug!(
+ "{:?}: connected to {} neighbors-of-neighbors of {:?}",
+ network.get_local_peer(),
+ self.count_inflight(),
+ &self.cur_neighbor.addr
+ );
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetHandshakesFinish,
+ )?;
+ Ok(true)
+ }
+
+ /// Handle a handshake accept from a neighbor as part of our neighbor-handshake step
+ fn handle_neighbor_handshake_accept(
+ &mut self,
+ network: &mut PeerNetwork,
+ block_height: u64,
+ naddr: NeighborAddress,
+ message: &StacksMessage,
+ data: &HandshakeAcceptData,
+ db_data: Option<&StackerDBHandshakeData>,
+ ) -> Result<(), net_error> {
+ // NOTE: even if cur_neighbor is an inbound neighbor, the neighbors
+ // of cur_neighbor that we could handshake with are necessarily
+ // outbound connections. So, save them all.
+ // Do we know about this peer already?
+ let mut tx = network.peerdb.tx_begin()?;
+ let (new, neighbor) = self.add_or_schedule_replace_neighbor(
+ &mut tx,
+ block_height,
+ &naddr,
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &data.handshake,
+ db_data,
+ )?;
+ if new {
+ // neighbor was new
+ self.new_frontier
+ .insert(neighbor.addr.clone(), neighbor.clone());
+ } else {
+ // frontier maintenance
+ self.frontier
+ .insert(neighbor.addr.clone(), neighbor.clone());
+ }
+
+ self.resolved_handshake_neighbors.insert(naddr, neighbor);
+ tx.commit()?;
+ Ok(())
+ }
+
+ /// Try to finish getting handshakes from cur_neighbors' neighbors.
+ /// As a side-effect of handshaking with all these peers, our PeerDB instance will be expanded
+ /// with the addresses, public keys, public key expiries of these neighbors -- i.e. this method grows
+ /// our frontier.
+ /// Returns Ok(true) if all outstanding requests completed.
+ /// Returns Ok(false) if there are still pending requests
+ /// Returns Err(..) on DB errors
+ pub fn neighbor_handshakes_try_finish(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::GetHandshakesFinish);
+
+ // filter queries for existing neighbors by the current burnchain block height
+ let block_height = network.get_chain_view().burn_block_height;
+
+ // see if we got any replies
+ test_debug!(
+ "{:?}: Try to finish {} in-flight handshakes with neighbors-of-neighbor {:?}",
+ network.get_local_peer(),
+ self.count_inflight(),
+ &self.cur_neighbor.addr
+ );
+
+ for (naddr, message) in self.collect_replies(network).into_iter() {
+ let nkey = naddr.to_neighbor_key(network);
+ match message.payload {
+ StacksMessageType::HandshakeAccept(ref data) => {
+ debug!(
+ "{:?}: Got HandshakeAccept from {:?}",
+ network.get_local_peer(),
+ &nkey
+ );
+
+ self.handle_neighbor_handshake_accept(
+ network,
+ block_height,
+ naddr,
+ &message,
+ data,
+ None,
+ )?;
+ }
+ StacksMessageType::StackerDBHandshakeAccept(ref data, ref db_data) => {
+ debug!(
+ "{:?}: Got StackerDBHandshakeAccept from {:?}: {:?}",
+ network.get_local_peer(),
+ &nkey,
+ db_data
+ );
+
+ self.handle_neighbor_handshake_accept(
+ network,
+ block_height,
+ naddr,
+ &message,
+ data,
+ Some(db_data),
+ )?;
+ }
+ StacksMessageType::HandshakeReject => {
+ // remote peer doesn't want to talk to us
+ debug!(
+ "{:?}: Neighbor {:?} rejected our handshake",
+ network.get_local_peer(),
+ &nkey
+ );
+ self.add_dead(
+ network,
+ &NeighborKey::from_neighbor_address(
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &naddr,
+ ),
+ );
+ }
+ StacksMessageType::Nack(ref data) => {
+ // remote peer nope'd us
+ debug!(
+ "{:?}: Neighbor {:?} NACK'ed our handshake with error code {:?}",
+ network.get_local_peer(),
+ &nkey,
+ data.error_code
+ );
+ self.add_dead(
+ network,
+ &NeighborKey::from_neighbor_address(
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &naddr,
+ ),
+ );
+ }
+ _ => {
+ // protocol violation
+ debug!(
+ "{:?}: Neighbor {:?} replied an out-of-sequence message",
+ network.get_local_peer(),
+ &naddr
+ );
+ self.add_broken(
+ network,
+ &NeighborKey::from_neighbor_address(
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &naddr,
+ ),
+ );
+ }
+ }
+ }
+
+ if self.count_inflight() > 0 {
+ // still handshaking
+ return Ok(false);
+ }
+
+ // finished handshaking! find neighbors that accepted
+ let mut neighbor_addrs = vec![];
+
+ // update our frontier knowledge
+ for (nkey, new_neighbor) in self.new_frontier.drain() {
+ debug!(
+ "{:?}: Add to frontier of {:?}: {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr,
+ &nkey
+ );
+
+ if nkey.addrbytes != self.cur_neighbor.addr.addrbytes
+ || nkey.port != self.cur_neighbor.addr.port
+ {
+ neighbor_addrs.push(NeighborAddress::from_neighbor(&new_neighbor));
+ }
+
+ self.frontier.insert(nkey.clone(), new_neighbor);
+ }
+
+ self.new_frontier.clear();
+
+ self.handshake_neighbor_addrs.clear();
+ self.handshake_neighbor_addrs.append(&mut neighbor_addrs);
+
+ // advance state!
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetNeighborsNeighborsBegin,
+ )?;
+ Ok(true)
+ }
+
+ /// Begin asking remote neighbors for their neighbors in order to estimate cur_neighbor's
+ /// in-degree. We should be connected to all of them, so don't worry about establishing
+ /// connections to them.
+ pub fn getneighbors_neighbors_begin(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::GetNeighborsNeighborsBegin);
+
+ let handshake_neighbor_addrs = mem::replace(&mut self.handshake_neighbor_addrs, vec![]);
+ for naddr in handshake_neighbor_addrs.into_iter() {
+ let nk = naddr.to_neighbor_key(network);
+ if !network.is_registered(&nk) {
+ // not connected to this neighbor -- can't ask for neighbors
+ debug!("{:?}: Not connected to {:?}", network.get_local_peer(), &nk);
+ continue;
+ }
+ debug!(
+ "{:?}: send GetNeighbors to {:?}",
+ network.get_local_peer(),
+ &nk
+ );
+ match NeighborWalk::neighbor_send(network, &nk, StacksMessageType::GetNeighbors) {
+ Ok(rh) => {
+ self.add_batch_request(naddr, rh);
+ }
+ Err(e) => {
+ debug!(
+ "{:?}: Could not send to {:?}: {:?}",
+ network.get_local_peer(),
+ &nk,
+ &e
+ );
+ continue;
+ }
+ }
+ }
+
+ // advance state!
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::GetNeighborsNeighborsFinish,
+ )?;
+ Ok(true)
+ }
+
+ /// Try to finish getting the neighbors from cur_neighbors' neighbors.
+ /// Once finished, update `cur_neighbor` and `prev_neighbor` to walk to the next random neighbor based
+ /// on what we have discovered, and if this neighbor we were considering was an outbound
+ /// neighbor, then also update its in/out-degree estimates in the peers DB.
+ /// Returns Ok(true) if we're done
+ /// Returns Ok(false) if we're still waiting
+ /// Returns Err(..) on irrecoverable error
+ pub fn getneighbors_neighbors_try_finish(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::GetNeighborsNeighborsFinish);
+
+ // see if we got any replies
+ for (naddr, message) in self.collect_replies(network).into_iter() {
+ let nkey = naddr.to_neighbor_key(network);
+ match message.payload {
+ StacksMessageType::Neighbors(ref data) => {
+ debug!(
+ "{:?}: Got Neighbors from {:?}: {:?}",
+ network.get_local_peer(),
+ &nkey,
+ &data.neighbors
+ );
+ let neighbors = NeighborWalk::filter_sensible_neighbors(data.neighbors.clone());
+ self.resolved_getneighbors_neighbors
+ .insert(naddr, neighbors);
+ }
+ StacksMessageType::Nack(ref data) => {
+ // not broken; likely because it hasn't gotten to processing our
+ // handshake yet. We'll just ignore it.
+ debug!(
+ "{:?}: Neighbor {:?} NACKed with code {:?}",
+ network.get_local_peer(),
+ &nkey,
+ data.error_code
+ );
+ }
+ _ => {
+ // unexpected reply
+ debug!("{:?}: Neighbor {:?} replied an out-of-sequence message (type {}); assuming broken", network.get_local_peer(), &nkey, message.get_message_name());
+ self.add_broken(network, &nkey);
+ }
+ }
+ }
+
+ if self.count_inflight() > 0 {
+ // not done yet
+ debug!(
+ "{:?}: still waiting for {} Neighbors replies",
+ network.get_local_peer(),
+ self.count_inflight()
+ );
+ return Ok(false);
+ }
+
+ // finished! build up frontier's in-degree estimation, plus ourselves
+ self.cur_neighbor.in_degree = 1;
+ self.cur_neighbor.out_degree = self.frontier.len() as u32;
+
+ for (_, neighbor_list) in self.resolved_getneighbors_neighbors.iter() {
+ for na in neighbor_list {
+ if na.addrbytes == self.cur_neighbor.addr.addrbytes
+ && na.port == self.cur_neighbor.addr.port
+ {
+ self.cur_neighbor.in_degree += 1;
+ }
+ }
+ }
+
+ // only save if the neighbor is routable from us
+ if self.walk_outbound {
+ // remember this peer's in/out degree estimates
+ debug!(
+ "{:?}: In/Out degree of current neighbor {:?} is {}/{}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr,
+ self.cur_neighbor.in_degree,
+ self.cur_neighbor.out_degree
+ );
+
+ let mut tx = network.peerdb.tx_begin()?;
+ self.cur_neighbor.save_update(&mut tx, None)?;
+ tx.commit()?;
+ }
+
+ // perform the MHRWDA step to update the cur_neighbor cursor to potentially point to a
+ // new neighbor, so we can do this all again!
+ self.step(network);
+
+ // advance state
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::PingbackHandshakesBegin,
+ )?;
+ Ok(true)
+ }
+
+ /// Pick a random neighbor from a given list of neighbors, excluding an optional given neighbor
+ fn pick_random_neighbor(
+ frontier: &HashMap,
+ exclude: Option<&Neighbor>,
+ ) -> Option {
+ let mut rnd = thread_rng();
+
+ let sample = rnd.gen_range(0, frontier.len());
+ let mut count = 0;
+
+ for (nk, n) in frontier.iter() {
+ count += match exclude {
+ None => 1,
+ Some(ref e) => {
+ if (*e).addr == *nk {
+ 0
+ } else {
+ 1
+ }
+ }
+ };
+ if count >= sample {
+ return Some(n.clone());
+ }
+ }
+ return None;
+ }
+
+ /// Calculate the "degree ratio" between two neighbors, used to determine the probability of
+ /// stepping to a neighbor in MHRWDA. We estimate each neighbor's undirected degree, and then
+ /// measure how represented each neighbor's AS is in the peer graph. We *bias* the sample so
+ /// that peers in under-represented ASs are more likely to be walked to than they otherwise
+ /// would be if considering only neighbor degrees.
+ fn degree_ratio(peerdb_conn: &DBConn, n1: &Neighbor, n2: &Neighbor) -> f64 {
+ let d1 = n1.degree() as f64;
+ let d2 = n2.degree() as f64;
+ let as_d1 = PeerDB::asn_count(peerdb_conn, n1.asn).unwrap_or(1) as f64;
+ let as_d2 = PeerDB::asn_count(peerdb_conn, n2.asn).unwrap_or(1) as f64;
+ (d1 * as_d2) / (d2 * as_d1)
+ }
+
+ /// Do the MHRWDA step -- try to step from our cur_neighbor to an immediate neighbor, if there
+ /// is any neighbor to step to. Return the new cur_neighbor, if we were able to step.
+ /// The caller should call reset() after this, optionally with a newly-selected frontier
+ /// neighbor if we were unable to take a step.
+ ///
+ /// This is a slightly modified MHRWDA algorithm. The following differences are described:
+ /// * The Stacks peer network is a _directed_ graph, whereas MHRWDA is desigend to operate
+ /// on _undirected_ graphs. As such, we calculate a separate peer graph with undirected edges
+ /// with the same peers. We estimate a peer's undirected degree with Neighbor::degree().
+ /// * The probability of transitioning to a new peer is proportional not only to the ratio of
+ /// the current peer's degree to the new peer's degree, but also to the ratio of the new
+ /// peer's AS's node count to the current peer's AS's node count.
+ ///
+ /// This method updates self.next_neighbor with a new neighbor to step to, or None to restart.
+ pub fn step(&mut self, network: &PeerNetwork) {
+ let peerdb_conn = network.peerdb.conn();
+ test_debug!(
+ "{:?}: execute neighbor step from {:?}",
+ network.get_local_peer(),
+ &self.cur_neighbor.addr
+ );
+
+ let mut rnd = thread_rng();
+
+ // step to a node in cur_neighbor's frontier, per MHRWDA
+ let next_neighbor_opt = if self.frontier.len() == 0 {
+ // just started the walk
+ if self.walk_outbound {
+ // outbound neighbor, so stay here for now -- we don't yet know this neighbor's
+ // frontier
+ Some(self.cur_neighbor.clone())
+ } else {
+ // inbound; reset
+ None
+ }
+ } else {
+ // continuing the walk
+ let next_neighbor = NeighborWalk::pick_random_neighbor(&self.frontier, None)
+ .expect("BUG: empty frontier size"); // won't panic since self.frontier.len() > 0
+ let walk_prob: f64 = rnd.gen();
+ if walk_prob
+ < fmin!(
+ 1.0,
+ NeighborWalk::degree_ratio(peerdb_conn, &self.cur_neighbor, &next_neighbor)
+ )
+ {
+ // won the coin toss; will take a step.
+ // take care not to step back to the neighbor from which we
+ // stepped previously
+ if let Some(ref prev_neighbor) = self.prev_neighbor.as_ref() {
+ if prev_neighbor.addr == next_neighbor.addr {
+ // oops, backtracked. Try to pick a different neighbor, if possible.
+ if self.frontier.len() == 1 {
+ // no other choices. will need to reset this walk.
+ None
+ } else {
+ // have alternative choices, so instead of backtracking, we'll delay
+ // acceptance by probabilistically deciding to step to an alternative
+ // instead of backtracking.
+ let alt_next_neighbor = NeighborWalk::pick_random_neighbor(
+ &self.frontier,
+ Some(&prev_neighbor),
+ )
+ .expect("BUG: empty frontier size");
+ let alt_prob: f64 = rnd.gen();
+
+ let cur_to_alt = NeighborWalk::degree_ratio(
+ peerdb_conn,
+ &self.cur_neighbor,
+ &alt_next_neighbor,
+ );
+ let prev_to_cur = NeighborWalk::degree_ratio(
+ peerdb_conn,
+ &prev_neighbor,
+ &self.cur_neighbor,
+ );
+ let trans_prob = fmin!(
+ fmin!(1.0, cur_to_alt * cur_to_alt),
+ fmax!(1.0, prev_to_cur * prev_to_cur)
+ );
+
+ if alt_prob < fmin!(1.0, trans_prob) {
+ // go to alt peer instead
+ Some(alt_next_neighbor)
+ } else {
+ // backtrack.
+ Some(next_neighbor)
+ }
+ }
+ } else {
+ // not backtracking. Take a step.
+ Some(next_neighbor)
+ }
+ } else {
+ // not backtracking. Take a step.
+ Some(next_neighbor)
+ }
+ } else {
+ // lost the coin toss. will not take a step
+ Some(self.cur_neighbor.clone())
+ }
+ };
+
+ if let Some(ref neighbor) = next_neighbor_opt {
+ debug!(
+ "{:?}: Walk steps to {:?}",
+ network.get_local_peer(),
+ &neighbor.addr
+ );
+ } else {
+ debug!(
+ "{:?}: Walk will not step to a new neighbor",
+ network.get_local_peer()
+ );
+ }
+
+ self.next_neighbor = next_neighbor_opt;
+ if let Some(ref next_neighbor) = self.next_neighbor {
+ if *next_neighbor == self.cur_neighbor {
+ self.next_walk_outbound = self.walk_outbound;
+ } else {
+ // can only step to outbound neighbors
+ self.next_walk_outbound = true;
+ }
+ }
+ }
+
+ /// Start to connect to newly-discovered inbound peers
+ pub fn pingback_handshakes_begin(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ // caller will have already populated the pending_pingback_handshakes hashmap
+ assert!(self.state == NeighborWalkState::PingbackHandshakesBegin);
+
+ let network_pingbacks = mem::replace(&mut self.network_pingbacks, HashMap::new());
+ let mut still_pending: HashMap = HashMap::new();
+
+ for (naddr, pingback) in network_pingbacks.into_iter() {
+ // pingback hint is stale? (or we tried to connect and timed out?)
+ if pingback.ts + network.get_connection_opts().pingback_timeout < get_epoch_time_secs()
+ {
+ continue;
+ }
+
+ let nk = NeighborKey::from_neighbor_address(
+ pingback.peer_version,
+ pingback.network_id,
+ &naddr,
+ );
+
+ // start a session with this neighbor
+ match self.neighbor_session_begin(network, &nk, &naddr.public_key_hash) {
+ Ok(Some(handle)) => {
+ debug!(
+ "{:?}: Sent pingback handshake to {:?}",
+ network.get_local_peer(),
+ &nk
+ );
+ self.add_batch_request(naddr, handle);
+ }
+ Ok(None) => {
+ debug!(
+ "{:?}: No pingback handshake sent to {:?}; still connecting",
+ network.get_local_peer(),
+ &nk
+ );
+
+ // try again
+ still_pending.insert(naddr, pingback);
+ continue;
+ }
+ Err(e) => {
+ debug!(
+ "{:?}: Failed to connect to pingback {:?}: {:?}",
+ network.get_local_peer(),
+ &nk,
+ &e
+ );
+ continue;
+ }
+ }
+ }
+
+ self.network_pingbacks.clear();
+ self.network_pingbacks.extend(still_pending);
+
+ if self.network_pingbacks.len() > 0 {
+ // still connecting
+ debug!(
+ "{:?}: Still trying to pingback-handshake with {} neighbors",
+ network.get_local_peer(),
+ self.network_pingbacks.len()
+ );
+ return Ok(false);
+ }
+
+ // good to go!
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::PingbackHandshakesFinish,
+ )?;
+ Ok(true)
+ }
+
+ /// Does a given handshakedata represent an expected public key hash?
+ fn check_handshake_pubkey_hash(
+ nk: &NeighborKey,
+ data: &HandshakeAcceptData,
+ naddr: &NeighborAddress,
+ ) -> bool {
+ let neighbor_pubkey_hash =
+ Hash160::from_node_public_key_buffer(&data.handshake.node_public_key);
+ if neighbor_pubkey_hash != naddr.public_key_hash {
+ debug!(
+ "Neighbor {:?} had an unexpected pubkey hash: expected {:?} != {:?}",
+ nk, &naddr.public_key_hash, &neighbor_pubkey_hash
+ );
+ return false;
+ }
+
+ true
+ }
+
+ /// Finish up connecting to newly-discovered inbound peers
+ pub fn pingback_handshakes_try_finish(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::PingbackHandshakesFinish);
+
+ // filter neighbors by currnet observed burn block height
+ let block_height = network.get_chain_view().burn_block_height;
+
+ // see if we got any replies
+ for (naddr, message) in self.collect_replies(network).into_iter() {
+ // if we got back a HandshakeAccept, and it's on the same chain as us, we're good!
+ match message.payload {
+ StacksMessageType::HandshakeAccept(ref data) => {
+ debug!("{:?}: received HandshakeAccept from peer {:?}; now known to be routable from us", network.get_local_peer(), &message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port));
+
+ let peer_nk =
+ message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port);
+ if !Self::check_handshake_pubkey_hash(&peer_nk, data, &naddr) {
+ continue;
+ }
+
+ let mut tx = network.peerdb.tx_begin()?;
+ self.add_or_schedule_replace_neighbor(
+ &mut tx,
+ block_height,
+ &naddr,
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &data.handshake,
+ None,
+ )?;
+ tx.commit()?;
+ }
+ StacksMessageType::StackerDBHandshakeAccept(ref data, ref db_data) => {
+ debug!("{:?}: received StackerDBHandshakeAccept from peer {:?}; now known to be routable from us", network.get_local_peer(), &message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port));
+
+ let peer_nk =
+ message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port);
+ if !Self::check_handshake_pubkey_hash(&peer_nk, data, &naddr) {
+ continue;
+ }
+
+ let mut tx = network.peerdb.tx_begin()?;
+ self.add_or_schedule_replace_neighbor(
+ &mut tx,
+ block_height,
+ &naddr,
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &data.handshake,
+ Some(db_data),
+ )?;
+ tx.commit()?;
+ }
+ _ => {
+ let nkey = naddr.to_neighbor_key(network);
+ debug!(
+ "{:?}: Neighbor {:?} replied {:?} instead of pingback handshake",
+ network.get_local_peer(),
+ &nkey,
+ &message.get_message_name()
+ );
+ }
+ }
+ }
+
+ if self.count_inflight() > 0 {
+ debug!(
+ "{:?}: Still waiting for pingback-handshake response from {} neighbors",
+ network.get_local_peer(),
+ self.count_inflight()
+ );
+ return Ok(false);
+ }
+
+ // done!
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::ReplacedNeighborsPingBegin,
+ )?;
+ Ok(true)
+ }
+
+ /// Ping existing neighbors that would be replaced by the discovery of new neighbors (i.e.
+ /// through getting the neighbors of our neighbor, or though pingbacks)
+ pub fn ping_existing_neighbors_begin(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result {
+ assert!(self.state == NeighborWalkState::ReplacedNeighborsPingBegin);
+
+ let replaced_neighbors = mem::replace(&mut self.replaced_neighbors, HashMap::new());
+ for (naddr, _slot) in replaced_neighbors.into_iter() {
+ let nk = naddr.to_neighbor_key(network);
+ test_debug!(
+ "{:?}: send Handshake to replaceable neighbor {:?}",
+ network.get_local_peer(),
+ nk
+ );
+
+ match NeighborWalk::neighbor_send(
+ network,
+ &nk,
+ StacksMessageType::Handshake(HandshakeData::from_local_peer(
+ network.get_local_peer(),
+ )),
+ ) {
+ Ok(handle) => {
+ self.add_batch_request(naddr, handle);
+ }
+ Err(e) => {
+ debug!(
+ "{:?}: Not connected to {:?}: ({:?}",
+ network.get_local_peer(),
+ &nk,
+ &e
+ );
+ }
+ }
+ }
+
+ // advance state!
+ self.set_state(
+ network.get_local_peer(),
+ NeighborWalkState::ReplacedNeighborsPingFinish,
+ )?;
+ Ok(true)
+ }
+
+ /// Handle a handshake accept for a pinged neighbor.
+ /// If it was a StackerDBHandshakeAccept, then also handle the newly-announced DBs
+ fn handle_handshake_accept_from_ping(
+ &mut self,
+ network: &mut PeerNetwork,
+ message: &StacksMessage,
+ data: &HandshakeAcceptData,
+ db_data: Option<&StackerDBHandshakeData>,
+ ) -> Result<(), net_error> {
+ let mut tx = network.peerdb.tx_begin()?;
+ let mut neighbor_from_handshake = Neighbor::from_handshake(
+ &mut tx,
+ message.preamble.peer_version,
+ message.preamble.network_id,
+ &data.handshake,
+ )?;
+ neighbor_from_handshake
+ .save_update(&mut tx, db_data.map(|x| x.smart_contracts.as_slice()))?;
+ tx.commit()?;
+
+ let naddr = NeighborAddress::from_neighbor(&neighbor_from_handshake);
+
+ // not going to replace
+ if self.replaced_neighbors.contains_key(&naddr) {
+ test_debug!(
+ "{:?}: will NOT replace {:?}",
+ network.get_local_peer(),
+ &neighbor_from_handshake.addr
+ );
+ self.replaced_neighbors.remove(&naddr);
+ }
+
+ Ok(())
+ }
+
+ /// try to finish pinging/handshaking all exisitng neighbors.
+ /// if the remote neighbor does _not_ respond to our ping, then replace it.
+ ///
+ /// This is the final step in the state-machine. It returns the walk result.
+ ///
+ /// Returns Ok(Some(walk_result)) if the task is completed.
+ /// Returns Ok(None) if we're still waiting for network replies
+ /// Returns Err(..) on unrecoverable error
+ pub fn ping_existing_neighbors_try_finish(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result, net_error> {
+ assert!(self.state == NeighborWalkState::ReplacedNeighborsPingFinish);
+
+ for (nkey, message) in self.collect_replies(network).into_iter() {
+ match message.payload {
+ StacksMessageType::HandshakeAccept(ref data) => {
+ // this peer is still alive -- will not replace it
+ // save knowledge to the peer DB (NOTE: the neighbor should already be in
+ // the DB, since it's cur_neighbor)
+ test_debug!(
+ "{:?}: received HandshakeAccept from {:?}",
+ network.get_local_peer(),
+ &message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port)
+ );
+ self.handle_handshake_accept_from_ping(network, &message, data, None)?;
+ }
+ StacksMessageType::StackerDBHandshakeAccept(ref data, ref db_data) => {
+ // this peer is still alive -- will not replace it
+ // save knowledge to the peer DB (NOTE: the neighbor should already be in
+ // the DB, since it's cur_neighbor)
+ test_debug!(
+ "{:?}: received StackerDBHandshakeAccept from {:?}: {:?}, {:?}",
+ network.get_local_peer(),
+ &message.to_neighbor_key(&data.handshake.addrbytes, data.handshake.port),
+ data,
+ db_data
+ );
+ self.handle_handshake_accept_from_ping(network, &message, data, Some(db_data))?;
+ }
+ StacksMessageType::Nack(ref data) => {
+ // evict
+ debug!(
+ "{:?}: Neighbor {:?} NACK'ed Handshake with code {:?}; will evict",
+ network.get_local_peer(),
+ nkey,
+ data.error_code
+ );
+ self.add_broken(network, &nkey);
+ }
+ _ => {
+ // unexpected reply -- this peer is misbehaving and should be replaced
+ debug!("{:?}: Neighbor {:?} replied an out-of-sequence message (type {}); will replace", network.get_local_peer(), &nkey, message.get_message_name());
+ self.add_broken(network, &nkey);
+ }
+ }
+ }
+
+ if self.count_inflight() > 0 {
+ // still pending
+ return Ok(None);
+ }
+
+ // done getting pings. do our replacements
+ let network_id = network.get_local_peer().network_id;
+ let local_peer_str = format!("{:?}", network.get_local_peer());
+
+ let mut tx = network.peerdb.tx_begin()?;
+ for (replaceable_naddr, slot) in self.replaced_neighbors.iter() {
+ let replacement = match self.neighbor_replacements.get(replaceable_naddr) {
+ Some(n) => n.clone(),
+ None => {
+ continue;
+ }
+ };
+
+ let replaced_opt = PeerDB::get_peer_at(&mut tx, network_id, *slot)?;
+ match replaced_opt {
+ Some(replaced) => {
+ if PeerDB::is_address_denied(&mut tx, &replacement.addr.addrbytes)? {
+ debug!(
+ "{}: Will not replace {:?} with {:?} -- is denied",
+ local_peer_str, &replaced.addr, &replacement.addr
+ );
+ } else {
+ debug!(
+ "{}: Replace {:?} with {:?}",
+ local_peer_str, &replaced.addr, &replacement.addr
+ );
+
+ PeerDB::insert_or_replace_peer(&mut tx, &replacement, *slot)?;
+ self.result.add_replaced(replaced.addr.clone());
+ }
+ }
+ None => {}
+ }
+ }
+ tx.commit()?;
+
+ // done talking to these neighbors
+ self.events.clear();
+
+ // advance state!
+ self.set_state(network.get_local_peer(), NeighborWalkState::Finished)?;
+
+ // calculate the walk result
+ if let Some(next_neighbor) = self.next_neighbor.take() {
+ // stepped! return the result
+ return Ok(Some(self.reset(
+ network.get_local_peer(),
+ next_neighbor,
+ false,
+ )));
+ }
+
+ // didn't step
+ // need to select a random new neighbor (will be outbound)
+ // NOTE: this will fail if this peer only has inbound neighbors,
+ // and force the walk to restart.
+ let next_neighbor = Self::get_first_walk_neighbor(network)?;
+
+ test_debug!(
+ "{:?}: Did not step to any neighbor; resetting walk to {:?}",
+ network.get_local_peer(),
+ &next_neighbor.addr
+ );
+ Ok(Some(self.reset(
+ network.get_local_peer(),
+ next_neighbor,
+ true,
+ )))
+ }
+
+ /// Top-level state transition.
+ /// Returns Some(walk result) when the state machine completes
+ /// Returns None if there's still more work to do
+ /// Returns Err(..) if the walk failed and ought to be terminated
+ pub fn run(
+ &mut self,
+ network: &mut PeerNetwork,
+ ) -> Result , net_error> {
+ // synchronize local peer state, in case we learn e.g. the public IP address in the mean
+ // time
+ let mut can_continue = true;
+ while can_continue {
+ // a walk times out if it stays in one state for too long
+ if self.walk_state_time + self.walk_state_timeout < get_epoch_time_secs() {
+ debug!(
+ "{:?}: walk has timed out: stayed in state {:?} for more than {} seconds",
+ network.get_local_peer(),
+ &self.state,
+ self.walk_state_timeout
+ );
+ return Ok(None);
+ }
+
+ can_continue = match self.state {
+ NeighborWalkState::HandshakeBegin => self.handshake_begin(network)?,
+ NeighborWalkState::HandshakeFinish => self.handshake_try_finish(network)?,
+ NeighborWalkState::GetNeighborsBegin => self.getneighbors_begin(network)?,
+ NeighborWalkState::GetNeighborsFinish => self.getneighbors_try_finish(network)?,
+ NeighborWalkState::GetHandshakesBegin => self.neighbor_handshakes_begin(network)?,
+ NeighborWalkState::GetHandshakesFinish => {
+ self.neighbor_handshakes_try_finish(network)?
+ }
+ NeighborWalkState::GetNeighborsNeighborsBegin => {
+ self.getneighbors_neighbors_begin(network)?
+ }
+ NeighborWalkState::GetNeighborsNeighborsFinish => {
+ self.getneighbors_neighbors_try_finish(network)?
+ }
+ NeighborWalkState::PingbackHandshakesBegin => {
+ self.pingback_handshakes_begin(network)?
+ }
+ NeighborWalkState::PingbackHandshakesFinish => {
+ self.pingback_handshakes_try_finish(network)?
+ }
+ NeighborWalkState::ReplacedNeighborsPingBegin => {
+ self.ping_existing_neighbors_begin(network)?
+ }
+ NeighborWalkState::ReplacedNeighborsPingFinish => {
+ let walk_result_opt = self.ping_existing_neighbors_try_finish(network)?;
+ if walk_result_opt.is_some() {
+ // did one pass of the state-machine
+ self.walk_step_count += 1;
+ return Ok(walk_result_opt);
+ }
+
+ // blocked; waiting for more replies
+ false
+ }
+ NeighborWalkState::Finished => false,
+ };
+ }
+
+ Ok(None)
+ }
+}
+
+/// NeighborWalk state machine is a neighbor set
+impl NeighborSet for NeighborWalk {
+ fn add_connecting(
+ &mut self,
+ network: &PeerNetwork,
+ nk: &NK,
+ event_id: usize,
+ ) {
+ self.connecting
+ .insert(nk.to_neighbor_key(network), event_id);
+ self.pin_connection(event_id);
+ }
+
+ fn get_connecting(&self, network: &PeerNetwork, nk: &NK) -> Option {
+ self.connecting
+ .get(&nk.to_neighbor_key(network))
+ .map(|event_ref| *event_ref)
+ }
+
+ fn remove_connecting(&mut self, network: &PeerNetwork, nk: &NK) {
+ self.connecting.remove(&nk.to_neighbor_key(network));
+ }
+
+ fn add_dead(&mut self, network: &PeerNetwork, nk: &NK) {
+ self.result
+ .dead_connections
+ .insert(nk.to_neighbor_key(network));
+ }
+
+ fn add_broken(&mut self, network: &PeerNetwork, nk: &NK) {
+ self.result
+ .broken_connections
+ .insert(nk.to_neighbor_key(network));
+ }
+
+ fn pin_connection(&mut self, event_id: usize) {
+ self.events.insert(event_id);
+ }
+
+ fn unpin_connection(&mut self, event_id: usize) {
+ self.events.remove(&event_id);
+ }
+
+ fn is_pinned(&self, event_id: usize) -> bool {
+ self.events.contains(&event_id)
+ }
+}