diff --git a/stacks-signer/src/cli.rs b/stacks-signer/src/cli.rs index 481de7106..0d305382a 100644 --- a/stacks-signer/src/cli.rs +++ b/stacks-signer/src/cli.rs @@ -44,11 +44,11 @@ pub struct Cli { /// Subcommands for the stacks signer binary #[derive(clap::Subcommand, Debug)] pub enum Command { - /// Get a chunk from the stacker-db instance + /// Get a chunk from the stacker-db instance in hex encoding GetChunk(GetChunkArgs), - /// Get the latest chunk from the stacker-db instance + /// Get the latest chunk from the stacker-db instance in hex encoding GetLatestChunk(GetLatestChunkArgs), - /// List chunks from the stacker-db instance + /// List chunks from the stacker-db instance in hex encoding ListChunks(StackerDBArgs), /// Upload a chunk to the stacker-db instance PutChunk(PutChunkArgs), diff --git a/stacks-signer/src/main.rs b/stacks-signer/src/main.rs index 285dc7f7e..c7f3fcd68 100644 --- a/stacks-signer/src/main.rs +++ b/stacks-signer/src/main.rs @@ -71,11 +71,13 @@ fn stackerdb_session(host: &str, contract: QualifiedContractIdentifier) -> Stack /// Write the chunk to stdout fn write_chunk_to_stdout(chunk_opt: Option>) { if let Some(chunk) = chunk_opt.as_ref() { - let bytes = io::stdout().write(chunk).unwrap(); - if bytes < chunk.len() { + let hexed_string = to_hex(chunk); + let hexed_chunk = hexed_string.as_bytes(); + let bytes = io::stdout().write(&hexed_chunk).unwrap(); + if bytes < hexed_chunk.len() { print!( "Failed to write complete chunk to stdout. Missing {} bytes", - chunk.len() - bytes + hexed_chunk.len() - bytes ); } } @@ -176,7 +178,9 @@ fn handle_list_chunks(args: StackerDBArgs) { debug!("Listing chunks..."); let mut session = stackerdb_session(&args.host, args.contract); let chunk_list = session.list_chunks().unwrap(); - println!("{}", serde_json::to_string(&chunk_list).unwrap()); + let chunk_list_json = serde_json::to_string(&chunk_list).unwrap(); + let hexed_json = to_hex(chunk_list_json.as_bytes()); + println!("{}", hexed_json); } fn handle_put_chunk(args: PutChunkArgs) { diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index 17b74c2fc..f7b08525c 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -383,10 +383,9 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { if event_parity == Some(other_signer_parity) { continue; } - if signer.approved_aggregate_public_key.is_none() { - if let Err(e) = signer.update_dkg(&self.stacks_client) { - error!("{signer}: failed to update DKG: {e}"); + if let Err(e) = signer.refresh_dkg(&self.stacks_client) { + error!("{signer}: failed to refresh DKG: {e}"); } } signer.refresh_coordinator(); diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 504f1dc48..e7c7adb4c 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -21,6 +21,7 @@ use std::time::Instant; use blockstack_lib::chainstate::burn::ConsensusHashExtensions; use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners; use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockVote}; +use blockstack_lib::chainstate::stacks::boot::SIGNERS_VOTING_FUNCTION_NAME; use blockstack_lib::chainstate::stacks::StacksTransaction; use blockstack_lib::net::api::postblock_proposal::BlockValidateResponse; use blockstack_lib::util_lib::db::Error as DBError; @@ -126,13 +127,22 @@ pub enum Command { }, } +/// The specific operations that a signer can perform +#[derive(PartialEq, Eq, Debug, Clone)] +pub enum Operation { + /// A DKG operation + Dkg, + /// A Sign operation + Sign, +} + /// The Signer state #[derive(PartialEq, Eq, Debug, Clone)] pub enum State { /// The signer is idle, waiting for messages and commands Idle, /// The signer is executing a DKG or Sign round - OperationInProgress, + OperationInProgress(Operation), } /// The stacks signer registered for the reward cycle @@ -349,8 +359,8 @@ impl Signer { } /// Update operation - fn update_operation(&mut self) { - self.state = State::OperationInProgress; + fn update_operation(&mut self, operation: Operation) { + self.state = State::OperationInProgress(operation); self.coordinator_selector.last_message_time = Some(Instant::now()); } @@ -380,6 +390,7 @@ impl Signer { Ok(msg) => { let ack = self.stackerdb.send_message_with_retry(msg.into()); debug!("{self}: ACK: {ack:?}",); + self.update_operation(Operation::Dkg); } Err(e) => { error!("{self}: Failed to start DKG: {e:?}",); @@ -425,6 +436,7 @@ impl Signer { .unwrap_or_else(|e| { error!("{self}: Failed to insert block in DB: {e:?}"); }); + self.update_operation(Operation::Sign); } Err(e) => { error!("{self}: Failed to start signing block: {e:?}",); @@ -433,7 +445,6 @@ impl Signer { } } } - self.update_operation(); } /// Attempt to process the next command in the queue, and update state accordingly @@ -466,10 +477,10 @@ impl Signer { .expect("BUG: Already asserted that the command queue was not empty"); self.execute_command(stacks_client, &command); } - State::OperationInProgress => { + State::OperationInProgress(op) => { // We cannot execute the next command until the current one is finished... debug!( - "{self}: Waiting for operation to finish. Coordinator state = {:?}", + "{self}: Waiting for {op:?} operation to finish. Coordinator state = {:?}", self.coordinator.state ); } @@ -703,9 +714,26 @@ impl Signer { self.process_operation_results(stacks_client, &operation_results); self.send_operation_results(res, operation_results); self.finish_operation(); - } else if !packets.is_empty() && self.coordinator.state != CoordinatorState::Idle { - // We have received a message and are in the middle of an operation. Update our state accordingly - self.update_operation(); + } else if !packets.is_empty() { + // We have received a message. Update our state accordingly + // Let us be extra explicit in case a new state type gets added to wsts' state machine + match &self.coordinator.state { + CoordinatorState::Idle => {} + CoordinatorState::DkgPublicDistribute + | CoordinatorState::DkgPublicGather + | CoordinatorState::DkgPrivateDistribute + | CoordinatorState::DkgPrivateGather + | CoordinatorState::DkgEndDistribute + | CoordinatorState::DkgEndGather => { + self.update_operation(Operation::Dkg); + } + CoordinatorState::NonceRequest(_, _) + | CoordinatorState::NonceGather(_, _) + | CoordinatorState::SigShareRequest(_, _) + | CoordinatorState::SigShareGather(_, _) => { + self.update_operation(Operation::Sign); + } + } } if packets.iter().any(|packet| match packet.msg { @@ -1029,6 +1057,10 @@ impl Signer { /// Process a dkg result by broadcasting a vote to the stacks node fn process_dkg(&mut self, stacks_client: &StacksClient, dkg_public_key: &Point) { let mut dkg_results_bytes = vec![]; + debug!( + "{self}: Received DKG result. Broadcasting vote to the stacks node..."; + "dkg_public_key" => %dkg_public_key + ); if let Err(e) = SignerMessage::serialize_dkg_result( &mut dkg_results_bytes, dkg_public_key, @@ -1326,7 +1358,49 @@ impl Signer { } } + /// Refresh DKG value and queue DKG command if necessary + pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> { + // First check if we should queue DKG based on contract vote state and stackerdb transactions + let should_queue = self.should_queue_dkg(stacks_client)?; + // Before queueing the command, check one last time if DKG has been + // approved. It could have happened after the last call to + // `get_approved_aggregate_key` but before the theshold check in + // `should_queue_dkg`. + let old_dkg = self.approved_aggregate_public_key; + self.approved_aggregate_public_key = + stacks_client.get_approved_aggregate_key(self.reward_cycle)?; + if self.approved_aggregate_public_key.is_some() { + // TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key. + // Need to update state to store the necessary info, check against it to see if we have participated in the winning round and + // then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate. + self.coordinator + .set_aggregate_public_key(self.approved_aggregate_public_key); + if old_dkg != self.approved_aggregate_public_key { + warn!( + "{self}: updated DKG value to {:?}.", + self.approved_aggregate_public_key + ); + } + if let State::OperationInProgress(Operation::Dkg) = self.state { + debug!( + "{self}: DKG has already been set. Aborting DKG operation {}.", + self.coordinator.current_dkg_id + ); + self.finish_operation(); + } + } else if should_queue { + if self.commands.front() != Some(&Command::Dkg) { + info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command..."); + self.commands.push_front(Command::Dkg); + } else { + debug!("{self}: DKG command already queued..."); + } + } + Ok(()) + } + /// Should DKG be queued to the current signer's command queue + /// This assumes that no key has been approved by the contract yet pub fn should_queue_dkg(&mut self, stacks_client: &StacksClient) -> Result { if self.state != State::Idle || self.signer_id != self.get_coordinator_dkg().0 @@ -1336,6 +1410,25 @@ impl Signer { return Ok(false); } let signer_address = stacks_client.get_signer_address(); + let account_nonces = self.get_account_nonces(stacks_client, &[*signer_address]); + let old_transactions = self.get_signer_transactions(&account_nonces).map_err(|e| { + warn!("{self}: Failed to get old signer transactions: {e:?}. May trigger DKG unnecessarily"); + }).unwrap_or_default(); + // Check if we have an existing vote transaction for the same round and reward cycle + for transaction in old_transactions.iter() { + let params = + NakamotoSigners::parse_vote_for_aggregate_public_key(transaction).unwrap_or_else(|| panic!("BUG: {self}: Received an invalid {SIGNERS_VOTING_FUNCTION_NAME} transaction in an already filtered list: {transaction:?}")); + if Some(params.aggregate_key) == self.coordinator.aggregate_public_key + && params.voting_round == self.coordinator.current_dkg_id + { + debug!("{self}: Not triggering a DKG round. Already have a pending vote transaction."; + "txid" => %transaction.txid(), + "aggregate_key" => %params.aggregate_key, + "voting_round" => params.voting_round + ); + return Ok(false); + } + } if let Some(aggregate_key) = stacks_client.get_vote_for_aggregate_public_key( self.coordinator.current_dkg_id, self.reward_cycle, @@ -1364,12 +1457,6 @@ impl Signer { ); return Ok(false); } - warn!("{self}: Vote for DKG failed."; - "voting_round" => self.coordinator.current_dkg_id, - "aggregate_key" => %aggregate_key, - "round_weight" => round_weight, - "threshold_weight" => threshold_weight - ); } else { // Have I already voted, but the vote is still pending in StackerDB? Check stackerdb for the same round number and reward cycle vote transaction // Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes @@ -1416,32 +1503,6 @@ impl Signer { Ok(true) } - /// Update the DKG for the provided signer info, triggering it if required - pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> { - let old_dkg = self.approved_aggregate_public_key; - self.approved_aggregate_public_key = - stacks_client.get_approved_aggregate_key(self.reward_cycle)?; - if self.approved_aggregate_public_key.is_some() { - // TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key. - // Need to update state to store the necessary info, check against it to see if we have participated in the winning round and - // then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate. - self.coordinator - .set_aggregate_public_key(self.approved_aggregate_public_key); - if old_dkg != self.approved_aggregate_public_key { - warn!( - "{self}: updated DKG value to {:?}.", - self.approved_aggregate_public_key - ); - } - return Ok(()); - }; - if self.should_queue_dkg(stacks_client)? { - info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command..."); - self.commands.push_front(Command::Dkg); - } - Ok(()) - } - /// Process the event pub fn process_event( &mut self,