Mirror of https://github.com/alexgo-io/stacks-puppet-node.git (synced 2026-01-12 22:43:42 +08:00)
Merge branch 'develop' into 4595-nakamoto-stacks-signer-should-store-its-dkg-shares-in-stackerdb-to-enable-disaster-recovery
@@ -44,11 +44,11 @@ pub struct Cli {
 /// Subcommands for the stacks signer binary
 #[derive(clap::Subcommand, Debug)]
 pub enum Command {
-    /// Get a chunk from the stacker-db instance
+    /// Get a chunk from the stacker-db instance in hex encoding
     GetChunk(GetChunkArgs),
-    /// Get the latest chunk from the stacker-db instance
+    /// Get the latest chunk from the stacker-db instance in hex encoding
     GetLatestChunk(GetLatestChunkArgs),
-    /// List chunks from the stacker-db instance
+    /// List chunks from the stacker-db instance in hex encoding
     ListChunks(StackerDBArgs),
     /// Upload a chunk to the stacker-db instance
     PutChunk(PutChunkArgs),

@@ -71,11 +71,13 @@ fn stackerdb_session(host: &str, contract: QualifiedContractIdentifier) -> Stack
 /// Write the chunk to stdout
 fn write_chunk_to_stdout(chunk_opt: Option<Vec<u8>>) {
     if let Some(chunk) = chunk_opt.as_ref() {
-        let bytes = io::stdout().write(chunk).unwrap();
-        if bytes < chunk.len() {
+        let hexed_string = to_hex(chunk);
+        let hexed_chunk = hexed_string.as_bytes();
+        let bytes = io::stdout().write(&hexed_chunk).unwrap();
+        if bytes < hexed_chunk.len() {
             print!(
                 "Failed to write complete chunk to stdout. Missing {} bytes",
-                chunk.len() - bytes
+                hexed_chunk.len() - bytes
             );
         }
     }
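
Note: the patched write_chunk_to_stdout hex-encodes the chunk before writing it, so binary StackerDB payloads survive terminals and pipes. A minimal standalone sketch of that output path, assuming to_hex behaves like a plain lowercase hex encoder (in the signer it comes from the stacks codebase, not from this snippet):

use std::io::{self, Write};

// Lowercase hex encoding, standing in for the signer's `to_hex` helper.
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

// Mirrors the patched behavior: hex-encode first, then write to stdout.
fn write_chunk_hex(chunk_opt: Option<Vec<u8>>) {
    if let Some(chunk) = chunk_opt.as_ref() {
        let hexed_string = to_hex(chunk);
        let hexed_chunk = hexed_string.as_bytes();
        let written = io::stdout().write(hexed_chunk).unwrap();
        if written < hexed_chunk.len() {
            eprintln!(
                "Failed to write complete chunk to stdout. Missing {} bytes",
                hexed_chunk.len() - written
            );
        }
    }
}

fn main() {
    write_chunk_hex(Some(b"hello stackerdb".to_vec()));
    println!();
}
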
@@ -176,7 +178,9 @@ fn handle_list_chunks(args: StackerDBArgs) {
     debug!("Listing chunks...");
     let mut session = stackerdb_session(&args.host, args.contract);
     let chunk_list = session.list_chunks().unwrap();
-    println!("{}", serde_json::to_string(&chunk_list).unwrap());
+    let chunk_list_json = serde_json::to_string(&chunk_list).unwrap();
+    let hexed_json = to_hex(chunk_list_json.as_bytes());
+    println!("{}", hexed_json);
 }

 fn handle_put_chunk(args: PutChunkArgs) {
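
Since list-chunks now prints hex rather than raw JSON, downstream tooling has to hex-decode the output before parsing it. A hypothetical consumer-side sketch; the from_hex helper here is written inline for illustration and is not part of the crate:

// Decode the hex output of `list-chunks` back into the JSON text.
// A sketch only: real callers should treat odd-length or non-hex input as errors.
fn from_hex(hex: &str) -> Result<Vec<u8>, String> {
    if hex.len() % 2 != 0 {
        return Err("odd-length hex string".into());
    }
    (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).map_err(|e| e.to_string()))
        .collect()
}

fn main() {
    // "5b5d" is the hex encoding of "[]", an empty chunk list.
    let bytes = from_hex("5b5d").expect("valid hex");
    let json = String::from_utf8(bytes).expect("valid UTF-8 JSON");
    println!("{json}");
}
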
@@ -383,10 +383,9 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
             if event_parity == Some(other_signer_parity) {
                 continue;
             }

-            if signer.approved_aggregate_public_key.is_none() {
-                if let Err(e) = signer.update_dkg(&self.stacks_client) {
-                    error!("{signer}: failed to update DKG: {e}");
+            if let Err(e) = signer.refresh_dkg(&self.stacks_client) {
+                error!("{signer}: failed to refresh DKG: {e}");
             }
-            }
             signer.refresh_coordinator();

@@ -21,6 +21,7 @@ use std::time::Instant;
 use blockstack_lib::chainstate::burn::ConsensusHashExtensions;
+use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners;
 use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockVote};
 use blockstack_lib::chainstate::stacks::boot::SIGNERS_VOTING_FUNCTION_NAME;
 use blockstack_lib::chainstate::stacks::StacksTransaction;
 use blockstack_lib::net::api::postblock_proposal::BlockValidateResponse;
 use blockstack_lib::util_lib::db::Error as DBError;

@@ -126,13 +127,22 @@ pub enum Command {
     },
 }

+/// The specific operations that a signer can perform
+#[derive(PartialEq, Eq, Debug, Clone)]
+pub enum Operation {
+    /// A DKG operation
+    Dkg,
+    /// A Sign operation
+    Sign,
+}
+
 /// The Signer state
 #[derive(PartialEq, Eq, Debug, Clone)]
 pub enum State {
     /// The signer is idle, waiting for messages and commands
     Idle,
     /// The signer is executing a DKG or Sign round
-    OperationInProgress,
+    OperationInProgress(Operation),
 }

 /// The stacks signer registered for the reward cycle
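
The new Operation payload on State::OperationInProgress lets later code distinguish an in-flight DKG round from an in-flight Sign round and abort only the former (see refresh_dkg further down). A stripped-down sketch of that idea, using hypothetical minimal types rather than the real Signer:

/// Simplified model of the signer state machine after this change; the real
/// types live in the stacks-signer crate and carry more context.
#[derive(PartialEq, Eq, Debug, Clone)]
enum Operation {
    Dkg,
    Sign,
}

#[derive(PartialEq, Eq, Debug, Clone)]
enum State {
    Idle,
    OperationInProgress(Operation),
}

struct Signer {
    state: State,
}

impl Signer {
    fn update_operation(&mut self, operation: Operation) {
        self.state = State::OperationInProgress(operation);
    }

    fn finish_operation(&mut self) {
        self.state = State::Idle;
    }
}

fn main() {
    let mut signer = Signer { state: State::Idle };
    signer.update_operation(Operation::Dkg);
    // Because the state now names the operation, callers can react to a stale
    // DKG round specifically, without also aborting an in-flight Sign round.
    if let State::OperationInProgress(Operation::Dkg) = signer.state {
        signer.finish_operation();
    }
    assert_eq!(signer.state, State::Idle);
}
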
@@ -349,8 +359,8 @@ impl Signer {
     }

     /// Update operation
-    fn update_operation(&mut self) {
-        self.state = State::OperationInProgress;
+    fn update_operation(&mut self, operation: Operation) {
+        self.state = State::OperationInProgress(operation);
         self.coordinator_selector.last_message_time = Some(Instant::now());
     }

@@ -380,6 +390,7 @@ impl Signer {
             Ok(msg) => {
                 let ack = self.stackerdb.send_message_with_retry(msg.into());
                 debug!("{self}: ACK: {ack:?}",);
+                self.update_operation(Operation::Dkg);
             }
             Err(e) => {
                 error!("{self}: Failed to start DKG: {e:?}",);

@@ -425,6 +436,7 @@ impl Signer {
                     .unwrap_or_else(|e| {
                         error!("{self}: Failed to insert block in DB: {e:?}");
                     });
+                self.update_operation(Operation::Sign);
             }
             Err(e) => {
                 error!("{self}: Failed to start signing block: {e:?}",);

@@ -433,7 +445,6 @@ impl Signer {
                 }
             }
         }
-        self.update_operation();
     }

     /// Attempt to process the next command in the queue, and update state accordingly

@@ -466,10 +477,10 @@ impl Signer {
                     .expect("BUG: Already asserted that the command queue was not empty");
                 self.execute_command(stacks_client, &command);
             }
-            State::OperationInProgress => {
+            State::OperationInProgress(op) => {
                 // We cannot execute the next command until the current one is finished...
                 debug!(
-                    "{self}: Waiting for operation to finish. Coordinator state = {:?}",
+                    "{self}: Waiting for {op:?} operation to finish. Coordinator state = {:?}",
                     self.coordinator.state
                 );
             }

@@ -703,9 +714,26 @@ impl Signer {
             self.process_operation_results(stacks_client, &operation_results);
             self.send_operation_results(res, operation_results);
             self.finish_operation();
-        } else if !packets.is_empty() && self.coordinator.state != CoordinatorState::Idle {
-            // We have received a message and are in the middle of an operation. Update our state accordingly
-            self.update_operation();
+        } else if !packets.is_empty() {
+            // We have received a message. Update our state accordingly
+            // Let us be extra explicit in case a new state type gets added to wsts' state machine
+            match &self.coordinator.state {
+                CoordinatorState::Idle => {}
+                CoordinatorState::DkgPublicDistribute
+                | CoordinatorState::DkgPublicGather
+                | CoordinatorState::DkgPrivateDistribute
+                | CoordinatorState::DkgPrivateGather
+                | CoordinatorState::DkgEndDistribute
+                | CoordinatorState::DkgEndGather => {
+                    self.update_operation(Operation::Dkg);
+                }
+                CoordinatorState::NonceRequest(_, _)
+                | CoordinatorState::NonceGather(_, _)
+                | CoordinatorState::SigShareRequest(_, _)
+                | CoordinatorState::SigShareGather(_, _) => {
+                    self.update_operation(Operation::Sign);
+                }
+            }
         }

         if packets.iter().any(|packet| match packet.msg {
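
The match above deliberately has no catch-all arm, so a coordinator state added to wsts later fails to compile here instead of being silently ignored. A reduced sketch of the same pattern, using a stand-in enum whose variants are trimmed and whose data payloads are dropped:

/// Stand-in for the wsts coordinator state, reduced to a few variant names;
/// the real enum carries round data in several variants.
#[allow(dead_code)]
enum CoordinatorState {
    Idle,
    DkgPublicGather,
    NonceGather,
}

#[derive(Debug)]
enum Operation {
    Dkg,
    Sign,
}

/// Classify a coordinator state into the signer-level operation, if any.
/// No `_` arm: a new coordinator state forces this match to be revisited.
fn operation_for(state: &CoordinatorState) -> Option<Operation> {
    match state {
        CoordinatorState::Idle => None,
        CoordinatorState::DkgPublicGather => Some(Operation::Dkg),
        CoordinatorState::NonceGather => Some(Operation::Sign),
    }
}

fn main() {
    println!("{:?}", operation_for(&CoordinatorState::DkgPublicGather));
}
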
@@ -1029,6 +1057,10 @@ impl Signer {
     /// Process a dkg result by broadcasting a vote to the stacks node
     fn process_dkg(&mut self, stacks_client: &StacksClient, dkg_public_key: &Point) {
+        let mut dkg_results_bytes = vec![];
         debug!(
             "{self}: Received DKG result. Broadcasting vote to the stacks node...";
             "dkg_public_key" => %dkg_public_key
         );
+        if let Err(e) = SignerMessage::serialize_dkg_result(
+            &mut dkg_results_bytes,
+            dkg_public_key,

@@ -1326,7 +1358,49 @@ impl Signer {
         }
     }

+    /// Refresh DKG value and queue DKG command if necessary
+    pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
+        // First check if we should queue DKG based on contract vote state and stackerdb transactions
+        let should_queue = self.should_queue_dkg(stacks_client)?;
+        // Before queueing the command, check one last time if DKG has been
+        // approved. It could have happened after the last call to
+        // `get_approved_aggregate_key` but before the threshold check in
+        // `should_queue_dkg`.
+        let old_dkg = self.approved_aggregate_public_key;
+        self.approved_aggregate_public_key =
+            stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
+        if self.approved_aggregate_public_key.is_some() {
+            // TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
+            // Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
+            // then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
+            self.coordinator
+                .set_aggregate_public_key(self.approved_aggregate_public_key);
+            if old_dkg != self.approved_aggregate_public_key {
+                warn!(
+                    "{self}: updated DKG value to {:?}.",
+                    self.approved_aggregate_public_key
+                );
+            }
+            if let State::OperationInProgress(Operation::Dkg) = self.state {
+                debug!(
+                    "{self}: DKG has already been set. Aborting DKG operation {}.",
+                    self.coordinator.current_dkg_id
+                );
+                self.finish_operation();
+            }
+        } else if should_queue {
+            if self.commands.front() != Some(&Command::Dkg) {
+                info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
+                self.commands.push_front(Command::Dkg);
+            } else {
+                debug!("{self}: DKG command already queued...");
+            }
+        }
+        Ok(())
+    }
+
     /// Should DKG be queued to the current signer's command queue
+    /// This assumes that no key has been approved by the contract yet
     pub fn should_queue_dkg(&mut self, stacks_client: &StacksClient) -> Result<bool, ClientError> {
         if self.state != State::Idle
             || self.signer_id != self.get_coordinator_dkg().0
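
Taken together, refresh_dkg folds the old update_dkg behavior and the queueing check into a single call, and re-reads the approved key after should_queue_dkg to close the race described in the comment. Its end result can be summarized as a small pure decision function; this is an illustrative restatement with hypothetical names, not the crate's API:

#[derive(Debug, PartialEq)]
enum DkgAction {
    AbortInProgressDkg,
    QueueDkg,
    Nothing,
}

// Distilled decision logic: what refresh_dkg ends up doing given what it observes.
fn decide(approved_key_present: bool, dkg_in_progress: bool, should_queue: bool) -> DkgAction {
    if approved_key_present {
        // A key was approved (possibly between the two checks in refresh_dkg):
        // adopt it, and abort only a DKG round that is still running.
        if dkg_in_progress {
            DkgAction::AbortInProgressDkg
        } else {
            DkgAction::Nothing
        }
    } else if should_queue {
        // No approved key and this signer is the DKG coordinator: queue DKG once.
        DkgAction::QueueDkg
    } else {
        DkgAction::Nothing
    }
}

fn main() {
    assert_eq!(decide(true, true, false), DkgAction::AbortInProgressDkg);
    assert_eq!(decide(false, false, true), DkgAction::QueueDkg);
    assert_eq!(decide(false, false, false), DkgAction::Nothing);
}
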
@@ -1336,6 +1410,25 @@ impl Signer {
             return Ok(false);
         }
+        let signer_address = stacks_client.get_signer_address();
+        let account_nonces = self.get_account_nonces(stacks_client, &[*signer_address]);
+        let old_transactions = self.get_signer_transactions(&account_nonces).map_err(|e| {
+            warn!("{self}: Failed to get old signer transactions: {e:?}. May trigger DKG unnecessarily");
+        }).unwrap_or_default();
+        // Check if we have an existing vote transaction for the same round and reward cycle
+        for transaction in old_transactions.iter() {
+            let params =
+                NakamotoSigners::parse_vote_for_aggregate_public_key(transaction).unwrap_or_else(|| panic!("BUG: {self}: Received an invalid {SIGNERS_VOTING_FUNCTION_NAME} transaction in an already filtered list: {transaction:?}"));
+            if Some(params.aggregate_key) == self.coordinator.aggregate_public_key
+                && params.voting_round == self.coordinator.current_dkg_id
+            {
+                debug!("{self}: Not triggering a DKG round. Already have a pending vote transaction.";
+                    "txid" => %transaction.txid(),
+                    "aggregate_key" => %params.aggregate_key,
+                    "voting_round" => params.voting_round
+                );
+                return Ok(false);
+            }
+        }
         if let Some(aggregate_key) = stacks_client.get_vote_for_aggregate_public_key(
             self.coordinator.current_dkg_id,
             self.reward_cycle,

@@ -1364,12 +1457,6 @@ impl Signer {
                 );
                 return Ok(false);
             }
-            warn!("{self}: Vote for DKG failed.";
-                "voting_round" => self.coordinator.current_dkg_id,
-                "aggregate_key" => %aggregate_key,
-                "round_weight" => round_weight,
-                "threshold_weight" => threshold_weight
-            );
         } else {
             // Have I already voted, but the vote is still pending in StackerDB? Check stackerdb for the same round number and reward cycle vote transaction
             // Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes

@@ -1416,32 +1503,6 @@ impl Signer {
         Ok(true)
     }

-    /// Update the DKG for the provided signer info, triggering it if required
-    pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
-        let old_dkg = self.approved_aggregate_public_key;
-        self.approved_aggregate_public_key =
-            stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
-        if self.approved_aggregate_public_key.is_some() {
-            // TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
-            // Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
-            // then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
-            self.coordinator
-                .set_aggregate_public_key(self.approved_aggregate_public_key);
-            if old_dkg != self.approved_aggregate_public_key {
-                warn!(
-                    "{self}: updated DKG value to {:?}.",
-                    self.approved_aggregate_public_key
-                );
-            }
-            return Ok(());
-        };
-        if self.should_queue_dkg(stacks_client)? {
-            info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
-            self.commands.push_front(Command::Dkg);
-        }
-        Ok(())
-    }
-
     /// Process the event
     pub fn process_event(
         &mut self,