mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-05-31 00:01:56 +08:00
Store commands to the side in case init fails
Signed-off-by: Jacinta Ferrant <jacinta@trustmachines.co>
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use std::collections::VecDeque;
|
||||
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
|
||||
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
|
||||
//
|
||||
@@ -58,6 +59,8 @@ pub struct RunLoop {
|
||||
pub stacks_signers: HashMap<u64, Signer>,
|
||||
/// The state of the runloop
|
||||
pub state: State,
|
||||
/// The commands received thus far
|
||||
pub commands: VecDeque<RunLoopCommand>,
|
||||
}
|
||||
|
||||
impl From<GlobalConfig> for RunLoop {
|
||||
@@ -69,6 +72,7 @@ impl From<GlobalConfig> for RunLoop {
|
||||
stacks_client,
|
||||
stacks_signers: HashMap::with_capacity(2),
|
||||
state: State::Uninitialized,
|
||||
commands: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -228,16 +232,21 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
|
||||
cmd: Option<RunLoopCommand>,
|
||||
res: Sender<Vec<OperationResult>>,
|
||||
) -> Option<Vec<OperationResult>> {
|
||||
info!(
|
||||
"Running one pass for the signer. Current state: {:?}",
|
||||
debug!(
|
||||
"Running one pass for the signer. state={:?}, cmd={cmd:?}, event={event:?}",
|
||||
self.state
|
||||
);
|
||||
if let Some(cmd) = cmd {
|
||||
self.commands.push_back(cmd);
|
||||
}
|
||||
// TODO: queue events and process them potentially after initialization success (similar to commands)?
|
||||
let Ok(current_reward_cycle) = retry_with_exponential_backoff(|| {
|
||||
self.stacks_client
|
||||
.get_current_reward_cycle()
|
||||
.map_err(backoff::Error::transient)
|
||||
}) else {
|
||||
error!("Failed to retrieve current reward cycle. Ignoring event: {event:?}");
|
||||
error!("Failed to retrieve current reward cycle");
|
||||
warn!("Ignoring event: {event:?}");
|
||||
return None;
|
||||
};
|
||||
if let Err(e) = self.refresh_signers(current_reward_cycle) {
|
||||
@@ -249,9 +258,20 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
|
||||
}
|
||||
error!("Failed to refresh signers: {e}. Signer may have an outdated view of the network. Attempting to process event anyway.");
|
||||
}
|
||||
if let Some(command) = cmd {
|
||||
let reward_cycle = command.reward_cycle;
|
||||
if let Some(signer) = self.stacks_signers.get_mut(&(reward_cycle % 2)) {
|
||||
for signer in self.stacks_signers.values_mut() {
|
||||
if let Err(e) = signer.process_event(
|
||||
&self.stacks_client,
|
||||
event.as_ref(),
|
||||
res.clone(),
|
||||
current_reward_cycle,
|
||||
) {
|
||||
error!(
|
||||
"Signer #{} for reward cycle {} errored processing event: {e}",
|
||||
signer.signer_id, signer.reward_cycle
|
||||
);
|
||||
}
|
||||
if let Some(command) = self.commands.pop_front() {
|
||||
let reward_cycle = command.reward_cycle;
|
||||
if signer.reward_cycle != reward_cycle {
|
||||
warn!(
|
||||
"Signer #{}: not registered for reward cycle {reward_cycle}. Ignoring command: {command:?}", signer.signer_id
|
||||
@@ -268,23 +288,6 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
|
||||
);
|
||||
signer.commands.push_back(command.command);
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"No signer registered for reward cycle {reward_cycle}. Ignoring command: {command:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
for signer in self.stacks_signers.values_mut() {
|
||||
if let Err(e) = signer.process_event(
|
||||
&self.stacks_client,
|
||||
event.as_ref(),
|
||||
res.clone(),
|
||||
current_reward_cycle,
|
||||
) {
|
||||
error!(
|
||||
"Signer #{} for reward cycle {} errored processing event: {e}",
|
||||
signer.signer_id, signer.reward_cycle
|
||||
);
|
||||
}
|
||||
// After processing event, run the next command for each signer
|
||||
signer.process_next_command(&self.stacks_client);
|
||||
|
||||
Reference in New Issue
Block a user