From cd0626ad2dcecb421621b4aeff3dfc0fa0dfd660 Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Thu, 16 Apr 2020 02:06:26 -0400 Subject: [PATCH] unified polling system for multiple server sockets. There is now a single list of pollable events for all server sockets, and we poll on all of them at once. --- src/net/poll.rs | 174 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 132 insertions(+), 42 deletions(-) diff --git a/src/net/poll.rs b/src/net/poll.rs index 1c08d9cdd..2afa5817d 100644 --- a/src/net/poll.rs +++ b/src/net/poll.rs @@ -28,6 +28,7 @@ use util::db::DBConn; use std::net; use std::net::SocketAddr; use std::collections::HashMap; +use std::collections::HashSet; use std::time::Duration; use std::io; use std::io::Read; @@ -68,15 +69,43 @@ impl NetworkPollState { } } -pub struct NetworkState { +// state for a single network server +pub struct NetworkServerState { addr: SocketAddr, + server_socket: mio_net::TcpListener, + server_event: mio::Token, +} + +// state for the entire network +pub struct NetworkState { poll: mio::Poll, - server: mio_net::TcpListener, events: mio::Events, + event_capacity: usize, + servers: Vec, count: usize, + event_map: HashMap // map socket events to their registered server socket (including server sockets) } impl NetworkState { + pub fn new(event_capacity: usize) -> Result { + let poll = mio::Poll::new() + .map_err(|e| { + error!("Failed to initialize poller: {:?}", e); + net_error::BindError + })?; + + let events = mio::Events::with_capacity(event_capacity); + + Ok(NetworkState { + poll: poll, + events: events, + event_capacity: event_capacity, + servers: vec![], + count: 1, + event_map: HashMap::new() + }) + } + fn bind_address(addr: &SocketAddr) -> Result { if !cfg!(test) { mio_net::TcpListener::bind(addr) @@ -112,66 +141,87 @@ impl NetworkState { } } - pub fn bind(addr: &SocketAddr, capacity: usize) -> Result { + /// Bind to the given socket address. + /// Returns the handle to the poll state, used to key network poll events. + pub fn bind(&mut self, addr: &SocketAddr) -> Result { let server = NetworkState::bind_address(addr)?; - let poll = mio::Poll::new() - .map_err(|e| { - error!("Failed to initialize poller: {:?}", e); - net_error::BindError - })?; + let next_server_event = self.next_event_id(); - let events = mio::Events::with_capacity(capacity); - - poll.register(&server, SERVER, mio::Ready::readable(), mio::PollOpt::edge()) + self.poll.register(&server, mio::Token(next_server_event), mio::Ready::readable(), mio::PollOpt::edge()) .map_err(|e| { error!("Failed to register server socket: {:?}", &e); net_error::BindError })?; - Ok(NetworkState { + let network_server = NetworkServerState { addr: addr.clone(), - poll: poll, - server: server, - events: events, - count: 1 - }) - } + server_socket: server, + server_event: mio::Token(next_server_event), + }; - /// next event ID - pub fn next_event_id(&mut self) -> usize { - let ret = self.count; - self.count += 1; - ret + assert!(!self.event_map.contains_key(&next_server_event)); + + self.servers.push(network_server); + self.event_map.insert(next_server_event, 0); // server events always mapped to 0 + + Ok(next_server_event) } /// Register a socket for read/write notifications with this poller - pub fn register(&mut self, event_id: usize, sock: &mio_net::TcpStream) -> Result<(), net_error> { + pub fn register(&mut self, server_event_id: usize, event_id: usize, sock: &mio_net::TcpStream) -> Result<(), net_error> { self.poll.register(sock, mio::Token(event_id), Ready::all(), PollOpt::edge()) .map_err(|e| { error!("Failed to register socket: {:?}", &e); net_error::RegisterError - }) + })?; + + // this is a server event + assert!(self.event_map.contains_key(&server_event_id)); + assert_eq!(self.event_map.get(&server_event_id), Some(&0)); + + // this event ID is not in use + assert!(!self.event_map.contains_key(&event_id)); + + self.event_map.insert(event_id, server_event_id); + test_debug!("Register socket {:?} as event {} on server {}", sock, event_id, server_event_id); + Ok(()) } /// Deregister a socket event - pub fn deregister(&mut self, sock: &mio_net::TcpStream) -> Result<(), net_error> { + pub fn deregister(&mut self, event_id: usize, sock: &mio_net::TcpStream) -> Result<(), net_error> { self.poll.deregister(sock) .map_err(|e| { - error!("Failed to deregister socket: {:?}", &e); + error!("Failed to deregister socket {}: {:?}", event_id, &e); net_error::RegisterError })?; sock.shutdown(Shutdown::Both) .map_err(|_e| net_error::SocketError)?; - test_debug!("Socket deregistered: {:?}", sock); + self.event_map.remove(&event_id); + test_debug!("Socket deregistered: {}, {:?}", event_id, sock); Ok(()) } + fn make_next_event_id(&self, cur_count: usize, in_use: &HashSet) -> usize { + let mut ret = cur_count; + while self.event_map.contains_key(&ret) || in_use.contains(&ret) { + ret = (ret + 1) % self.event_capacity; + } + ret + } + + /// next event ID + pub fn next_event_id(&mut self) -> usize { + let ret = self.make_next_event_id(self.count, &HashSet::new()); + self.count = (ret + 1) % self.event_capacity; + ret + } + /// Connect to a remote peer, but don't register it with the poll handle. /// The underlying connect(2) is _asynchronous_, so the caller will need to register it with a /// poll handle and wait for it to be connected. - pub fn connect(&mut self, addr: &SocketAddr) -> Result { + pub fn connect(addr: &SocketAddr) -> Result { let stream = mio_net::TcpStream::connect(addr) .map_err(|_e| { test_debug!("Failed to convert to mio stream: {:?}", &_e); @@ -205,21 +255,38 @@ impl NetworkState { Ok(stream) } - /// Poll socket states - pub fn poll(&mut self, timeout: u64) -> Result { + /// Poll all server sockets. + /// Returns a map between network server handles (returned by bind()) and their new polling state + pub fn poll(&mut self, timeout: u64) -> Result, net_error> { + self.events.clear(); self.poll.poll(&mut self.events, Some(Duration::from_millis(timeout))) .map_err(|e| { error!("Failed to poll: {:?}", &e); net_error::PollError })?; + + let mut poll_states = HashMap::new(); + for server in self.servers.iter() { + // pre-populate with server tokens + let server_event_id = usize::from(server.server_event); + poll_states.insert(server_event_id, NetworkPollState::new()); + } + + let mut new_events = HashSet::new(); - let mut poll_state = NetworkPollState::new(); for event in &self.events { - match event.token() { - SERVER => { + let token = event.token(); + let mut is_server_event = false; + + for server in self.servers.iter() { + // server token? + if token == server.server_event { // new inbound connection(s) + is_server_event = true; + let poll_state = poll_states.get_mut(&usize::from(token)).expect(&format!("BUG: FATAL: no poll state registered for server {}", usize::from(token))); + loop { - let (client_sock, _client_addr) = match self.server.accept() { + let (client_sock, _client_addr) = match server.server_socket.accept() { Ok((client_sock, client_addr)) => (client_sock, client_addr), Err(e) => { match e.kind() { @@ -233,18 +300,41 @@ impl NetworkState { } }; - test_debug!("New socket accepted from {:?} (event {}): {:?}", &_client_addr, self.count, &client_sock); - poll_state.new.insert(self.count, client_sock); - self.count += 1; + let next_event_id = self.make_next_event_id(self.count, &new_events); + self.count = (next_event_id + 1) % self.event_capacity; + + new_events.insert(next_event_id); + + test_debug!("New socket accepted from {:?} (event {}) on server {:?}: {:?}", &_client_addr, next_event_id, &server.server_socket, &client_sock); + poll_state.new.insert(next_event_id, client_sock); + } + + break; + } + } + + if is_server_event { + continue; + } + + // event for a client of one of our servers. which one? + let event_id = usize::from(token); + match self.event_map.get(&event_id) { + Some(server_event_id) => { + if let Some(poll_state) = poll_states.get_mut(server_event_id) { + test_debug!("Wakeup socket event {} on server {}", event_id, server_event_id); + poll_state.ready.push(event_id); + } + else { + panic!("Unknown server event ID {}", server_event_id); } }, - mio::Token(event_id) => { - // I/O available - poll_state.ready.push(event_id); + None => { + panic!("Surreptitious readiness event {}", event_id); } } } - Ok(poll_state) + Ok(poll_states) } }