mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-30 12:42:10 +08:00
Subscribe signer to new Burn block events
Signed-off-by: Jacinta Ferrant <jacinta@trustmachines.co>
This commit is contained in:
@@ -73,6 +73,8 @@ pub enum SignerEvent {
|
|||||||
BlockValidationResponse(BlockValidateResponse),
|
BlockValidationResponse(BlockValidateResponse),
|
||||||
/// Status endpoint request
|
/// Status endpoint request
|
||||||
StatusCheck,
|
StatusCheck,
|
||||||
|
/// A new burn block event was received
|
||||||
|
NewBurnBlock,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StacksMessageCodec for BlockProposalSigners {
|
impl StacksMessageCodec for BlockProposalSigners {
|
||||||
@@ -281,6 +283,8 @@ impl EventReceiver for SignerEventReceiver {
|
|||||||
process_stackerdb_event(event_receiver.local_addr, request, is_mainnet)
|
process_stackerdb_event(event_receiver.local_addr, request, is_mainnet)
|
||||||
} else if request.url() == "/proposal_response" {
|
} else if request.url() == "/proposal_response" {
|
||||||
process_proposal_response(request)
|
process_proposal_response(request)
|
||||||
|
} else if request.url() == "/new_burn_block" {
|
||||||
|
process_new_burn_block_event(request)
|
||||||
} else {
|
} else {
|
||||||
let url = request.url().to_string();
|
let url = request.url().to_string();
|
||||||
|
|
||||||
@@ -438,6 +442,16 @@ fn process_proposal_response(mut request: HttpRequest) -> Result<SignerEvent, Ev
|
|||||||
Ok(SignerEvent::BlockValidationResponse(event))
|
Ok(SignerEvent::BlockValidationResponse(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Process a new burn block event from the node
|
||||||
|
fn process_new_burn_block_event(mut request: HttpRequest) -> Result<SignerEvent, EventError> {
|
||||||
|
debug!("Got burn_block event");
|
||||||
|
let event = SignerEvent::NewBurnBlock;
|
||||||
|
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
|
||||||
|
error!("Failed to respond to request: {:?}", &e);
|
||||||
|
}
|
||||||
|
Ok(event)
|
||||||
|
}
|
||||||
|
|
||||||
fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
|
fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
|
||||||
// Splitting the string by '-'
|
// Splitting the string by '-'
|
||||||
let parts: Vec<&str> = name.split('-').collect();
|
let parts: Vec<&str> = name.split('-').collect();
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use clarity::vm::errors::Error as ClarityError;
|
use clarity::vm::errors::Error as ClarityError;
|
||||||
use clarity::vm::types::serialization::SerializationError;
|
use clarity::vm::types::serialization::SerializationError;
|
||||||
use libsigner::RPCError;
|
|
||||||
use libstackerdb::Error as StackerDBError;
|
use libstackerdb::Error as StackerDBError;
|
||||||
use slog::slog_debug;
|
use slog::slog_debug;
|
||||||
pub use stackerdb::*;
|
pub use stackerdb::*;
|
||||||
@@ -48,9 +47,6 @@ pub enum ClientError {
|
|||||||
/// Failed to sign stacker-db chunk
|
/// Failed to sign stacker-db chunk
|
||||||
#[error("Failed to sign stacker-db chunk: {0}")]
|
#[error("Failed to sign stacker-db chunk: {0}")]
|
||||||
FailToSign(#[from] StackerDBError),
|
FailToSign(#[from] StackerDBError),
|
||||||
/// Failed to write to stacker-db due to RPC error
|
|
||||||
#[error("Failed to write to stacker-db instance: {0}")]
|
|
||||||
PutChunkFailed(#[from] RPCError),
|
|
||||||
/// Stacker-db instance rejected the chunk
|
/// Stacker-db instance rejected the chunk
|
||||||
#[error("Stacker-db rejected the chunk. Reason: {0}")]
|
#[error("Stacker-db rejected the chunk. Reason: {0}")]
|
||||||
PutChunkRejected(String),
|
PutChunkRejected(String),
|
||||||
@@ -72,33 +68,18 @@ pub enum ClientError {
|
|||||||
/// Failed to parse a Clarity value
|
/// Failed to parse a Clarity value
|
||||||
#[error("Received a malformed clarity value: {0}")]
|
#[error("Received a malformed clarity value: {0}")]
|
||||||
MalformedClarityValue(String),
|
MalformedClarityValue(String),
|
||||||
/// Invalid Clarity Name
|
|
||||||
#[error("Invalid Clarity Name: {0}")]
|
|
||||||
InvalidClarityName(String),
|
|
||||||
/// Backoff retry timeout
|
/// Backoff retry timeout
|
||||||
#[error("Backoff retry timeout occurred. Stacks node may be down.")]
|
#[error("Backoff retry timeout occurred. Stacks node may be down.")]
|
||||||
RetryTimeout,
|
RetryTimeout,
|
||||||
/// Not connected
|
/// Not connected
|
||||||
#[error("Not connected")]
|
#[error("Not connected")]
|
||||||
NotConnected,
|
NotConnected,
|
||||||
/// Invalid signing key
|
|
||||||
#[error("Signing key not represented in the list of signers")]
|
|
||||||
InvalidSigningKey,
|
|
||||||
/// Clarity interpreter error
|
/// Clarity interpreter error
|
||||||
#[error("Clarity interpreter error: {0}")]
|
#[error("Clarity interpreter error: {0}")]
|
||||||
ClarityError(#[from] ClarityError),
|
ClarityError(#[from] ClarityError),
|
||||||
/// Our stacks address does not belong to a registered signer
|
|
||||||
#[error("Our stacks address does not belong to a registered signer")]
|
|
||||||
NotRegistered,
|
|
||||||
/// Reward set not yet calculated for the given reward cycle
|
|
||||||
#[error("Reward set not yet calculated for reward cycle: {0}")]
|
|
||||||
RewardSetNotYetCalculated(u64),
|
|
||||||
/// Malformed reward set
|
/// Malformed reward set
|
||||||
#[error("Malformed contract data: {0}")]
|
#[error("Malformed contract data: {0}")]
|
||||||
MalformedContractData(String),
|
MalformedContractData(String),
|
||||||
/// No reward set exists for the given reward cycle
|
|
||||||
#[error("No reward set exists for reward cycle {0}")]
|
|
||||||
NoRewardSet(u64),
|
|
||||||
/// Stacks node does not support a feature we need
|
/// Stacks node does not support a feature we need
|
||||||
#[error("Stacks node does not support a required feature: {0}")]
|
#[error("Stacks node does not support a required feature: {0}")]
|
||||||
UnsupportedStacksFeature(String),
|
UnsupportedStacksFeature(String),
|
||||||
|
|||||||
@@ -44,12 +44,14 @@ pub struct RunLoopCommand {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The runloop state
|
/// The runloop state
|
||||||
#[derive(PartialEq, Debug)]
|
#[derive(PartialEq, Debug, Clone, Copy)]
|
||||||
pub enum State {
|
pub enum State {
|
||||||
/// The runloop is uninitialized
|
/// The runloop is uninitialized
|
||||||
Uninitialized,
|
Uninitialized,
|
||||||
/// The runloop is initialized
|
/// The runloop has no registered signers
|
||||||
Initialized,
|
NoRegisteredSigners,
|
||||||
|
/// The runloop has registered signers
|
||||||
|
RegisteredSigners,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The runloop for the stacks signer
|
/// The runloop for the stacks signer
|
||||||
@@ -262,9 +264,9 @@ impl RunLoop {
|
|||||||
signer.next_signer_slot_ids = new_signer_config.signer_slot_ids.clone();
|
signer.next_signer_slot_ids = new_signer_config.signer_slot_ids.clone();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.stacks_signers
|
let new_signer = Signer::from(new_signer_config);
|
||||||
.insert(reward_index, Signer::from(new_signer_config));
|
info!("{new_signer} initialized.");
|
||||||
debug!("Reward cycle #{reward_cycle} Signer #{signer_id} initialized.");
|
self.stacks_signers.insert(reward_index, new_signer);
|
||||||
} else {
|
} else {
|
||||||
// TODO: Update `current` here once the signer binary is tracking its own latest burnchain/stacks views.
|
// TODO: Update `current` here once the signer binary is tracking its own latest burnchain/stacks views.
|
||||||
if current {
|
if current {
|
||||||
@@ -277,7 +279,6 @@ impl RunLoop {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Refresh the signer configuration by retrieving the necessary information from the stacks node
|
/// Refresh the signer configuration by retrieving the necessary information from the stacks node
|
||||||
/// Note: this will trigger DKG if required
|
|
||||||
fn refresh_signers(&mut self, current_reward_cycle: u64) -> Result<(), ClientError> {
|
fn refresh_signers(&mut self, current_reward_cycle: u64) -> Result<(), ClientError> {
|
||||||
let next_reward_cycle = current_reward_cycle.saturating_add(1);
|
let next_reward_cycle = current_reward_cycle.saturating_add(1);
|
||||||
self.refresh_signer_config(current_reward_cycle, true);
|
self.refresh_signer_config(current_reward_cycle, true);
|
||||||
@@ -307,28 +308,15 @@ impl RunLoop {
|
|||||||
signer.coordinator.state = CoordinatorState::Idle;
|
signer.coordinator.state = CoordinatorState::Idle;
|
||||||
signer.state = SignerState::Idle;
|
signer.state = SignerState::Idle;
|
||||||
}
|
}
|
||||||
if signer.approved_aggregate_public_key.is_none() {
|
|
||||||
retry_with_exponential_backoff(|| {
|
|
||||||
signer
|
|
||||||
.update_dkg(&self.stacks_client)
|
|
||||||
.map_err(backoff::Error::transient)
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for i in to_delete.into_iter() {
|
for idx in to_delete {
|
||||||
if let Some(signer) = self.stacks_signers.remove(&i) {
|
self.stacks_signers.remove(&idx);
|
||||||
info!("{signer}: Tenure has completed. Removing signer from runloop.",);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if self.stacks_signers.is_empty() {
|
self.state = if self.stacks_signers.is_empty() {
|
||||||
info!("Signer is not registered for the current reward cycle ({current_reward_cycle}) or next reward cycle ({next_reward_cycle}). Waiting for confirmed registration...");
|
State::NoRegisteredSigners
|
||||||
self.state = State::Uninitialized;
|
} else {
|
||||||
return Err(ClientError::NotRegistered);
|
State::RegisteredSigners
|
||||||
}
|
};
|
||||||
if self.state != State::Initialized {
|
|
||||||
info!("Signer runloop successfully initialized!");
|
|
||||||
}
|
|
||||||
self.state = State::Initialized;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -362,19 +350,39 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
|
|||||||
.map_err(backoff::Error::transient)
|
.map_err(backoff::Error::transient)
|
||||||
}) else {
|
}) else {
|
||||||
error!("Failed to retrieve current reward cycle");
|
error!("Failed to retrieve current reward cycle");
|
||||||
warn!("Ignoring event: {event:?}");
|
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
if let Err(e) = self.refresh_signers(current_reward_cycle) {
|
if self.state == State::Uninitialized || event == Some(SignerEvent::NewBurnBlock) {
|
||||||
|
let old_state = self.state;
|
||||||
if self.state == State::Uninitialized {
|
if self.state == State::Uninitialized {
|
||||||
// If we were never actually initialized, we cannot process anything. Just return.
|
info!("Initializing signer...");
|
||||||
warn!("Failed to initialize signers. Are you sure this signer is correctly registered for the current or next reward cycle?");
|
} else {
|
||||||
warn!("Ignoring event: {event:?}");
|
info!("New burn block event received. Refreshing signer state...");
|
||||||
|
}
|
||||||
|
if let Err(e) = self.refresh_signers(current_reward_cycle) {
|
||||||
|
error!("Failed to refresh signers: {e}. Signer may have an outdated view of the network");
|
||||||
|
}
|
||||||
|
if self.state == State::NoRegisteredSigners {
|
||||||
|
let next_reward_cycle = current_reward_cycle.saturating_add(1);
|
||||||
|
info!("Signer is not registered for the current reward cycle ({current_reward_cycle}) or next reward cycle ({next_reward_cycle}). Waiting for confirmed registration...");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
error!("Failed to refresh signers: {e}. Signer may have an outdated view of the network. Attempting to process event anyway.");
|
if old_state == State::Uninitialized {
|
||||||
|
info!("Signer successfully initialized.");
|
||||||
|
} else {
|
||||||
|
info!("Signer state successfully refreshed.");
|
||||||
|
};
|
||||||
}
|
}
|
||||||
for signer in self.stacks_signers.values_mut() {
|
for signer in self.stacks_signers.values_mut() {
|
||||||
|
if signer.approved_aggregate_public_key.is_none() {
|
||||||
|
if let Err(e) = retry_with_exponential_backoff(|| {
|
||||||
|
signer
|
||||||
|
.update_dkg(&self.stacks_client)
|
||||||
|
.map_err(backoff::Error::transient)
|
||||||
|
}) {
|
||||||
|
error!("{signer}: failed to update DKG: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
if signer.state == SignerState::TenureCompleted {
|
if signer.state == SignerState::TenureCompleted {
|
||||||
warn!("{signer}: Signer's tenure has completed. This signer should have been cleaned up during refresh.");
|
warn!("{signer}: Signer's tenure has completed. This signer should have been cleaned up during refresh.");
|
||||||
continue;
|
continue;
|
||||||
@@ -383,12 +391,13 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
|
|||||||
Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2),
|
Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2),
|
||||||
// Block proposal events do have reward cycles, but each proposal has its own cycle,
|
// Block proposal events do have reward cycles, but each proposal has its own cycle,
|
||||||
// and the vec could be heterogenous, so, don't differentiate.
|
// and the vec could be heterogenous, so, don't differentiate.
|
||||||
Some(SignerEvent::ProposedBlocks(_)) => None,
|
Some(SignerEvent::ProposedBlocks(_))
|
||||||
|
| Some(SignerEvent::NewBurnBlock)
|
||||||
|
| Some(SignerEvent::StatusCheck)
|
||||||
|
| None => None,
|
||||||
Some(SignerEvent::SignerMessages(msg_parity, ..)) => {
|
Some(SignerEvent::SignerMessages(msg_parity, ..)) => {
|
||||||
Some(u64::from(msg_parity) % 2)
|
Some(u64::from(msg_parity) % 2)
|
||||||
}
|
}
|
||||||
Some(SignerEvent::StatusCheck) => None,
|
|
||||||
None => None,
|
|
||||||
};
|
};
|
||||||
let other_signer_parity = (signer.reward_cycle + 1) % 2;
|
let other_signer_parity = (signer.reward_cycle + 1) % 2;
|
||||||
if event_parity == Some(other_signer_parity) {
|
if event_parity == Some(other_signer_parity) {
|
||||||
|
|||||||
@@ -1123,6 +1123,7 @@ impl Signer {
|
|||||||
/// Update the DKG for the provided signer info, triggering it if required
|
/// Update the DKG for the provided signer info, triggering it if required
|
||||||
pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
|
pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
|
||||||
let reward_cycle = self.reward_cycle;
|
let reward_cycle = self.reward_cycle;
|
||||||
|
let old_dkg = self.approved_aggregate_public_key;
|
||||||
self.approved_aggregate_public_key =
|
self.approved_aggregate_public_key =
|
||||||
stacks_client.get_approved_aggregate_key(reward_cycle)?;
|
stacks_client.get_approved_aggregate_key(reward_cycle)?;
|
||||||
if self.approved_aggregate_public_key.is_some() {
|
if self.approved_aggregate_public_key.is_some() {
|
||||||
@@ -1131,11 +1132,12 @@ impl Signer {
|
|||||||
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
|
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
|
||||||
self.coordinator
|
self.coordinator
|
||||||
.set_aggregate_public_key(self.approved_aggregate_public_key);
|
.set_aggregate_public_key(self.approved_aggregate_public_key);
|
||||||
// We have an approved aggregate public key. Do nothing further
|
if old_dkg != self.approved_aggregate_public_key {
|
||||||
debug!(
|
debug!(
|
||||||
"{self}: Have updated DKG value to {:?}.",
|
"{self}: updated DKG value to {:?}.",
|
||||||
self.approved_aggregate_public_key
|
self.approved_aggregate_public_key
|
||||||
);
|
);
|
||||||
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
let coordinator_id = self.coordinator_selector.get_coordinator().0;
|
let coordinator_id = self.coordinator_selector.get_coordinator().0;
|
||||||
@@ -1225,6 +1227,9 @@ impl Signer {
|
|||||||
Some(SignerEvent::StatusCheck) => {
|
Some(SignerEvent::StatusCheck) => {
|
||||||
debug!("{self}: Received a status check event.")
|
debug!("{self}: Received a status check event.")
|
||||||
}
|
}
|
||||||
|
Some(SignerEvent::NewBurnBlock) => {
|
||||||
|
// Already handled this case in the main loop
|
||||||
|
}
|
||||||
None => {
|
None => {
|
||||||
// No event. Do nothing.
|
// No event. Do nothing.
|
||||||
debug!("{self}: No event received")
|
debug!("{self}: No event received")
|
||||||
|
|||||||
@@ -804,7 +804,11 @@ fn setup_stx_btc_node(
|
|||||||
|
|
||||||
naka_conf.events_observers.insert(EventObserverConfig {
|
naka_conf.events_observers.insert(EventObserverConfig {
|
||||||
endpoint: format!("{}", signer_config.endpoint),
|
endpoint: format!("{}", signer_config.endpoint),
|
||||||
events_keys: vec![EventKeyType::StackerDBChunks, EventKeyType::BlockProposal],
|
events_keys: vec![
|
||||||
|
EventKeyType::StackerDBChunks,
|
||||||
|
EventKeyType::BlockProposal,
|
||||||
|
EventKeyType::BurnchainBlocks,
|
||||||
|
],
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user