mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-01-13 08:40:45 +08:00
Merge branch 'develop' into chore/rust-scenario-test
This commit is contained in:
@@ -383,10 +383,9 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
|
||||
if event_parity == Some(other_signer_parity) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if signer.approved_aggregate_public_key.is_none() {
|
||||
if let Err(e) = signer.update_dkg(&self.stacks_client) {
|
||||
error!("{signer}: failed to update DKG: {e}");
|
||||
if let Err(e) = signer.refresh_dkg(&self.stacks_client) {
|
||||
error!("{signer}: failed to refresh DKG: {e}");
|
||||
}
|
||||
}
|
||||
signer.refresh_coordinator();
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::time::Instant;
|
||||
use blockstack_lib::chainstate::burn::ConsensusHashExtensions;
|
||||
use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners;
|
||||
use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockVote};
|
||||
use blockstack_lib::chainstate::stacks::boot::SIGNERS_VOTING_FUNCTION_NAME;
|
||||
use blockstack_lib::chainstate::stacks::StacksTransaction;
|
||||
use blockstack_lib::net::api::postblock_proposal::BlockValidateResponse;
|
||||
use hashbrown::HashSet;
|
||||
@@ -123,13 +124,22 @@ pub enum Command {
|
||||
},
|
||||
}
|
||||
|
||||
/// The specific operations that a signer can perform
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub enum Operation {
|
||||
/// A DKG operation
|
||||
Dkg,
|
||||
/// A Sign operation
|
||||
Sign,
|
||||
}
|
||||
|
||||
/// The Signer state
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub enum State {
|
||||
/// The signer is idle, waiting for messages and commands
|
||||
Idle,
|
||||
/// The signer is executing a DKG or Sign round
|
||||
OperationInProgress,
|
||||
OperationInProgress(Operation),
|
||||
}
|
||||
|
||||
/// The stacks signer registered for the reward cycle
|
||||
@@ -343,8 +353,8 @@ impl Signer {
|
||||
}
|
||||
|
||||
/// Update operation
|
||||
fn update_operation(&mut self) {
|
||||
self.state = State::OperationInProgress;
|
||||
fn update_operation(&mut self, operation: Operation) {
|
||||
self.state = State::OperationInProgress(operation);
|
||||
self.coordinator_selector.last_message_time = Some(Instant::now());
|
||||
}
|
||||
|
||||
@@ -374,6 +384,7 @@ impl Signer {
|
||||
Ok(msg) => {
|
||||
let ack = self.stackerdb.send_message_with_retry(msg.into());
|
||||
debug!("{self}: ACK: {ack:?}",);
|
||||
self.update_operation(Operation::Dkg);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{self}: Failed to start DKG: {e:?}",);
|
||||
@@ -419,6 +430,7 @@ impl Signer {
|
||||
.unwrap_or_else(|e| {
|
||||
error!("{self}: Failed to insert block in DB: {e:?}");
|
||||
});
|
||||
self.update_operation(Operation::Sign);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{self}: Failed to start signing block: {e:?}",);
|
||||
@@ -427,7 +439,6 @@ impl Signer {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.update_operation();
|
||||
}
|
||||
|
||||
/// Attempt to process the next command in the queue, and update state accordingly
|
||||
@@ -460,10 +471,10 @@ impl Signer {
|
||||
.expect("BUG: Already asserted that the command queue was not empty");
|
||||
self.execute_command(stacks_client, &command);
|
||||
}
|
||||
State::OperationInProgress => {
|
||||
State::OperationInProgress(op) => {
|
||||
// We cannot execute the next command until the current one is finished...
|
||||
debug!(
|
||||
"{self}: Waiting for operation to finish. Coordinator state = {:?}",
|
||||
"{self}: Waiting for {op:?} operation to finish. Coordinator state = {:?}",
|
||||
self.coordinator.state
|
||||
);
|
||||
}
|
||||
@@ -696,9 +707,26 @@ impl Signer {
|
||||
self.process_operation_results(stacks_client, &operation_results);
|
||||
self.send_operation_results(res, operation_results);
|
||||
self.finish_operation();
|
||||
} else if !packets.is_empty() && self.coordinator.state != CoordinatorState::Idle {
|
||||
// We have received a message and are in the middle of an operation. Update our state accordingly
|
||||
self.update_operation();
|
||||
} else if !packets.is_empty() {
|
||||
// We have received a message. Update our state accordingly
|
||||
// Let us be extra explicit in case a new state type gets added to wsts' state machine
|
||||
match &self.coordinator.state {
|
||||
CoordinatorState::Idle => {}
|
||||
CoordinatorState::DkgPublicDistribute
|
||||
| CoordinatorState::DkgPublicGather
|
||||
| CoordinatorState::DkgPrivateDistribute
|
||||
| CoordinatorState::DkgPrivateGather
|
||||
| CoordinatorState::DkgEndDistribute
|
||||
| CoordinatorState::DkgEndGather => {
|
||||
self.update_operation(Operation::Dkg);
|
||||
}
|
||||
CoordinatorState::NonceRequest(_, _)
|
||||
| CoordinatorState::NonceGather(_, _)
|
||||
| CoordinatorState::SigShareRequest(_, _)
|
||||
| CoordinatorState::SigShareGather(_, _) => {
|
||||
self.update_operation(Operation::Sign);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("{self}: Saving signer state");
|
||||
@@ -1016,6 +1044,10 @@ impl Signer {
|
||||
/// Process a dkg result by broadcasting a vote to the stacks node
|
||||
fn process_dkg(&mut self, stacks_client: &StacksClient, dkg_public_key: &Point) {
|
||||
let mut dkg_results_bytes = vec![];
|
||||
debug!(
|
||||
"{self}: Received DKG result. Broadcasting vote to the stacks node...";
|
||||
"dkg_public_key" => %dkg_public_key
|
||||
);
|
||||
if let Err(e) = SignerMessage::serialize_dkg_result(
|
||||
&mut dkg_results_bytes,
|
||||
dkg_public_key,
|
||||
@@ -1273,7 +1305,49 @@ impl Signer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Refresh DKG value and queue DKG command if necessary
|
||||
pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
|
||||
// First check if we should queue DKG based on contract vote state and stackerdb transactions
|
||||
let should_queue = self.should_queue_dkg(stacks_client)?;
|
||||
// Before queueing the command, check one last time if DKG has been
|
||||
// approved. It could have happened after the last call to
|
||||
// `get_approved_aggregate_key` but before the theshold check in
|
||||
// `should_queue_dkg`.
|
||||
let old_dkg = self.approved_aggregate_public_key;
|
||||
self.approved_aggregate_public_key =
|
||||
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
|
||||
if self.approved_aggregate_public_key.is_some() {
|
||||
// TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
|
||||
// Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
|
||||
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
|
||||
self.coordinator
|
||||
.set_aggregate_public_key(self.approved_aggregate_public_key);
|
||||
if old_dkg != self.approved_aggregate_public_key {
|
||||
warn!(
|
||||
"{self}: updated DKG value to {:?}.",
|
||||
self.approved_aggregate_public_key
|
||||
);
|
||||
}
|
||||
if let State::OperationInProgress(Operation::Dkg) = self.state {
|
||||
debug!(
|
||||
"{self}: DKG has already been set. Aborting DKG operation {}.",
|
||||
self.coordinator.current_dkg_id
|
||||
);
|
||||
self.finish_operation();
|
||||
}
|
||||
} else if should_queue {
|
||||
if self.commands.front() != Some(&Command::Dkg) {
|
||||
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
|
||||
self.commands.push_front(Command::Dkg);
|
||||
} else {
|
||||
debug!("{self}: DKG command already queued...");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Should DKG be queued to the current signer's command queue
|
||||
/// This assumes that no key has been approved by the contract yet
|
||||
pub fn should_queue_dkg(&mut self, stacks_client: &StacksClient) -> Result<bool, ClientError> {
|
||||
if self.state != State::Idle
|
||||
|| self.signer_id != self.get_coordinator_dkg().0
|
||||
@@ -1283,6 +1357,25 @@ impl Signer {
|
||||
return Ok(false);
|
||||
}
|
||||
let signer_address = stacks_client.get_signer_address();
|
||||
let account_nonces = self.get_account_nonces(stacks_client, &[*signer_address]);
|
||||
let old_transactions = self.get_signer_transactions(&account_nonces).map_err(|e| {
|
||||
warn!("{self}: Failed to get old signer transactions: {e:?}. May trigger DKG unnecessarily");
|
||||
}).unwrap_or_default();
|
||||
// Check if we have an existing vote transaction for the same round and reward cycle
|
||||
for transaction in old_transactions.iter() {
|
||||
let params =
|
||||
NakamotoSigners::parse_vote_for_aggregate_public_key(transaction).unwrap_or_else(|| panic!("BUG: {self}: Received an invalid {SIGNERS_VOTING_FUNCTION_NAME} transaction in an already filtered list: {transaction:?}"));
|
||||
if Some(params.aggregate_key) == self.coordinator.aggregate_public_key
|
||||
&& params.voting_round == self.coordinator.current_dkg_id
|
||||
{
|
||||
debug!("{self}: Not triggering a DKG round. Already have a pending vote transaction.";
|
||||
"txid" => %transaction.txid(),
|
||||
"aggregate_key" => %params.aggregate_key,
|
||||
"voting_round" => params.voting_round
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
if let Some(aggregate_key) = stacks_client.get_vote_for_aggregate_public_key(
|
||||
self.coordinator.current_dkg_id,
|
||||
self.reward_cycle,
|
||||
@@ -1311,12 +1404,6 @@ impl Signer {
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
warn!("{self}: Vote for DKG failed.";
|
||||
"voting_round" => self.coordinator.current_dkg_id,
|
||||
"aggregate_key" => %aggregate_key,
|
||||
"round_weight" => round_weight,
|
||||
"threshold_weight" => threshold_weight
|
||||
);
|
||||
} else {
|
||||
// Have I already voted, but the vote is still pending in StackerDB? Check stackerdb for the same round number and reward cycle vote transaction
|
||||
// Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes
|
||||
@@ -1363,32 +1450,6 @@ impl Signer {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Update the DKG for the provided signer info, triggering it if required
|
||||
pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
|
||||
let old_dkg = self.approved_aggregate_public_key;
|
||||
self.approved_aggregate_public_key =
|
||||
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
|
||||
if self.approved_aggregate_public_key.is_some() {
|
||||
// TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
|
||||
// Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
|
||||
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
|
||||
self.coordinator
|
||||
.set_aggregate_public_key(self.approved_aggregate_public_key);
|
||||
if old_dkg != self.approved_aggregate_public_key {
|
||||
warn!(
|
||||
"{self}: updated DKG value to {:?}.",
|
||||
self.approved_aggregate_public_key
|
||||
);
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
if self.should_queue_dkg(stacks_client)? {
|
||||
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
|
||||
self.commands.push_front(Command::Dkg);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process the event
|
||||
pub fn process_event(
|
||||
&mut self,
|
||||
|
||||
Reference in New Issue
Block a user