Merge branch 'feat/pox-chainstate-refactor' into next

This commit is contained in:
Jude Nelson
2020-07-27 19:38:03 -04:00
5 changed files with 102 additions and 288 deletions

View File

@@ -804,22 +804,21 @@ The peer may take an operator-given public IP address. If no public IP address
is given, the peer will learn the IP address using the `NatPunchRequest` and
`NatPunchReply` messages as follows:
1. The peer sends a `NatPunchRequest` to a randomly-chosen neighbor it has
1. The peer sends a `NatPunchRequest` to a randomly-chosen initial neighbor it has
already handshaked with. It uses a random nonce value.
2. The remote neighbor replies with a (signed) `NatPunchReply` message, with its
`addrbytes` and `port` set to what it believes the public IP is (based on the
underlying socket's peer address).
3. The peer, upon receipt and authentication of the `NatPunchReply`, will
confirm the public IP address by attempting to connect to itself. To do so,
it sends a `NatPunchRequest` to the public IP address it learned (with a
random nonce).
4. If the peer successfully connects to itself via the public IP address, it
will send a `NatPunchReply` back to itself with the nonce used in step 3.
5. Upon receipt of the `NatPunchReply`, the peer will have confirmed its public
3. Upon receipt of the `NatPunchReply`, the peer will have confirmed its public
IP address, and will send it in all future `HandshakeAccept` messages. It
will periodically re-learn its IP address, if it was not given by the
operator.
Because the peer's initial neighbors are chosen by the operator as being
sufficiently trustworthy to supply network information for network walks, it is
reasonable to assume that they can also be trusted to tell a bootstrapping peer
its public IP address.
### Checking a Peer's Liveness
A sender peer can check that a peer is still alive by sending it a `Ping`

View File

@@ -723,7 +723,7 @@ impl Burnchain {
let ipc_block = downloader.download(&ipc_header)?;
let download_end = get_epoch_time_ms();
debug!("Downloaded block {} in {}ms", ipc_block.height(), download_end - download_start);
debug!("Downloaded block {} in {}ms", ipc_block.height(), download_end.saturating_sub(download_start));
parser_send.send(Some(ipc_block))
.map_err(|_e| burnchain_error::ThreadChannelError)?;
@@ -741,7 +741,7 @@ impl Burnchain {
let burnchain_block = parser.parse(&ipc_block)?;
let parse_end = get_epoch_time_ms();
debug!("Parsed block {} in {}ms", burnchain_block.block_height(), parse_end - parse_start);
debug!("Parsed block {} in {}ms", burnchain_block.block_height(), parse_end.saturating_sub(parse_start));
db_send.send(Some(burnchain_block))
.map_err(|_e| burnchain_error::ThreadChannelError)?;
@@ -765,7 +765,7 @@ impl Burnchain {
last_processed = (tip, Some(transition));
let insert_end = get_epoch_time_ms();
debug!("Inserted block {} in {}ms", burnchain_block.block_height(), insert_end - insert_start);
debug!("Inserted block {} in {}ms", burnchain_block.block_height(), insert_end.saturating_sub(insert_start));
}
Ok(last_processed)
});

View File

@@ -280,6 +280,7 @@ const PEERDB_SETUP : &'static [&'static str]= &[
org INTEGER NOT NULL,
allowed INTEGER NOT NULL,
denied INTEGER NOT NULL,
initial INTEGER NOT NULL, -- 1 if this was one of the initial neighbors, 0 otherwise
in_degree INTEGER NOT NULL,
out_degree INTEGER NOT NULL,
@@ -374,6 +375,10 @@ impl PeerDB {
PeerDB::asn4_insert(&mut tx, &asn4)?;
}
for neighbor in initial_neighbors {
PeerDB::set_initial_peer(&mut tx, neighbor.addr.network_id, &neighbor.addr.addrbytes, neighbor.addr.port)?;
}
tx.commit()
.map_err(db_error::SqliteError)?;
@@ -478,13 +483,22 @@ impl PeerDB {
db.instantiate(network_id, parent_network_id, privkey_opt, key_expires, data_url, p2p_addr, p2p_port, asn4_recs, &vec![])?;
}
}
} else {
}
else {
db.update_local_peer(network_id, parent_network_id, data_url, p2p_port)?;
{
let mut tx = db.tx_begin()?;
PeerDB::refresh_allows(&mut tx)?;
PeerDB::refresh_denies(&mut tx)?;
PeerDB::clear_initial_peers(&mut tx)?;
if let Some(neighbors) = initial_neighbors {
for neighbor in neighbors {
PeerDB::set_initial_peer(&mut tx, neighbor.addr.network_id, &neighbor.addr.addrbytes, neighbor.addr.port)?;
}
}
tx.commit()?;
}
}
@@ -672,10 +686,11 @@ impl PeerDB {
&neighbor.denied,
&neighbor.in_degree,
&neighbor.out_degree,
&0i64,
&slot];
tx.execute("INSERT OR REPLACE INTO frontier (peer_version, network_id, addrbytes, port, public_key, expire_block_height, last_contact_time, asn, org, allowed, denied, in_degree, out_degree, slot) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)", neighbor_args)
tx.execute("INSERT OR REPLACE INTO frontier (peer_version, network_id, addrbytes, port, public_key, expire_block_height, last_contact_time, asn, org, allowed, denied, in_degree, out_degree, initial, slot) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)", neighbor_args)
.map_err(db_error::SqliteError)?;
Ok(())
@@ -690,6 +705,34 @@ impl PeerDB {
Ok(())
}
/// Is a peer one of this node's initial neighbors?
pub fn is_initial_peer(conn: &DBConn, network_id: u32, peer_addr: &PeerAddress, peer_port: u16) -> Result<bool, db_error> {
let res : Option<i64> = query_row(conn, "SELECT initial FROM frontier WHERE network_id = ?1 AND addrbytes = ?2 AND port = ?3",
&[&network_id as &dyn ToSql, &peer_addr.to_bin(), &peer_port])?;
match res {
Some(x) => Ok(x != 0),
None => Ok(false)
}
}
/// Set a peer as an initial peer
fn set_initial_peer<'a>(tx: &mut Transaction<'a>, network_id: u32, peer_addr: &PeerAddress, peer_port: u16) -> Result<(), db_error> {
tx.execute("UPDATE frontier SET initial = 1 WHERE network_id = ?1 AND addrbytes = ?2 AND port = ?3",
&[&network_id as &dyn ToSql, &peer_addr.to_bin(), &peer_port])
.map_err(db_error::SqliteError)?;
Ok(())
}
/// clear all initial peers
fn clear_initial_peers<'a>(tx: &mut Transaction<'a>) -> Result<(), db_error> {
tx.execute("UPDATE frontier SET initial = 0", NO_PARAMS)
.map_err(db_error::SqliteError)?;
Ok(())
}
/// Set/unset allow flag for a peer
/// Pass -1 for "always"
pub fn set_allow_peer<'a>(tx: &mut Transaction<'a>, network_id: u32, peer_addr: &PeerAddress, peer_port: u16, allow_deadline: i64) -> Result<(), db_error> {
@@ -1211,6 +1254,10 @@ mod test {
assert!(n.expire_block > 23456 + 14);
assert!(n.allowed == 0);
}
for neighbor in &initial_neighbors {
assert!(PeerDB::is_initial_peer(db.conn(), neighbor.addr.network_id, &neighbor.addr.addrbytes, neighbor.addr.port).unwrap());
}
}
#[test]

