mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-05-16 16:00:48 +08:00
refactor: remove unneeded function args
This commit is contained in:
@@ -151,7 +151,11 @@ impl HttpPeer {
|
||||
return Err(net_error::AlreadyConnected(event_id, http_nk));
|
||||
}
|
||||
|
||||
let sock = NetworkState::connect(&addr)?;
|
||||
let sock = NetworkState::connect(
|
||||
&addr,
|
||||
network.connection_opts.socket_send_buffer_size,
|
||||
network.connection_opts.socket_recv_buffer_size,
|
||||
)?;
|
||||
let hint_event_id = network_state.next_event_id()?;
|
||||
let next_event_id =
|
||||
network_state.register(self.http_server_handle, hint_event_id, &sock)?;
|
||||
@@ -225,6 +229,9 @@ impl HttpPeer {
|
||||
outbound_url: Option<UrlString>,
|
||||
initial_request: Option<StacksHttpRequest>,
|
||||
) -> Result<(), net_error> {
|
||||
let send_buffer_size = node_state
|
||||
.with_node_state(|network, _, _, _, _| network.connection_opts.socket_send_buffer_size);
|
||||
|
||||
let client_addr = match socket.peer_addr() {
|
||||
Ok(addr) => addr,
|
||||
Err(e) => {
|
||||
@@ -255,6 +262,7 @@ impl HttpPeer {
|
||||
peer_host,
|
||||
&self.connection_opts,
|
||||
event_id,
|
||||
send_buffer_size,
|
||||
);
|
||||
|
||||
debug!(
|
||||
@@ -273,17 +281,9 @@ impl HttpPeer {
|
||||
}
|
||||
|
||||
// prime the socket
|
||||
let saturation_res =
|
||||
node_state.with_node_state(|_network, _sortdb, chainstate, mempool, _rpc_args| {
|
||||
HttpPeer::saturate_http_socket(&mut socket, &mut new_convo, mempool, chainstate)
|
||||
});
|
||||
|
||||
match saturation_res {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
let _ = network_state.deregister(event_id, &socket);
|
||||
return Err(e);
|
||||
}
|
||||
if let Err(e) = HttpPeer::saturate_http_socket(&mut socket, &mut new_convo) {
|
||||
let _ = network_state.deregister(event_id, &socket);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,12 +353,10 @@ impl HttpPeer {
|
||||
pub fn saturate_http_socket(
|
||||
client_sock: &mut mio::net::TcpStream,
|
||||
convo: &mut ConversationHttp,
|
||||
mempool: &MemPoolDB,
|
||||
chainstate: &mut StacksChainState,
|
||||
) -> Result<(), net_error> {
|
||||
// saturate the socket
|
||||
loop {
|
||||
let send_res = convo.send(client_sock, mempool, chainstate);
|
||||
let send_res = convo.send(client_sock);
|
||||
match send_res {
|
||||
Err(e) => {
|
||||
debug!("Failed to send data to socket {:?}: {:?}", &client_sock, &e);
|
||||
@@ -459,25 +457,12 @@ impl HttpPeer {
|
||||
)) {
|
||||
Ok(_) => {
|
||||
// prime the socket
|
||||
let saturation_res = node_state.with_node_state(
|
||||
|_network, _sortdb, chainstate, mempool, _rpc_args| {
|
||||
HttpPeer::saturate_http_socket(
|
||||
client_sock,
|
||||
convo,
|
||||
mempool,
|
||||
chainstate,
|
||||
)
|
||||
},
|
||||
);
|
||||
match saturation_res {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"Failed to flush HTTP 400 to socket {:?}: {:?}",
|
||||
&client_sock, &e
|
||||
);
|
||||
convo_dead = true;
|
||||
}
|
||||
if let Err(e) = HttpPeer::saturate_http_socket(client_sock, convo) {
|
||||
debug!(
|
||||
"Failed to flush HTTP 400 to socket {:?}: {:?}",
|
||||
&client_sock, &e
|
||||
);
|
||||
convo_dead = true;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -519,19 +504,12 @@ impl HttpPeer {
|
||||
if !convo_dead {
|
||||
// (continue) sending out data in this conversation, if the conversation is still
|
||||
// ongoing
|
||||
let saturation_res =
|
||||
node_state.with_node_state(|_network, _sortdb, chainstate, mempool, _rpc_args| {
|
||||
HttpPeer::saturate_http_socket(client_sock, convo, mempool, chainstate)
|
||||
});
|
||||
match saturation_res {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"Failed to send HTTP data to event {} (socket {:?}): {:?}",
|
||||
event_id, &client_sock, &e
|
||||
);
|
||||
convo_dead = true;
|
||||
}
|
||||
if let Err(e) = HttpPeer::saturate_http_socket(client_sock, convo) {
|
||||
debug!(
|
||||
"Failed to send HTTP data to event {} (socket {:?}): {:?}",
|
||||
event_id, &client_sock, &e
|
||||
);
|
||||
convo_dead = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -635,16 +613,12 @@ impl HttpPeer {
|
||||
/// Flush outgoing replies, but don't block.
|
||||
/// Drop broken handles.
|
||||
/// Return the list of conversation event IDs to close (i.e. they're broken, or the request is done)
|
||||
fn flush_conversations(&mut self, node_state: &mut StacksNodeState) -> Vec<usize> {
|
||||
fn flush_conversations(&mut self) -> Vec<usize> {
|
||||
let mut close = vec![];
|
||||
|
||||
// flush each outgoing conversation
|
||||
for (event_id, ref mut convo) in self.peers.iter_mut() {
|
||||
let flush_res =
|
||||
node_state.with_node_state(|_network, _sortdb, chainstate, mempool, _rpc_args| {
|
||||
convo.try_flush(mempool, chainstate)
|
||||
});
|
||||
if let Err(e) = flush_res {
|
||||
if let Err(e) = convo.try_flush() {
|
||||
info!("Broken HTTP connection {:?}: {:?}", convo, &e);
|
||||
close.push(*event_id);
|
||||
}
|
||||
@@ -684,7 +658,7 @@ impl HttpPeer {
|
||||
}
|
||||
|
||||
// move conversations along
|
||||
let close_events = self.flush_conversations(node_state);
|
||||
let close_events = self.flush_conversations();
|
||||
for close_event in close_events {
|
||||
debug!("Close HTTP connection on event {}", close_event);
|
||||
self.deregister_http(network_state, close_event);
|
||||
|
||||
Reference in New Issue
Block a user