diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 0fe372b79..331fc9d89 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -62,6 +62,7 @@ use std::sync::mpsc::SyncSender; use std::sync::mpsc::Receiver; use std::sync::mpsc::sync_channel; use std::sync::mpsc::SendError; +use std::sync::mpsc::TrySendError; use std::sync::mpsc::RecvError; use std::sync::mpsc::TryRecvError; @@ -105,7 +106,6 @@ pub enum NetworkRequest { Ban(Vec), AdvertizeBlocks(BlocksAvailableMap), // announce to all wanting neighbors that we have these blocks AdvertizeMicroblocks(BlocksAvailableMap), // announce to all wanting neighbors that we have these confirmed microblock streams - Request(NeighborKey, StacksMessage, u64), // target neighbor, message to send, ttl Relay(NeighborKey, StacksMessage), Broadcast(Vec, StacksMessageType) } @@ -116,65 +116,42 @@ pub enum NetworkRequest { /// a way to issue commands and hear back replies from them. pub struct NetworkHandle { chan_in: SyncSender, - chan_out: Receiver, net_error>>, - result_buf: VecDeque>, - result_bufsz: usize + chan_out: Receiver>, } /// Internal handle for receiving requests from a NetworkHandle. /// This is the 'other end' of a NetworkHandle inside the peer network struct. struct NetworkHandleServer { chan_in: Receiver, - chan_out: SyncSender, net_error>> + chan_out: SyncSender> } impl NetworkHandle { - pub fn new(chan_in: SyncSender, chan_out: Receiver, net_error>>, result_bufsz: usize) -> NetworkHandle { + pub fn new(chan_in: SyncSender, chan_out: Receiver>) -> NetworkHandle { NetworkHandle { chan_in: chan_in, chan_out: chan_out, - result_buf: VecDeque::new(), - result_bufsz: result_bufsz } } + /// Send out a request to the p2p thread, and synchronously wait for a response fn send_request(&mut self, req: NetworkRequest) -> Result<(), net_error> { - self.chan_in.send(req).map_err(|_e| net_error::InvalidHandle)?; - - if self.result_bufsz > 0 { - while self.result_buf.len() < self.result_bufsz { - match self.chan_out.try_recv() { - Ok(res) => { - match res { - Ok(handle_opt) => { - assert!(handle_opt.is_none(), "BUG: sending a message unidirectionally resulted in a reply handle"); - self.result_buf.push_back(Ok(())); - }, - Err(e) => { - self.result_buf.push_back(Err(e)); - } - } - }, - Err(TryRecvError::Empty) => { - break; - }, - Err(TryRecvError::Disconnected) => { - return Err(net_error::InvalidHandle); - } - } + match self.chan_in.try_send(req) { + Ok(_) => {}, + Err(TrySendError::Full(_)) => { + warn!("P2P handle channel is full"); + return Err(net_error::FullHandle); } - Ok(()) - } - else { - match self.chan_out.recv().map_err(|_e| net_error::InvalidHandle)? { - Ok(_) => Ok(()), - Err(e) => Err(e) + Err(TrySendError::Disconnected(_)) => { + warn!("P2P handle channel is disconnected"); + return Err(net_error::InvalidHandle); } } - } - pub fn next_result(&mut self) -> Option> { - self.result_buf.pop_front() + match self.chan_out.recv().map_err(|_e| net_error::InvalidHandle)? { + Ok(_) => Ok(()), + Err(e) => Err(e) + } } /// Connect to a remote peer @@ -207,22 +184,6 @@ impl NetworkHandle { self.send_request(req) } - /// Sends the message to the p2p network thread and gets back a reply handle the calling thread - /// can wait on. - pub fn send_signed_request(&mut self, neighbor_key: NeighborKey, msg: StacksMessage, ttl: u64) -> Result { - let req = NetworkRequest::Request(neighbor_key, msg, ttl); - - self.chan_in.send(req).map_err(|_e| net_error::InvalidHandle)?; - match self.chan_out.recv().map_err(|_e| net_error::InvalidHandle)? { - Ok(Some(handle)) => Ok(handle), - Ok(None) => { - // this can only happen if there's a bug - panic!("BUG: p2p network did not return a reply handle to a request"); - }, - Err(e) => Err(e) - } - } - /// Relay a message to a peer via the p2p network thread, expecting no reply. /// Called from outside the p2p thread by other threads. pub fn relay_signed_message(&mut self, neighbor_key: NeighborKey, msg: StacksMessage) -> Result<(), net_error> { @@ -239,18 +200,18 @@ impl NetworkHandle { } impl NetworkHandleServer { - pub fn new(chan_in: Receiver, chan_out: SyncSender, net_error>>) -> NetworkHandleServer { + pub fn new(chan_in: Receiver, chan_out: SyncSender>) -> NetworkHandleServer { NetworkHandleServer { chan_in: chan_in, chan_out: chan_out } } - pub fn pair(bufsz: usize, result_bufsz: usize) -> (NetworkHandleServer, NetworkHandle) { + pub fn pair(bufsz: usize) -> (NetworkHandleServer, NetworkHandle) { let (msg_send, msg_recv) = sync_channel(bufsz); let (handle_send, handle_recv) = sync_channel(bufsz); let server = NetworkHandleServer::new(msg_recv, handle_send); - let client = NetworkHandle::new(msg_send, handle_recv, result_bufsz); + let client = NetworkHandle::new(msg_send, handle_recv); (server, client) } } @@ -460,8 +421,8 @@ impl PeerNetwork { } /// Create a network handle for another thread to use to communicate with remote peers - pub fn new_handle(&mut self, bufsz: usize, response_bufsz: usize) -> NetworkHandle { - let (server, client) = NetworkHandleServer::pair(bufsz, response_bufsz); + pub fn new_handle(&mut self, bufsz: usize) -> NetworkHandle { + let (server, client) = NetworkHandleServer::pair(bufsz); self.handles.push_back(server); client } @@ -734,16 +695,15 @@ impl PeerNetwork { } /// Dispatch a single request from another thread. - /// Returns an option for a reply handle if the caller expects the peer to reply. - fn dispatch_request(&mut self, request: NetworkRequest) -> Result, net_error> { + fn dispatch_request(&mut self, request: NetworkRequest) -> Result<(), net_error> { match request { NetworkRequest::Connect(neighbor_key) => { self.connect_peer(&neighbor_key) - .and_then(|_event_id| Ok(None)) + .and_then(|_event_id| Ok(())) }, NetworkRequest::Disconnect(neighbor_key) => { self.deregister_neighbor(&neighbor_key); - Ok(None) + Ok(()) }, NetworkRequest::Ban(neighbor_keys) => { for neighbor_key in neighbor_keys.iter() { @@ -756,27 +716,23 @@ impl PeerNetwork { None => {} } } - Ok(None) + Ok(()) }, NetworkRequest::AdvertizeBlocks(blocks) => { if !(cfg!(test) && self.connection_opts.disable_block_advertisement) { self.advertize_blocks(blocks)?; } - Ok(None) + Ok(()) } NetworkRequest::AdvertizeMicroblocks(mblocks) => { if !(cfg!(test) && self.connection_opts.disable_block_advertisement) { self.advertize_microblocks(mblocks)?; } - Ok(None) + Ok(()) } - NetworkRequest::Request(neighbor_key, msg, ttl) => { - self.send_message(&neighbor_key, msg, ttl) - .and_then(|rh| Ok(Some(rh))) - }, NetworkRequest::Relay(neighbor_key, msg) => { self.relay_signed_message(&neighbor_key, msg) - .and_then(|_| Ok(None)) + .and_then(|_| Ok(())) }, NetworkRequest::Broadcast(relay_hints, msg) => { // pick some neighbors. Note that only some messages can be broadcasted. @@ -810,13 +766,14 @@ impl PeerNetwork { } }?; self.broadcast_message(neighbor_keys, relay_hints, msg); - Ok(None) + Ok(()) } } } /// Process any handle requests from other threads. /// Returns the number of requests dispatched. + /// This method does not block. fn dispatch_requests(&mut self) -> usize { let mut to_remove = vec![]; let mut messages = vec![]; @@ -860,12 +817,15 @@ impl PeerNetwork { for (i, dispatch_res) in responses { match self.handles.get(i) { Some(handle) => { - let send_res = handle.chan_out.send(dispatch_res); + let send_res = handle.chan_out.try_send(dispatch_res); match send_res { Ok(_) => { num_dispatched += 1; } - Err(_e) => { + Err(TrySendError::Full(dropped)) => { + warn!("P2P client channel {} is full; dropping '{:?}'", i, &dropped); + } + Err(TrySendError::Disconnected(_)) => { // channel disconnected; remove to_remove.push(i); } @@ -2896,7 +2856,7 @@ mod test { &p2p.chain_view.burn_stable_consensus_hash, StacksMessageType::Ping(PingData::new())); - let mut h = p2p.new_handle(1, 0); + let mut h = p2p.new_handle(1); use std::net::TcpListener; let listener = TcpListener::bind("127.0.0.1:2100").unwrap(); @@ -2996,7 +2956,7 @@ mod test { &p2p.chain_view.burn_stable_consensus_hash, StacksMessageType::Ping(PingData::new())); - let mut h = p2p.new_handle(1, 0); + let mut h = p2p.new_handle(1); use std::net::TcpListener; let listener = TcpListener::bind("127.0.0.1:2200").unwrap();