View File

@@ -2191,7 +2191,8 @@ mod test {
let mut peer_1_config = TestPeerConfig::from_port(31990);
let peer_2_config = TestPeerConfig::from_port(31992);
// peer 1 crawls peer 2
// peer 1 crawls peer 2, but not vice versa
// (so only peer 1 will learn its public IP)
peer_1_config.add_neighbor(&peer_2_config.to_neighbor());
let mut peer_1 = TestPeer::new(peer_1_config);
@@ -2201,7 +2202,7 @@ mod test {
let mut walk_1_count = 0;
let mut walk_2_count = 0;
while walk_1_count < 20 || walk_2_count < 20 || (!peer_1.network.public_ip_confirmed || !peer_2.network.public_ip_confirmed) {
while walk_1_count < 20 || walk_2_count < 20 || (!peer_1.network.public_ip_confirmed) {
let _ = peer_1.step();
let _ = peer_2.step();
@@ -2264,11 +2265,10 @@ mod test {
assert!(peer_1.network.public_ip_learned);
assert!(peer_1.network.public_ip_confirmed);
// peer 2 learned and confirmed its public IP address from peer 1
assert!(peer_2.network.local_peer.public_ip_address.is_some());
assert_eq!(peer_2.network.local_peer.public_ip_address.clone().unwrap(), (PeerAddress::from_socketaddr(&format!("127.0.0.1:1").parse::<SocketAddr>().unwrap()), 31992));
// peer 2 learned nothing, despite trying
assert!(peer_2.network.local_peer.public_ip_address.is_none());
assert!(peer_2.network.public_ip_learned);
assert!(peer_2.network.public_ip_confirmed);
assert!(!peer_2.network.public_ip_confirmed);
}
#[test]

View File

@@ -258,7 +258,6 @@ impl NetworkHandleServer {
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum PeerNetworkWorkState {
GetPublicIP,
ConfirmPublicIP,
BlockInvSync,
BlockDownload,
Prune
@@ -337,12 +336,9 @@ pub struct PeerNetwork {
// our public IP address that we give out in our handshakes
pub public_ip_learned: bool, // was the IP address given to us, or did we have to go learn it?
pub public_ip_confirmed: bool, // once we learned the IP address, were we able to confirm it by self-connecting?
public_ip_address_unconfirmed: Option<(PeerAddress, u16)>,
public_ip_requested_at: u64,
public_ip_learned_at: u64,
public_ip_reply_handle: Option<ReplyHandleP2P>,
public_ip_self_event_id: usize,
public_ip_ping_nonce: u32,
public_ip_retries: u64,
}
@@ -404,14 +400,11 @@ impl PeerNetwork {
port: 0
},
public_ip_address_unconfirmed: pub_ip.clone(),
public_ip_learned: pub_ip_learned,
public_ip_requested_at: 0,
public_ip_learned_at: 0,
public_ip_confirmed: false,
public_ip_reply_handle: None,
public_ip_self_event_id: 0,
public_ip_ping_nonce: 0,
public_ip_retries: 0
}
}
@@ -983,23 +976,17 @@ impl PeerNetwork {
/// Check to see if we can register the given socket
/// * we can't have registered this neighbor already
/// * if this is inbound, we can't add more than self.num_clients
fn can_register_peer(&mut self, event_id: usize, neighbor_key: &NeighborKey, outbound: bool) -> Result<(), net_error> {
if !(!self.public_ip_confirmed && self.public_ip_self_event_id == event_id) {
// (this is _not_ us connecting to ourselves)
// don't talk to our bind address
if self.is_bound(neighbor_key) {
debug!("{:?}: do not register myself at {:?}", &self.local_peer, neighbor_key);
return Err(net_error::Denied);
}
// denied?
if PeerDB::is_peer_denied(&self.peerdb.conn(), neighbor_key.network_id, &neighbor_key.addrbytes, neighbor_key.port)? {
info!("{:?}: Peer {:?} is denied; dropping", &self.local_peer, neighbor_key);
return Err(net_error::Denied);
}
fn can_register_peer(&mut self, neighbor_key: &NeighborKey, outbound: bool) -> Result<(), net_error> {
// don't talk to our bind address
if self.is_bound(neighbor_key) {
debug!("{:?}: do not register myself at {:?}", &self.local_peer, neighbor_key);
return Err(net_error::Denied);
}
else {
debug!("{:?}: skip deny check for verifying my IP address (event {})", &self.local_peer, event_id);
// denied?
if PeerDB::is_peer_denied(&self.peerdb.conn(), neighbor_key.network_id, &neighbor_key.addrbytes, neighbor_key.port)? {
info!("{:?}: Peer {:?} is denied; dropping", &self.local_peer, neighbor_key);
return Err(net_error::Denied);
}
// already connected?
@@ -1053,7 +1040,7 @@ impl PeerNetwork {
None => (None, NeighborKey::from_socketaddr(self.peer_version, self.local_peer.network_id, &client_addr))
};
match self.can_register_peer(event_id, &neighbor_key, outbound) {
match self.can_register_peer(&neighbor_key, outbound) {
Ok(_) => {},
Err(e) => {
debug!("Could not register peer {:?}: {:?}", &neighbor_key, &e);
@@ -1659,7 +1646,7 @@ impl PeerNetwork {
debug!("{:?}: begin obtaining public IP address", &self.local_peer);
// pick a random outbound conversation
// pick a random outbound conversation to one of the initial neighbors
let mut idx = thread_rng().gen::<usize>() % self.peers.len();
for _ in 0..self.peers.len()+1 {
let event_id = match self.peers.keys().skip(idx).next() {
@@ -1676,6 +1663,10 @@ impl PeerNetwork {
continue;
}
if !PeerDB::is_initial_peer(self.peerdb.conn(), convo.peer_network_id, &convo.peer_addrbytes, convo.peer_port)? {
continue;
}
debug!("Ask {:?} for my IP address", &convo);
let nonce = thread_rng().gen::<u32>();
@@ -1709,7 +1700,6 @@ impl PeerNetwork {
return Ok(true);
}
/// Learn this peer's public IP address.
/// If it was given to us directly, then we can just skip this step.
/// Once learned, we'll confirm it by trying to self-connect.
@@ -1738,13 +1728,26 @@ impl PeerNetwork {
Ok(message) => match message.payload {
StacksMessageType::NatPunchReply(data) => {
// peer offers us our public IP address.
// confirm it by self-connecting
debug!("{:?}: learned that my IP address is supposidly {:?}", &self.local_peer, &data.addrbytes);
info!("{:?}: learned that my IP address is {:?}", &self.local_peer, &data.addrbytes);
self.public_ip_confirmed = true;
self.public_ip_learned_at = get_epoch_time_secs();
self.public_ip_retries = 0;
// prepare for the next step -- confirming the public IP address
self.public_ip_confirmed = false;
self.public_ip_self_event_id = 0;
self.public_ip_address_unconfirmed = Some((data.addrbytes, self.bind_nk.port));
// if our IP address changed, then disconnect witih everyone
let old_ip = self.local_peer.public_ip_address.clone();
self.local_peer.public_ip_address = Some((data.addrbytes, self.bind_nk.port));
if old_ip != self.local_peer.public_ip_address {
let mut all_event_ids = vec![];
for (eid, _) in self.peers.iter() {
all_event_ids.push(*eid);
}
info!("IP address changed from {:?} to {:?}; closing all connections and re-establishing them", &old_ip, &self.local_peer.public_ip_address);
for eid in all_event_ids.into_iter() {
self.deregister_peer(eid);
}
}
return Ok(true);
},
other_payload => {
@@ -1772,162 +1775,6 @@ impl PeerNetwork {
return Ok(true);
}
/// Begin the process of confirming our public IP address
/// Return Ok(finished preparing to confirm the IP address)
/// return Err(..) on failure
fn begin_ping_public_ip(&mut self, public_ip: (PeerAddress, u16)) -> Result<bool, net_error> {
// ping ourselves using our public IP
if self.public_ip_self_event_id == 0 {
debug!("{:?}: Begin confirming public IP address", &self.local_peer);
// connect to ourselves
let public_nk = NeighborKey {
network_id: self.local_peer.network_id,
peer_version: self.peer_version,
addrbytes: public_ip.0.clone(),
port: public_ip.1
};
let event_id = match self.connect_peer_deny_checks(&public_nk, false) {
Ok(eid) => eid,
Err(net_error::AlreadyConnected(eid)) => eid, // weird if this happens, but you never know
Err(e) => {
info!("Failed to connect to my IP address: {:?}", &e);
return Err(e);
}
};
self.public_ip_self_event_id = event_id;
// call again
return Ok(false);
}
else if self.connecting.contains_key(&self.public_ip_self_event_id) {
debug!("{:?}: still connecting to myself at {:?}", &self.local_peer, &public_ip);
// call again
return Ok(false);
}
else if let Some(ref mut convo) = self.peers.get_mut(&self.public_ip_self_event_id) {
// connected! Ping myself with another natpunch
debug!("{:?}: Pinging myself at {:?}", &self.local_peer, &public_ip);
let nonce = thread_rng().gen::<u32>();
let ping_natpunch = StacksMessageType::NatPunchRequest(nonce);
self.public_ip_ping_nonce = nonce;
let ping_request = convo.sign_message(&self.chain_view, &self.local_peer.private_key, ping_natpunch)
.map_err(|e| {
info!("Failed to sign ping to myself: {:?}", &e);
e
})?;
let mut rh = convo.send_signed_request(ping_request, self.connection_opts.timeout)
.map_err(|e| {
info!("Failed to send ping to myself: {:?}", &e);
e
})?;
self.saturate_p2p_socket(self.public_ip_self_event_id, &mut rh)
.map_err(|e| {
info!("Failed to saturate ping socket to myself");
e
})?;
self.public_ip_reply_handle = Some(rh);
return Ok(true);
}
else {
// could not connect (timed out or the like)
info!("{:?}: Failed to connect to myself for IP confirmation", &self.local_peer);
return Ok(true);
}
}
/// Confirm our public IP address if we had to learn it -- try to connect to ourselves via a
/// ping, and if we succeed, we know that the peer we learned it from was being honest (enough)
/// Return Ok(done with this step?) on success
/// Return Err(..) on failure
fn do_ping_public_ip(&mut self) -> Result<bool, net_error> {
assert!(self.public_ip_address_unconfirmed.is_some());
let public_ip = self.public_ip_address_unconfirmed.clone().unwrap();
if self.public_ip_reply_handle.is_none() {
if !self.begin_ping_public_ip(public_ip.clone())? {
return Ok(false);
}
}
let rh_opt = self.public_ip_reply_handle.take();
if let Some(mut rh) = rh_opt {
debug!("{:?}: waiting for Pong from myself to confirm my IP address", &self.local_peer);
if let Err(e) = self.saturate_p2p_socket(rh.get_event_id(), &mut rh) {
info!("{:?}: Failed to ping myself to confirm my IP address", &self.local_peer);
return Err(e);
}
match rh.try_send_recv() {
Ok(message) => {
// disconnect from myself
self.deregister_peer(self.public_ip_self_event_id);
self.public_ip_self_event_id = 0;
match message.payload {
StacksMessageType::NatPunchReply(data) => {
if data.nonce == self.public_ip_ping_nonce {
// confirmed!
info!("{:?}: confirmed my public IP to be {:?}", &self.local_peer, &public_ip);
self.public_ip_confirmed = true;
self.public_ip_learned_at = get_epoch_time_secs();
self.public_ip_retries = 0;
// if our IP address changed, then disconnect witih everyone
let old_ip = self.local_peer.public_ip_address.clone();
self.local_peer.public_ip_address = self.public_ip_address_unconfirmed.clone();
if old_ip != self.local_peer.public_ip_address {
let mut all_event_ids = vec![];
for (eid, _) in self.peers.iter() {
all_event_ids.push(*eid);
}
info!("IP address changed from {:?} to {:?}; closing all connections and re-establishing them", &old_ip, &self.local_peer.public_ip_address);
for eid in all_event_ids.into_iter() {
self.deregister_peer(eid);
}
}
return Ok(true);
}
else {
// weird response
info!("{:?}: invalid Pong response to myself: {} != {}", &self.local_peer, data.nonce, self.public_ip_ping_nonce);
return Err(net_error::InvalidMessage);
}
},
other_payload => {
info!("{:?}: unexpected response to my public IP confirmation ping: {:?}", &self.local_peer, &other_payload);
return Err(net_error::InvalidMessage);
}
}
},
Err(req_res) => match req_res {
Ok(same_req) => {
// try again
self.public_ip_reply_handle = Some(same_req);
return Ok(false);
}
Err(e) => {
// disconnected
debug!("{:?}: Failed to get a ping reply: {:?}", &self.local_peer, &e);
return Err(e);
}
}
}
}
return Ok(true);
}
/// Do we need to (re)fetch our public IP?
fn need_public_ip(&mut self) -> bool {
if !self.public_ip_learned {
@@ -1967,15 +1814,8 @@ impl PeerNetwork {
fn public_ip_reset(&mut self) {
debug!("{:?}: reset public IP query state", &self.local_peer);
if self.public_ip_self_event_id > 0 {
self.deregister_peer(self.public_ip_self_event_id);
self.public_ip_self_event_id = 0;
}
self.public_ip_self_event_id = 0;
self.public_ip_reply_handle = None;
self.public_ip_confirmed = false;
self.public_ip_address_unconfirmed = None;
if self.public_ip_learned {
// will go relearn it if it wasn't given
@@ -2019,59 +1859,6 @@ impl PeerNetwork {
Ok(true)
}
/// Confirm our publicly-routable IP address.
/// Return true once we're done.
fn do_confirm_public_ip(&mut self) -> Result<bool, net_error> {
if !self.need_public_ip() {
return Ok(true);
}
if self.public_ip_confirmed {
// IP already confirmed
test_debug!("{:?}: learned IP address is confirmed", &self.local_peer);
return Ok(true);
}
if self.public_ip_address_unconfirmed.is_none() {
// can't do this yet, so skip
test_debug!("{:?}: unconfirmed IP address is not known yet", &self.local_peer);
return Ok(true);
}
// finished request successfully
self.public_ip_requested_at = get_epoch_time_secs();
match self.do_ping_public_ip() {
Ok(b) => {
if !b {
test_debug!("{:?}: try do_confirm_public_ip again", &self.local_peer);
return Ok(false);
}
},
Err(e) => {
test_debug!("{:?}: failed to confirm public IP: {:?}", &self.local_peer, &e);
self.public_ip_reset();
match e {
net_error::NoSuchNeighbor => {
// haven't connected to anyone yet
return Ok(true);
},
_ => {
return Err(e);
}
};
}
}
// learned and confirmed! clean up
if self.public_ip_self_event_id > 0 {
self.deregister_peer(self.public_ip_self_event_id);
}
self.public_ip_self_event_id = 0;
self.public_ip_reply_handle = None;
Ok(true)
}
/// Update the state of our neighbors' block inventories.
/// Return true if we finish
fn do_network_inv_sync(&mut self, sortdb: &SortitionDB) -> Result<bool, net_error> {
@@ -2179,7 +1966,7 @@ impl PeerNetwork {
match self.do_get_public_ip() {
Ok(b) => {
if b {
self.work_state = PeerNetworkWorkState::ConfirmPublicIP;
self.work_state = PeerNetworkWorkState::BlockInvSync;
}
}
Err(e) => {
@@ -2189,25 +1976,6 @@ impl PeerNetwork {
}
}
},
PeerNetworkWorkState::ConfirmPublicIP => {
// confirm the public IP address we previously got
if cfg!(test) && self.connection_opts.disable_natpunch {
self.work_state = PeerNetworkWorkState::BlockInvSync;
}
else {
match self.do_confirm_public_ip() {
Ok(b) => {
if b {
self.work_state = PeerNetworkWorkState::BlockInvSync;
}
},
Err(e) => {
info!("Failed to confirm public IP ({:?}); skipping", &e);
self.work_state = PeerNetworkWorkState::BlockInvSync;
}
}
}
}
PeerNetworkWorkState::BlockInvSync => {
// synchronize peer block inventories
if self.do_network_inv_sync(sortdb)? {