unified polling system for multiple server sockets. There is now a single list of pollable events for all server sockets, and we poll on all of them at once.

This commit is contained in:
Jude Nelson
2020-04-16 02:06:26 -04:00
parent b2439eb099
commit cd0626ad2d

View File

@@ -28,6 +28,7 @@ use util::db::DBConn;
use std::net;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use std::io;
use std::io::Read;
@@ -68,15 +69,43 @@ impl NetworkPollState {
}
}
pub struct NetworkState {
// state for a single network server
pub struct NetworkServerState {
addr: SocketAddr,
server_socket: mio_net::TcpListener,
server_event: mio::Token,
}
// state for the entire network
pub struct NetworkState {
poll: mio::Poll,
server: mio_net::TcpListener,
events: mio::Events,
event_capacity: usize,
servers: Vec<NetworkServerState>,
count: usize,
event_map: HashMap<usize, usize> // map socket events to their registered server socket (including server sockets)
}
impl NetworkState {
pub fn new(event_capacity: usize) -> Result<NetworkState, net_error> {
let poll = mio::Poll::new()
.map_err(|e| {
error!("Failed to initialize poller: {:?}", e);
net_error::BindError
})?;
let events = mio::Events::with_capacity(event_capacity);
Ok(NetworkState {
poll: poll,
events: events,
event_capacity: event_capacity,
servers: vec![],
count: 1,
event_map: HashMap::new()
})
}
fn bind_address(addr: &SocketAddr) -> Result<mio_net::TcpListener, net_error> {
if !cfg!(test) {
mio_net::TcpListener::bind(addr)
@@ -112,66 +141,87 @@ impl NetworkState {
}
}
pub fn bind(addr: &SocketAddr, capacity: usize) -> Result<NetworkState, net_error> {
/// Bind to the given socket address.
/// Returns the handle to the poll state, used to key network poll events.
pub fn bind(&mut self, addr: &SocketAddr) -> Result<usize, net_error> {
let server = NetworkState::bind_address(addr)?;
let poll = mio::Poll::new()
.map_err(|e| {
error!("Failed to initialize poller: {:?}", e);
net_error::BindError
})?;
let next_server_event = self.next_event_id();
let events = mio::Events::with_capacity(capacity);
poll.register(&server, SERVER, mio::Ready::readable(), mio::PollOpt::edge())
self.poll.register(&server, mio::Token(next_server_event), mio::Ready::readable(), mio::PollOpt::edge())
.map_err(|e| {
error!("Failed to register server socket: {:?}", &e);
net_error::BindError
})?;
Ok(NetworkState {
let network_server = NetworkServerState {
addr: addr.clone(),
poll: poll,
server: server,
events: events,
count: 1
})
}
server_socket: server,
server_event: mio::Token(next_server_event),
};
/// next event ID
pub fn next_event_id(&mut self) -> usize {
let ret = self.count;
self.count += 1;
ret
assert!(!self.event_map.contains_key(&next_server_event));
self.servers.push(network_server);
self.event_map.insert(next_server_event, 0); // server events always mapped to 0
Ok(next_server_event)
}
/// Register a socket for read/write notifications with this poller
pub fn register(&mut self, event_id: usize, sock: &mio_net::TcpStream) -> Result<(), net_error> {
pub fn register(&mut self, server_event_id: usize, event_id: usize, sock: &mio_net::TcpStream) -> Result<(), net_error> {
self.poll.register(sock, mio::Token(event_id), Ready::all(), PollOpt::edge())
.map_err(|e| {
error!("Failed to register socket: {:?}", &e);
net_error::RegisterError
})
})?;
// this is a server event
assert!(self.event_map.contains_key(&server_event_id));
assert_eq!(self.event_map.get(&server_event_id), Some(&0));
// this event ID is not in use
assert!(!self.event_map.contains_key(&event_id));
self.event_map.insert(event_id, server_event_id);
test_debug!("Register socket {:?} as event {} on server {}", sock, event_id, server_event_id);
Ok(())
}
/// Deregister a socket event
pub fn deregister(&mut self, sock: &mio_net::TcpStream) -> Result<(), net_error> {
pub fn deregister(&mut self, event_id: usize, sock: &mio_net::TcpStream) -> Result<(), net_error> {
self.poll.deregister(sock)
.map_err(|e| {
error!("Failed to deregister socket: {:?}", &e);
error!("Failed to deregister socket {}: {:?}", event_id, &e);
net_error::RegisterError
})?;
sock.shutdown(Shutdown::Both)
.map_err(|_e| net_error::SocketError)?;
test_debug!("Socket deregistered: {:?}", sock);
self.event_map.remove(&event_id);
test_debug!("Socket deregistered: {}, {:?}", event_id, sock);
Ok(())
}
fn make_next_event_id(&self, cur_count: usize, in_use: &HashSet<usize>) -> usize {
let mut ret = cur_count;
while self.event_map.contains_key(&ret) || in_use.contains(&ret) {
ret = (ret + 1) % self.event_capacity;
}
ret
}
/// next event ID
pub fn next_event_id(&mut self) -> usize {
let ret = self.make_next_event_id(self.count, &HashSet::new());
self.count = (ret + 1) % self.event_capacity;
ret
}
/// Connect to a remote peer, but don't register it with the poll handle.
/// The underlying connect(2) is _asynchronous_, so the caller will need to register it with a
/// poll handle and wait for it to be connected.
pub fn connect(&mut self, addr: &SocketAddr) -> Result<mio_net::TcpStream, net_error> {
pub fn connect(addr: &SocketAddr) -> Result<mio_net::TcpStream, net_error> {
let stream = mio_net::TcpStream::connect(addr)
.map_err(|_e| {
test_debug!("Failed to convert to mio stream: {:?}", &_e);
@@ -205,21 +255,38 @@ impl NetworkState {
Ok(stream)
}
/// Poll socket states
pub fn poll(&mut self, timeout: u64) -> Result<NetworkPollState, net_error> {
/// Poll all server sockets.
/// Returns a map between network server handles (returned by bind()) and their new polling state
pub fn poll(&mut self, timeout: u64) -> Result<HashMap<usize, NetworkPollState>, net_error> {
self.events.clear();
self.poll.poll(&mut self.events, Some(Duration::from_millis(timeout)))
.map_err(|e| {
error!("Failed to poll: {:?}", &e);
net_error::PollError
})?;
let mut poll_states = HashMap::new();
for server in self.servers.iter() {
// pre-populate with server tokens
let server_event_id = usize::from(server.server_event);
poll_states.insert(server_event_id, NetworkPollState::new());
}
let mut new_events = HashSet::new();
let mut poll_state = NetworkPollState::new();
for event in &self.events {
match event.token() {
SERVER => {
let token = event.token();
let mut is_server_event = false;
for server in self.servers.iter() {
// server token?
if token == server.server_event {
// new inbound connection(s)
is_server_event = true;
let poll_state = poll_states.get_mut(&usize::from(token)).expect(&format!("BUG: FATAL: no poll state registered for server {}", usize::from(token)));
loop {
let (client_sock, _client_addr) = match self.server.accept() {
let (client_sock, _client_addr) = match server.server_socket.accept() {
Ok((client_sock, client_addr)) => (client_sock, client_addr),
Err(e) => {
match e.kind() {
@@ -233,18 +300,41 @@ impl NetworkState {
}
};
test_debug!("New socket accepted from {:?} (event {}): {:?}", &_client_addr, self.count, &client_sock);
poll_state.new.insert(self.count, client_sock);
self.count += 1;
let next_event_id = self.make_next_event_id(self.count, &new_events);
self.count = (next_event_id + 1) % self.event_capacity;
new_events.insert(next_event_id);
test_debug!("New socket accepted from {:?} (event {}) on server {:?}: {:?}", &_client_addr, next_event_id, &server.server_socket, &client_sock);
poll_state.new.insert(next_event_id, client_sock);
}
break;
}
}
if is_server_event {
continue;
}
// event for a client of one of our servers. which one?
let event_id = usize::from(token);
match self.event_map.get(&event_id) {
Some(server_event_id) => {
if let Some(poll_state) = poll_states.get_mut(server_event_id) {
test_debug!("Wakeup socket event {} on server {}", event_id, server_event_id);
poll_state.ready.push(event_id);
}
else {
panic!("Unknown server event ID {}", server_event_id);
}
},
mio::Token(event_id) => {
// I/O available
poll_state.ready.push(event_id);
None => {
panic!("Surreptitious readiness event {}", event_id);
}
}
}
Ok(poll_state)
Ok(poll_states)
}
}