Drop the Request type variant since it's no longer used anywhere. Make it so that any attempt to send a request to the p2p thread can fail if the channel is full, and make it so the p2p thread can't block on returning an error message to a client (e.g. the relayer). Do not bother buffering up errors -- if the client's channel is full, drop them and log them (since that's all the client will be doing anyway)

This commit is contained in:
Jude Nelson
2020-08-09 22:54:07 -04:00
parent 8c60bb3cfb
commit 19fcf1b03b

View File

@@ -62,6 +62,7 @@ use std::sync::mpsc::SyncSender;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::SendError;
use std::sync::mpsc::TrySendError;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::TryRecvError;
@@ -105,7 +106,6 @@ pub enum NetworkRequest {
Ban(Vec<NeighborKey>),
AdvertizeBlocks(BlocksAvailableMap), // announce to all wanting neighbors that we have these blocks
AdvertizeMicroblocks(BlocksAvailableMap), // announce to all wanting neighbors that we have these confirmed microblock streams
Request(NeighborKey, StacksMessage, u64), // target neighbor, message to send, ttl
Relay(NeighborKey, StacksMessage),
Broadcast(Vec<RelayData>, StacksMessageType)
}
@@ -116,65 +116,42 @@ pub enum NetworkRequest {
/// a way to issue commands and hear back replies from them.
pub struct NetworkHandle {
chan_in: SyncSender<NetworkRequest>,
chan_out: Receiver<Result<Option<ReplyHandleP2P>, net_error>>,
result_buf: VecDeque<Result<(), net_error>>,
result_bufsz: usize
chan_out: Receiver<Result<(), net_error>>,
}
/// Internal handle for receiving requests from a NetworkHandle.
/// This is the 'other end' of a NetworkHandle inside the peer network struct.
struct NetworkHandleServer {
chan_in: Receiver<NetworkRequest>,
chan_out: SyncSender<Result<Option<ReplyHandleP2P>, net_error>>
chan_out: SyncSender<Result<(), net_error>>
}
impl NetworkHandle {
pub fn new(chan_in: SyncSender<NetworkRequest>, chan_out: Receiver<Result<Option<ReplyHandleP2P>, net_error>>, result_bufsz: usize) -> NetworkHandle {
pub fn new(chan_in: SyncSender<NetworkRequest>, chan_out: Receiver<Result<(), net_error>>) -> NetworkHandle {
NetworkHandle {
chan_in: chan_in,
chan_out: chan_out,
result_buf: VecDeque::new(),
result_bufsz: result_bufsz
}
}
/// Send out a request to the p2p thread, and synchronously wait for a response
fn send_request(&mut self, req: NetworkRequest) -> Result<(), net_error> {
self.chan_in.send(req).map_err(|_e| net_error::InvalidHandle)?;
if self.result_bufsz > 0 {
while self.result_buf.len() < self.result_bufsz {
match self.chan_out.try_recv() {
Ok(res) => {
match res {
Ok(handle_opt) => {
assert!(handle_opt.is_none(), "BUG: sending a message unidirectionally resulted in a reply handle");
self.result_buf.push_back(Ok(()));
},
Err(e) => {
self.result_buf.push_back(Err(e));
}
}
},
Err(TryRecvError::Empty) => {
break;
},
Err(TryRecvError::Disconnected) => {
return Err(net_error::InvalidHandle);
}
}
match self.chan_in.try_send(req) {
Ok(_) => {},
Err(TrySendError::Full(_)) => {
warn!("P2P handle channel is full");
return Err(net_error::FullHandle);
}
Ok(())
}
else {
match self.chan_out.recv().map_err(|_e| net_error::InvalidHandle)? {
Ok(_) => Ok(()),
Err(e) => Err(e)
Err(TrySendError::Disconnected(_)) => {
warn!("P2P handle channel is disconnected");
return Err(net_error::InvalidHandle);
}
}
}
pub fn next_result(&mut self) -> Option<Result<(), net_error>> {
self.result_buf.pop_front()
match self.chan_out.recv().map_err(|_e| net_error::InvalidHandle)? {
Ok(_) => Ok(()),
Err(e) => Err(e)
}
}
/// Connect to a remote peer
@@ -207,22 +184,6 @@ impl NetworkHandle {
self.send_request(req)
}
/// Sends the message to the p2p network thread and gets back a reply handle the calling thread
/// can wait on.
pub fn send_signed_request(&mut self, neighbor_key: NeighborKey, msg: StacksMessage, ttl: u64) -> Result<ReplyHandleP2P, net_error> {
let req = NetworkRequest::Request(neighbor_key, msg, ttl);
self.chan_in.send(req).map_err(|_e| net_error::InvalidHandle)?;
match self.chan_out.recv().map_err(|_e| net_error::InvalidHandle)? {
Ok(Some(handle)) => Ok(handle),
Ok(None) => {
// this can only happen if there's a bug
panic!("BUG: p2p network did not return a reply handle to a request");
},
Err(e) => Err(e)
}
}
/// Relay a message to a peer via the p2p network thread, expecting no reply.
/// Called from outside the p2p thread by other threads.
pub fn relay_signed_message(&mut self, neighbor_key: NeighborKey, msg: StacksMessage) -> Result<(), net_error> {
@@ -239,18 +200,18 @@ impl NetworkHandle {
}
impl NetworkHandleServer {
pub fn new(chan_in: Receiver<NetworkRequest>, chan_out: SyncSender<Result<Option<ReplyHandleP2P>, net_error>>) -> NetworkHandleServer {
pub fn new(chan_in: Receiver<NetworkRequest>, chan_out: SyncSender<Result<(), net_error>>) -> NetworkHandleServer {
NetworkHandleServer {
chan_in: chan_in,
chan_out: chan_out
}
}
pub fn pair(bufsz: usize, result_bufsz: usize) -> (NetworkHandleServer, NetworkHandle) {
pub fn pair(bufsz: usize) -> (NetworkHandleServer, NetworkHandle) {
let (msg_send, msg_recv) = sync_channel(bufsz);
let (handle_send, handle_recv) = sync_channel(bufsz);
let server = NetworkHandleServer::new(msg_recv, handle_send);
let client = NetworkHandle::new(msg_send, handle_recv, result_bufsz);
let client = NetworkHandle::new(msg_send, handle_recv);
(server, client)
}
}
@@ -460,8 +421,8 @@ impl PeerNetwork {
}
/// Create a network handle for another thread to use to communicate with remote peers
pub fn new_handle(&mut self, bufsz: usize, response_bufsz: usize) -> NetworkHandle {
let (server, client) = NetworkHandleServer::pair(bufsz, response_bufsz);
pub fn new_handle(&mut self, bufsz: usize) -> NetworkHandle {
let (server, client) = NetworkHandleServer::pair(bufsz);
self.handles.push_back(server);
client
}
@@ -734,16 +695,15 @@ impl PeerNetwork {
}
/// Dispatch a single request from another thread.
/// Returns an option for a reply handle if the caller expects the peer to reply.
fn dispatch_request(&mut self, request: NetworkRequest) -> Result<Option<ReplyHandleP2P>, net_error> {
fn dispatch_request(&mut self, request: NetworkRequest) -> Result<(), net_error> {
match request {
NetworkRequest::Connect(neighbor_key) => {
self.connect_peer(&neighbor_key)
.and_then(|_event_id| Ok(None))
.and_then(|_event_id| Ok(()))
},
NetworkRequest::Disconnect(neighbor_key) => {
self.deregister_neighbor(&neighbor_key);
Ok(None)
Ok(())
},
NetworkRequest::Ban(neighbor_keys) => {
for neighbor_key in neighbor_keys.iter() {
@@ -756,27 +716,23 @@ impl PeerNetwork {
None => {}
}
}
Ok(None)
Ok(())
},
NetworkRequest::AdvertizeBlocks(blocks) => {
if !(cfg!(test) && self.connection_opts.disable_block_advertisement) {
self.advertize_blocks(blocks)?;
}
Ok(None)
Ok(())
}
NetworkRequest::AdvertizeMicroblocks(mblocks) => {
if !(cfg!(test) && self.connection_opts.disable_block_advertisement) {
self.advertize_microblocks(mblocks)?;
}
Ok(None)
Ok(())
}
NetworkRequest::Request(neighbor_key, msg, ttl) => {
self.send_message(&neighbor_key, msg, ttl)
.and_then(|rh| Ok(Some(rh)))
},
NetworkRequest::Relay(neighbor_key, msg) => {
self.relay_signed_message(&neighbor_key, msg)
.and_then(|_| Ok(None))
.and_then(|_| Ok(()))
},
NetworkRequest::Broadcast(relay_hints, msg) => {
// pick some neighbors. Note that only some messages can be broadcasted.
@@ -810,13 +766,14 @@ impl PeerNetwork {
}
}?;
self.broadcast_message(neighbor_keys, relay_hints, msg);
Ok(None)
Ok(())
}
}
}
/// Process any handle requests from other threads.
/// Returns the number of requests dispatched.
/// This method does not block.
fn dispatch_requests(&mut self) -> usize {
let mut to_remove = vec![];
let mut messages = vec![];
@@ -860,12 +817,15 @@ impl PeerNetwork {
for (i, dispatch_res) in responses {
match self.handles.get(i) {
Some(handle) => {
let send_res = handle.chan_out.send(dispatch_res);
let send_res = handle.chan_out.try_send(dispatch_res);
match send_res {
Ok(_) => {
num_dispatched += 1;
}
Err(_e) => {
Err(TrySendError::Full(dropped)) => {
warn!("P2P client channel {} is full; dropping '{:?}'", i, &dropped);
}
Err(TrySendError::Disconnected(_)) => {
// channel disconnected; remove
to_remove.push(i);
}
@@ -2896,7 +2856,7 @@ mod test {
&p2p.chain_view.burn_stable_consensus_hash,
StacksMessageType::Ping(PingData::new()));
let mut h = p2p.new_handle(1, 0);
let mut h = p2p.new_handle(1);
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:2100").unwrap();
@@ -2996,7 +2956,7 @@ mod test {
&p2p.chain_view.burn_stable_consensus_hash,
StacksMessageType::Ping(PingData::new()));
let mut h = p2p.new_handle(1, 0);
let mut h = p2p.new_handle(1);
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:2200").unwrap();