mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-05-16 07:59:49 +08:00
feat: when connecting to an address, allow the caller to set the socket send/recv buffer sizes
This commit is contained in:
@@ -318,7 +318,11 @@ impl NetworkState {
|
||||
/// Connect to a remote peer, but don't register it with the poll handle.
|
||||
/// The underlying connect(2) is _asynchronous_, so the caller will need to register it with a
|
||||
/// poll handle and wait for it to be connected.
|
||||
pub fn connect(addr: &SocketAddr) -> Result<mio_net::TcpStream, net_error> {
|
||||
pub fn connect(
|
||||
addr: &SocketAddr,
|
||||
socket_send_buffer: u32,
|
||||
socket_recv_buffer: u32,
|
||||
) -> Result<mio_net::TcpStream, net_error> {
|
||||
let stream = mio_net::TcpStream::connect(addr).map_err(|_e| {
|
||||
test_debug!("Failed to convert to mio stream: {:?}", &_e);
|
||||
net_error::ConnectionError
|
||||
@@ -328,14 +332,14 @@ impl NetworkState {
|
||||
// Don't go crazy on TIME_WAIT states; have them all die after 5 seconds
|
||||
stream
|
||||
.set_linger(Some(time::Duration::from_millis(5000)))
|
||||
.map_err(|_e| {
|
||||
test_debug!("Failed to set SO_LINGER: {:?}", &_e);
|
||||
.map_err(|e| {
|
||||
warn!("Failed to set SO_LINGER: {:?}", &e);
|
||||
net_error::ConnectionError
|
||||
})?;
|
||||
|
||||
// Disable Nagle algorithm
|
||||
stream.set_nodelay(true).map_err(|_e| {
|
||||
test_debug!("Failed to set TCP_NODELAY: {:?}", &_e);
|
||||
warn!("Failed to set TCP_NODELAY: {:?}", &_e);
|
||||
net_error::ConnectionError
|
||||
})?;
|
||||
|
||||
@@ -343,8 +347,8 @@ impl NetworkState {
|
||||
// for a while. Linux default is 7200 seconds, so make sure we keep it here.
|
||||
stream
|
||||
.set_keepalive(Some(time::Duration::from_millis(7200 * 1000)))
|
||||
.map_err(|_e| {
|
||||
test_debug!("Failed to set TCP_KEEPALIVE and/or SO_KEEPALIVE: {:?}", &_e);
|
||||
.map_err(|e| {
|
||||
warn!("Failed to set TCP_KEEPALIVE and/or SO_KEEPALIVE: {:?}", &e);
|
||||
net_error::ConnectionError
|
||||
})?;
|
||||
|
||||
@@ -354,6 +358,26 @@ impl NetworkState {
|
||||
stream.set_send_buffer_size(32).unwrap();
|
||||
stream.set_recv_buffer_size(32).unwrap();
|
||||
}
|
||||
} else {
|
||||
stream
|
||||
.set_send_buffer_size(socket_send_buffer as usize)
|
||||
.map_err(|e| {
|
||||
warn!(
|
||||
"Failed to set socket write buffer size to {}: {:?}",
|
||||
socket_send_buffer, &e
|
||||
);
|
||||
net_error::ConnectionError
|
||||
})?;
|
||||
|
||||
stream
|
||||
.set_recv_buffer_size(socket_recv_buffer as usize)
|
||||
.map_err(|e| {
|
||||
warn!(
|
||||
"Failed to set socket read buffer size to {}: {:?}",
|
||||
socket_send_buffer, &e
|
||||
);
|
||||
net_error::ConnectionError
|
||||
})?;
|
||||
}
|
||||
|
||||
test_debug!("New socket connected to {:?}: {:?}", addr, &stream);
|
||||
@@ -514,7 +538,7 @@ mod test {
|
||||
let addr = format!("127.0.0.1:{}", &port)
|
||||
.parse::<SocketAddr>()
|
||||
.unwrap();
|
||||
let sock = NetworkState::connect(&addr).unwrap();
|
||||
let sock = NetworkState::connect(&addr, 4096, 4096).unwrap();
|
||||
|
||||
let event_id = ns.register(server_events[i], 1, &sock).unwrap();
|
||||
assert!(event_id != 0);
|
||||
@@ -545,7 +569,7 @@ mod test {
|
||||
let addr = format!("127.0.0.1:{}", &port)
|
||||
.parse::<SocketAddr>()
|
||||
.unwrap();
|
||||
let sock = NetworkState::connect(&addr).unwrap();
|
||||
let sock = NetworkState::connect(&addr, 4096, 4096).unwrap();
|
||||
|
||||
// can't use non-server events
|
||||
assert_eq!(
|
||||
@@ -568,7 +592,7 @@ mod test {
|
||||
.unwrap();
|
||||
event_ids.insert(server_event_id);
|
||||
|
||||
let sock = NetworkState::connect(&addr).unwrap();
|
||||
let sock = NetworkState::connect(&addr, 4096, 4096).unwrap();
|
||||
|
||||
// register 10 client events
|
||||
let event_id = ns.register(server_event_id, 11, &sock).unwrap();
|
||||
@@ -577,7 +601,7 @@ mod test {
|
||||
|
||||
// the 11th socket should fail
|
||||
let addr = format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap();
|
||||
let sock = NetworkState::connect(&addr).unwrap();
|
||||
let sock = NetworkState::connect(&addr, 4096, 4096).unwrap();
|
||||
let res = ns.register(server_event_id, 11, &sock);
|
||||
assert_eq!(Err(net_error::TooManyPeers), res);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user