diff --git a/Cargo.lock b/Cargo.lock index 230908330..70b20ecf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -498,6 +498,28 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bindgen" +version = "0.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 1.0.109", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -610,6 +632,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -661,6 +692,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "2.34.0" @@ -1457,6 +1499,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "gloo-timers" version = "0.2.6" @@ -1877,6 +1925,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.153" @@ -1903,6 +1957,16 @@ dependencies = [ "rle-decode-fast", ] +[[package]] +name = "libloading" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161" +dependencies = [ + "cfg-if 1.0.0", + "windows-sys 0.48.0", +] + [[package]] name = "libredox" version = "0.0.1" @@ -2025,6 +2089,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -2124,6 +2194,16 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2207,6 +2287,7 @@ version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a64d160b891178fb9d43d1a58ddcafb6502daeb54d810e5e92a7c3c9bfacc07" dependencies = [ + "bindgen", "bitvec", "bs58 
0.4.0", "cc", @@ -2255,6 +2336,12 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2890,6 +2977,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hex" version = "2.1.0" @@ -3251,6 +3344,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signature" version = "2.2.0" @@ -4371,6 +4470,18 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.31", +] + [[package]] name = "winapi" version = "0.2.8" @@ -4596,8 +4707,7 @@ dependencies = [ [[package]] name = "wsts" version = "8.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "467aa8e40ed0277d19922fd0e7357c16552cb900e5138f61a48ac23c4b7878e0" +source = "git+https://github.com/stacks-network/wsts.git?branch=feat/public-sign-ids#99c1ed3d528d98585ba4b50084e8a6c37f8f5793" dependencies = [ "aes-gcm 0.10.3", "bs58 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 66791df99..dc344554e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,8 @@ rand_core = "0.6" rand = "0.8" rand_chacha = "0.3.1" tikv-jemallocator = "0.5.4" -wsts = { version = "8.1", default-features = false } +# wsts = { version = "8.1", default-features = false } +wsts = { git = "https://github.com/stacks-network/wsts.git", branch = "feat/public-sign-ids" } # Use a bit more than default optimization for # dev builds to speed up test execution diff --git a/libsigner/src/events.rs b/libsigner/src/events.rs index 1c29ec941..7554154af 100644 --- a/libsigner/src/events.rs +++ b/libsigner/src/events.rs @@ -36,6 +36,7 @@ use stacks_common::codec::{ StacksMessageCodec, }; pub use stacks_common::consts::SIGNER_SLOTS_PER_USER; +use stacks_common::types::chainstate::StacksPublicKey; use stacks_common::util::hash::Sha512Trunc256Sum; use tiny_http::{ Method as HttpMethod, Request as HttpRequest, Response as HttpResponse, Server as HttpServer, @@ -64,8 +65,12 @@ pub struct BlockProposalSigners { /// Event enum for newly-arrived signer subscribed events #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum SignerEvent { - /// The miner proposed blocks for signers to observe and sign - ProposedBlocks(Vec), + /// The miner sent proposed blocks or messages for signers to observe and sign + ProposedBlocks( + Vec, + Vec, + Option, + ), /// The signer messages for other signers and 
miners to observe /// The u32 is the signer set to which the message belongs (either 0 or 1) SignerMessages(u32, Vec), @@ -255,7 +260,7 @@ impl EventReceiver for SignerEventReceiver { /// Errors are recoverable -- the caller should call this method again even if it returns an /// error. fn next_event(&mut self) -> Result { - self.with_server(|event_receiver, http_server, is_mainnet| { + self.with_server(|event_receiver, http_server, _is_mainnet| { // were we asked to terminate? if event_receiver.is_stopped() { return Err(EventError::Terminated); @@ -278,21 +283,22 @@ impl EventReceiver for SignerEventReceiver { ))); } if request.url() == "/stackerdb_chunks" { - process_stackerdb_event(event_receiver.local_addr, request, is_mainnet) + process_stackerdb_event(event_receiver.local_addr, request) + .map_err(|e| { + error!("Error processing stackerdb_chunks message"; "err" => ?e); + e + }) } else if request.url() == "/proposal_response" { process_proposal_response(request) } else { let url = request.url().to_string(); - info!( + debug!( "[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this", event_receiver.local_addr, request.url() ); - - if let Err(e) = request.respond(HttpResponse::empty(200u16)) { - error!("Failed to respond to request: {:?}", &e); - } + ack_dispatcher(request); Err(EventError::UnrecognizedEvent(url)) } })? @@ -348,20 +354,22 @@ impl EventReceiver for SignerEventReceiver { } } +fn ack_dispatcher(request: HttpRequest) { + if let Err(e) = request.respond(HttpResponse::empty(200u16)) { + error!("Failed to respond to request: {:?}", &e); + }; +} + /// Process a stackerdb event from the node fn process_stackerdb_event( local_addr: Option, mut request: HttpRequest, - is_mainnet: bool, ) -> Result { debug!("Got stackerdb_chunks event"); let mut body = String::new(); if let Err(e) = request.as_reader().read_to_string(&mut body) { error!("Failed to read body: {:?}", &e); - - if let Err(e) = request.respond(HttpResponse::empty(200u16)) { - error!("Failed to respond to request: {:?}", &e); - }; + ack_dispatcher(request); return Err(EventError::MalformedRequest(format!( "Failed to read body: {:?}", &e @@ -371,47 +379,86 @@ fn process_stackerdb_event( let event: StackerDBChunksEvent = serde_json::from_slice(body.as_bytes()) .map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?; - let signer_event = if event.contract_id == boot_code_id(MINERS_NAME, is_mainnet) { - let blocks: Vec = event - .modified_slots - .iter() - .filter_map(|chunk| read_next::(&mut &chunk.data[..]).ok()) - .collect(); - SignerEvent::ProposedBlocks(blocks) - } else if event.contract_id.name.to_string().starts_with(SIGNERS_NAME) - && event.contract_id.issuer.1 == [0u8; 20] - { - let Some((signer_set, _)) = - get_signers_db_signer_set_message_id(event.contract_id.name.as_str()) - else { - return Err(EventError::UnrecognizedStackerDBContract(event.contract_id)); - }; - // signer-XXX-YYY boot contract - let signer_messages: Vec = event - .modified_slots - .iter() - .filter_map(|chunk| read_next::(&mut &chunk.data[..]).ok()) - .collect(); - SignerEvent::SignerMessages(signer_set, signer_messages) - } else { - info!( - "[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this", - local_addr, - event.contract_id - ); - if let Err(e) = request.respond(HttpResponse::empty(200u16)) { - error!("Failed to respond to request: {:?}", &e); + let event_contract_id = event.contract_id.clone(); 
+ + let signer_event = match SignerEvent::try_from(event) { + Err(e) => { + info!( + "[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this", + local_addr, + event_contract_id + ); + ack_dispatcher(request); + return Err(e.into()); } - return Err(EventError::UnrecognizedStackerDBContract(event.contract_id)); + Ok(x) => x, }; - if let Err(e) = request.respond(HttpResponse::empty(200u16)) { - error!("Failed to respond to request: {:?}", &e); - } + ack_dispatcher(request); Ok(signer_event) } +impl TryFrom for SignerEvent { + type Error = EventError; + + fn try_from(event: StackerDBChunksEvent) -> Result { + let signer_event = if event.contract_id.name.as_str() == MINERS_NAME + && event.contract_id.issuer.1 == [0; 20] + { + let mut blocks = vec![]; + let mut messages = vec![]; + let mut miner_pk = None; + for chunk in event.modified_slots { + miner_pk = Some(chunk.recover_pk().map_err(|e| { + EventError::MalformedRequest(format!( + "Failed to recover PK from StackerDB chunk: {e}" + )) + })?); + if chunk.slot_id % 2 == 0 { + // block + let Ok(block) = + BlockProposalSigners::consensus_deserialize(&mut chunk.data.as_slice()) + else { + continue; + }; + blocks.push(block); + } else if chunk.slot_id % 2 == 1 { + // message + let Ok(msg) = SignerMessage::consensus_deserialize(&mut chunk.data.as_slice()) + else { + continue; + }; + messages.push(msg); + } else { + return Err(EventError::UnrecognizedEvent( + "Unrecognized slot_id for miners contract".into(), + )); + }; + } + SignerEvent::ProposedBlocks(blocks, messages, miner_pk) + } else if event.contract_id.name.starts_with(SIGNERS_NAME) + && event.contract_id.issuer.1 == [0u8; 20] + { + let Some((signer_set, _)) = + get_signers_db_signer_set_message_id(event.contract_id.name.as_str()) + else { + return Err(EventError::UnrecognizedStackerDBContract(event.contract_id)); + }; + // signer-XXX-YYY boot contract + let signer_messages: Vec = event + .modified_slots + .iter() + .filter_map(|chunk| read_next::(&mut &chunk.data[..]).ok()) + .collect(); + SignerEvent::SignerMessages(signer_set, signer_messages) + } else { + return Err(EventError::UnrecognizedStackerDBContract(event.contract_id)); + }; + Ok(signer_event) + } +} + /// Process a proposal response from the node fn process_proposal_response(mut request: HttpRequest) -> Result { debug!("Got proposal_response event"); @@ -438,7 +485,7 @@ fn process_proposal_response(mut request: HttpRequest) -> Result Option<(u32, u32)> { +pub fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> { // Splitting the string by '-' let parts: Vec<&str> = name.split('-').collect(); if parts.len() != 3 { diff --git a/libsigner/src/libsigner.rs b/libsigner/src/libsigner.rs index 1ae699d6e..33c5918fe 100644 --- a/libsigner/src/libsigner.rs +++ b/libsigner/src/libsigner.rs @@ -49,7 +49,7 @@ pub use crate::events::{ SignerStopSignaler, }; pub use crate::messages::{ - BlockRejection, BlockResponse, RejectCode, SignerMessage, BLOCK_MSG_ID, TRANSACTIONS_MSG_ID, + BlockRejection, BlockResponse, MessageSlotID, RejectCode, SignerMessage, }; pub use crate::runloop::{RunningSigner, Signer, SignerRunLoop}; pub use crate::session::{SignerSession, StackerDBSession}; diff --git a/libsigner/src/messages.rs b/libsigner/src/messages.rs index debb43218..f1378a712 100644 --- a/libsigner/src/messages.rs +++ b/libsigner/src/messages.rs @@ -14,12 +14,23 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
+//! Messages in the signer-miner interaction have a multi-level hierarchy.
+//! Signers send messages to each other through Packet messages. These messages,
+//! as well as `BlockResponse`, `Transactions`, and `DkgResults` messages are stored in
+//! StackerDBs based on the `MessageSlotID` for the particular message type. This is a
+//! shared identifier space between the four message kinds and their subtypes.
+//!
+//! These four message kinds are differentiated with a `SignerMessageTypePrefix`
+//! and the `SignerMessage` enum.
+
+use std::fmt::{Debug, Display};
 use std::io::{Read, Write};
 use std::net::{SocketAddr, TcpListener, TcpStream};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::Sender;
 use std::sync::Arc;
 
+use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners;
 use blockstack_lib::chainstate::nakamoto::NakamotoBlock;
 use blockstack_lib::chainstate::stacks::events::StackerDBChunksEvent;
 use blockstack_lib::chainstate::stacks::{StacksTransaction, ThresholdSignature};
@@ -54,28 +65,67 @@ use wsts::state_machine::{signer, SignError};
 use crate::http::{decode_http_body, decode_http_request};
 use crate::EventError;
 
-// The slot IDS for each message type
-const DKG_BEGIN_MSG_ID: u32 = 0;
-const DKG_PRIVATE_BEGIN_MSG_ID: u32 = 1;
-const DKG_END_BEGIN_MSG_ID: u32 = 2;
-const DKG_END_MSG_ID: u32 = 3;
-const DKG_PUBLIC_SHARES_MSG_ID: u32 = 4;
-const DKG_PRIVATE_SHARES_MSG_ID: u32 = 5;
-const NONCE_REQUEST_MSG_ID: u32 = 6;
-const NONCE_RESPONSE_MSG_ID: u32 = 7;
-const SIGNATURE_SHARE_REQUEST_MSG_ID: u32 = 8;
-const SIGNATURE_SHARE_RESPONSE_MSG_ID: u32 = 9;
-/// The slot ID for the block response for miners to observe
-pub const BLOCK_MSG_ID: u32 = 10;
-/// The slot ID for the transactions list for miners and signers to observe
-pub const TRANSACTIONS_MSG_ID: u32 = 11;
+define_u8_enum!(
+/// Enum representing the stackerdb message identifier: this is
+/// the contract index in the signers contracts (i.e., X in signers-0-X)
+MessageSlotID {
+    /// DkgBegin message
+    DkgBegin = 0,
+    /// DkgPrivateBegin
+    DkgPrivateBegin = 1,
+    /// DkgEndBegin
+    DkgEndBegin = 2,
+    /// DkgEnd
+    DkgEnd = 3,
+    /// DkgPublicShares
+    DkgPublicShares = 4,
+    /// DkgPrivateShares
+    DkgPrivateShares = 5,
+    /// NonceRequest
+    NonceRequest = 6,
+    /// NonceResponse
+    NonceResponse = 7,
+    /// SignatureShareRequest
+    SignatureShareRequest = 8,
+    /// SignatureShareResponse
+    SignatureShareResponse = 9,
+    /// Block proposal responses for miners to observe
+    BlockResponse = 10,
+    /// Transactions list for miners and signers to observe
+    Transactions = 11,
+    /// DKG Results
+    DkgResults = 12
+});
 
 define_u8_enum!(SignerMessageTypePrefix {
     BlockResponse = 0,
     Packet = 1,
-    Transactions = 2
+    Transactions = 2,
+    DkgResults = 3
 });
 
+impl MessageSlotID {
+    /// Return the StackerDB contract corresponding to messages of this type
+    pub fn stacker_db_contract(
+        &self,
+        mainnet: bool,
+        reward_cycle: u64,
+    ) -> QualifiedContractIdentifier {
+        NakamotoSigners::make_signers_db_contract_id(reward_cycle, self.to_u32(), mainnet)
+    }
+
+    /// Return the u32 identifier for the message slot (used to index the contract that stores it)
+    pub fn to_u32(&self) -> u32 {
+        self.to_u8().into()
+    }
+}
+
+impl Display for MessageSlotID {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}({})", self, self.to_u8())
+    }
+}
+
 impl TryFrom<u8> for SignerMessageTypePrefix {
     type Error = CodecError;
     fn try_from(value: u8) -> Result<Self, Self::Error> {
@@ -91,6 +141,7 @@ impl From<&SignerMessage> for SignerMessageTypePrefix {
             SignerMessage::Packet(_) => SignerMessageTypePrefix::Packet,
             SignerMessage::BlockResponse(_) => SignerMessageTypePrefix::BlockResponse,
             SignerMessage::Transactions(_) => SignerMessageTypePrefix::Transactions,
+            SignerMessage::DkgResults { .. } => SignerMessageTypePrefix::DkgResults,
         }
     }
 }
@@ -168,7 +219,7 @@ impl From<&RejectCode> for RejectCodeTypePrefix {
 }
 
 /// The messages being sent through the stacker db contracts
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, PartialEq, Serialize, Deserialize)]
 pub enum SignerMessage {
     /// The signed/validated Nakamoto block for miners to observe
     BlockResponse(BlockResponse),
@@ -176,30 +227,121 @@ pub enum SignerMessage {
     Packet(Packet),
     /// The list of transactions for miners and signers to observe that this signer cares about
     Transactions(Vec<StacksTransaction>),
+    /// The results of a successful DKG
+    DkgResults {
+        /// The aggregate key from the DKG round
+        aggregate_key: Point,
+        /// The polynomial commits used to construct the aggregate key
+        party_polynomials: Vec<(u32, PolyCommitment)>,
+    },
+}
+
+impl Debug for SignerMessage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::BlockResponse(b) => Debug::fmt(b, f),
+            Self::Packet(p) => Debug::fmt(p, f),
+            Self::Transactions(t) => f.debug_tuple("Transactions").field(t).finish(),
+            Self::DkgResults {
+                aggregate_key,
+                party_polynomials,
+            } => {
+                let party_polynomials: Vec<_> = party_polynomials
+                    .iter()
+                    .map(|(ix, commit)| (ix, commit.to_string()))
+                    .collect();
+                f.debug_struct("DkgResults")
+                    .field("aggregate_key", &aggregate_key.to_string())
+                    .field("party_polynomials", &party_polynomials)
+                    .finish()
+            }
+        }
+    }
 }
 
 impl SignerMessage {
     /// Helper function to determine the slot ID for the provided stacker-db writer id
-    pub fn msg_id(&self) -> u32 {
+    pub fn msg_id(&self) -> MessageSlotID {
         match self {
             Self::Packet(packet) => match packet.msg {
-                Message::DkgBegin(_) => DKG_BEGIN_MSG_ID,
-                Message::DkgPrivateBegin(_) => DKG_PRIVATE_BEGIN_MSG_ID,
-                Message::DkgEndBegin(_) => DKG_END_BEGIN_MSG_ID,
-                Message::DkgEnd(_) => DKG_END_MSG_ID,
-                Message::DkgPublicShares(_) => DKG_PUBLIC_SHARES_MSG_ID,
-                Message::DkgPrivateShares(_) => DKG_PRIVATE_SHARES_MSG_ID,
-                Message::NonceRequest(_) => NONCE_REQUEST_MSG_ID,
-                Message::NonceResponse(_) => NONCE_RESPONSE_MSG_ID,
-                Message::SignatureShareRequest(_) => SIGNATURE_SHARE_REQUEST_MSG_ID,
-                Message::SignatureShareResponse(_) => SIGNATURE_SHARE_RESPONSE_MSG_ID,
+                Message::DkgBegin(_) => MessageSlotID::DkgBegin,
+                Message::DkgPrivateBegin(_) => MessageSlotID::DkgPrivateBegin,
+                Message::DkgEndBegin(_) => MessageSlotID::DkgEndBegin,
+                Message::DkgEnd(_) => MessageSlotID::DkgEnd,
+                Message::DkgPublicShares(_) => MessageSlotID::DkgPublicShares,
+                Message::DkgPrivateShares(_) => MessageSlotID::DkgPrivateShares,
+                Message::NonceRequest(_) => MessageSlotID::NonceRequest,
+                Message::NonceResponse(_) => MessageSlotID::NonceResponse,
+                Message::SignatureShareRequest(_) => MessageSlotID::SignatureShareRequest,
+                Message::SignatureShareResponse(_) => MessageSlotID::SignatureShareResponse,
             },
-            Self::BlockResponse(_) => BLOCK_MSG_ID,
-            Self::Transactions(_) => TRANSACTIONS_MSG_ID,
+            Self::BlockResponse(_) => MessageSlotID::BlockResponse,
+            Self::Transactions(_) => MessageSlotID::Transactions,
+            Self::DkgResults { .. } => MessageSlotID::DkgResults,
         }
     }
 }
 
+impl SignerMessage {
+    /// Provide an interface for consensus serializing a DkgResults message
+    /// without constructing the DkgResults struct (this eliminates a clone)
+    pub fn serialize_dkg_result<'a, W: Write, I>(
+        fd: &mut W,
+        aggregate_key: &Point,
+        party_polynomials: I,
+        write_prefix: bool,
+    ) -> Result<(), CodecError>
+    where
+        I: ExactSizeIterator + Iterator<Item = (&'a u32, &'a PolyCommitment)>,
+    {
+        if write_prefix {
+            SignerMessageTypePrefix::DkgResults
+                .to_u8()
+                .consensus_serialize(fd)?;
+        }
+        fd.write_all(&aggregate_key.compress().data)
+            .map_err(CodecError::WriteError)?;
+        let polynomials_len: u32 = party_polynomials
+            .len()
+            .try_into()
+            .map_err(|_| CodecError::ArrayTooLong)?;
+        polynomials_len.consensus_serialize(fd)?;
+        for (party_id, polynomial) in party_polynomials {
+            party_id.consensus_serialize(fd)?;
+            fd.write_all(&polynomial.id.id.to_bytes())
+                .map_err(CodecError::WriteError)?;
+            fd.write_all(&polynomial.id.kG.compress().data)
+                .map_err(CodecError::WriteError)?;
+            fd.write_all(&polynomial.id.kca.to_bytes())
+                .map_err(CodecError::WriteError)?;
+            let commit_len: u32 = polynomial
+                .poly
+                .len()
+                .try_into()
+                .map_err(|_| CodecError::ArrayTooLong)?;
+            commit_len.consensus_serialize(fd)?;
+            for poly in polynomial.poly.iter() {
+                fd.write_all(&poly.compress().data)
+                    .map_err(CodecError::WriteError)?;
+            }
+        }
+        Ok(())
+    }
+
+    fn deserialize_point<R: Read>(fd: &mut R) -> Result<Point, CodecError> {
+        let mut bytes = [0; 33];
+        fd.read_exact(&mut bytes).map_err(CodecError::ReadError)?;
+        Point::try_from(&Compressed::from(bytes))
+            .map_err(|e| CodecError::DeserializeError(e.to_string()))
+    }
+
+    fn deserialize_scalar<R: Read>(fd: &mut R) -> Result<Scalar, CodecError> {
+        let mut bytes = [0; 32];
+        fd.read_exact(&mut bytes).map_err(CodecError::ReadError)?;
+        Ok(Scalar::from(bytes))
+    }
+}
+
 impl StacksMessageCodec for SignerMessage {
     fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), CodecError> {
         write_next(fd, &(SignerMessageTypePrefix::from(self) as u8))?;
@@ -213,6 +355,17 @@ impl StacksMessageCodec for SignerMessage {
             SignerMessage::Transactions(transactions) => {
                 write_next(fd, transactions)?;
             }
+            SignerMessage::DkgResults {
+                aggregate_key,
+                party_polynomials,
+            } => {
+                Self::serialize_dkg_result(
+                    fd,
+                    aggregate_key,
+                    party_polynomials.iter().map(|(a, b)| (a, b)),
+                    false,
+                )?;
+            }
         };
         Ok(())
     }
@@ -233,6 +386,46 @@
                 let transactions = read_next::<Vec<StacksTransaction>, _>(fd)?;
                 SignerMessage::Transactions(transactions)
             }
+            SignerMessageTypePrefix::DkgResults => {
+                let aggregate_key = Self::deserialize_point(fd)?;
+                let party_polynomial_len = u32::consensus_deserialize(fd)?;
+                let mut party_polynomials = Vec::with_capacity(
+                    party_polynomial_len
+                        .try_into()
+                        .expect("FATAL: u32 could not fit in usize"),
+                );
+                for _ in 0..party_polynomial_len {
+                    let party_id = u32::consensus_deserialize(fd)?;
+                    let polynomial_id_id = Self::deserialize_scalar(fd)?;
+                    let polynomial_id_kg = Self::deserialize_point(fd)?;
+                    let polynomial_id_kca = Self::deserialize_scalar(fd)?;
+
+                    let commit_len = u32::consensus_deserialize(fd)?;
+                    let mut polynomial_poly = Vec::with_capacity(
+                        commit_len
+                            .try_into()
+                            .expect("FATAL: u32 could not fit in usize"),
+                    );
+                    for _ in 0..commit_len {
+                        let poly = Self::deserialize_point(fd)?;
+                        polynomial_poly.push(poly);
+                    }
+                    let polynomial_id = ID {
+                        id: polynomial_id_id,
+                        kG: polynomial_id_kg,
+                        kca: polynomial_id_kca,
+                    };
+                    let polynomial = PolyCommitment {
+                        id: polynomial_id,
+                        poly: polynomial_poly,
+                    };
+                    party_polynomials.push((party_id,
polynomial)); + } + Self::DkgResults { + aggregate_key, + party_polynomials, + } + } }; Ok(message) } @@ -1103,7 +1296,6 @@ impl From for SignerMessage { #[cfg(test)] mod test { - use blockstack_lib::chainstate::stacks::{ TransactionAnchorMode, TransactionAuth, TransactionPayload, TransactionPostConditionMode, TransactionSmartContract, TransactionVersion, @@ -1116,6 +1308,18 @@ mod test { use wsts::common::Signature; use super::{StacksMessageCodecExtensions, *}; + + #[test] + fn signer_slots_count_is_sane() { + let slot_identifiers_len = MessageSlotID::ALL.len(); + assert!( + SIGNER_SLOTS_PER_USER as usize >= slot_identifiers_len, + "stacks_common::SIGNER_SLOTS_PER_USER ({}) must be >= slot identifiers ({})", + SIGNER_SLOTS_PER_USER, + slot_identifiers_len, + ); + } + #[test] fn serde_reject_code() { let code = RejectCode::ValidationFailed(ValidateRejectCode::InvalidBlock); diff --git a/libstackerdb/src/libstackerdb.rs b/libstackerdb/src/libstackerdb.rs index 0a04015e7..8c38d8be7 100644 --- a/libstackerdb/src/libstackerdb.rs +++ b/libstackerdb/src/libstackerdb.rs @@ -194,6 +194,12 @@ impl StackerDBChunkData { Ok(()) } + pub fn recover_pk(&self) -> Result { + let digest = self.get_slot_metadata().auth_digest(); + StacksPublicKey::recover_to_pubkey(digest.as_bytes(), &self.sig) + .map_err(|ve| Error::VerifyingError(ve.to_string())) + } + /// Verify that this chunk was signed by the given /// public key hash (`addr`). Only fails if the underlying signing library fails. pub fn verify(&self, addr: &StacksAddress) -> Result { diff --git a/stacks-common/src/libcommon.rs b/stacks-common/src/libcommon.rs index 2f7221bd5..0a9fa9d64 100644 --- a/stacks-common/src/libcommon.rs +++ b/stacks-common/src/libcommon.rs @@ -62,5 +62,5 @@ pub mod consts { /// The number of StackerDB slots each signing key needs /// to use to participate in DKG and block validation signing. - pub const SIGNER_SLOTS_PER_USER: u32 = 12; + pub const SIGNER_SLOTS_PER_USER: u32 = 13; } diff --git a/stacks-common/src/util/macros.rs b/stacks-common/src/util/macros.rs index cd2578e9c..57ce30ad9 100644 --- a/stacks-common/src/util/macros.rs +++ b/stacks-common/src/util/macros.rs @@ -210,16 +210,25 @@ macro_rules! guarded_string { /// gives you a try_from(u8) -> Option function #[macro_export] macro_rules! define_u8_enum { - ($Name:ident { $($Variant:ident = $Val:literal),+ }) => + ($(#[$outer:meta])* + $Name:ident { + $( + $(#[$inner:meta])* + $Variant:ident = $Val:literal),+ + }) => { #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[repr(u8)] + $(#[$outer])* pub enum $Name { - $($Variant = $Val),*, + $( $(#[$inner])* + $Variant = $Val),*, } impl $Name { + /// All members of the enum pub const ALL: &'static [$Name] = &[$($Name::$Variant),*]; + /// Return the u8 representation of the variant pub fn to_u8(&self) -> u8 { match self { $( @@ -228,6 +237,8 @@ macro_rules! define_u8_enum { } } + /// Returns Some and the variant if `v` is a u8 corresponding to a variant in this enum. 
+ /// Returns None otherwise pub fn from_u8(v: u8) -> Option { match v { $( diff --git a/stacks-common/src/util/mod.rs b/stacks-common/src/util/mod.rs index 97cbc4104..bec0edd68 100644 --- a/stacks-common/src/util/mod.rs +++ b/stacks-common/src/util/mod.rs @@ -27,6 +27,7 @@ pub mod secp256k1; pub mod uint; pub mod vrf; +use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; use std::{error, fmt, thread, time}; @@ -69,6 +70,48 @@ impl fmt::Display for HexError { } } +pub struct HashMapDisplay<'a, K: std::hash::Hash, V>(pub &'a HashMap); + +impl<'a, K, V> fmt::Display for HashMapDisplay<'a, K, V> +where + K: fmt::Display + std::hash::Hash, + V: fmt::Display, + K: Ord, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut keys: Vec<_> = self.0.keys().collect(); + keys.sort(); + write!(f, "{{")?; + for key in keys.into_iter() { + let Some(value) = self.0.get(key) else { + continue; + }; + write!(f, "{key}: {value}")?; + } + write!(f, "}}") + } +} + +impl<'a, K, V> fmt::Debug for HashMapDisplay<'a, K, V> +where + K: fmt::Display + std::hash::Hash, + V: fmt::Debug, + K: Ord, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut keys: Vec<_> = self.0.keys().collect(); + keys.sort(); + write!(f, "{{")?; + for key in keys.into_iter() { + let Some(value) = self.0.get(key) else { + continue; + }; + write!(f, "{key}: {value:?}")?; + } + write!(f, "}}") + } +} + impl error::Error for HexError { fn cause(&self) -> Option<&dyn error::Error> { None diff --git a/stacks-common/src/util/secp256k1.rs b/stacks-common/src/util/secp256k1.rs index 5d1a5f5ae..0274f41b0 100644 --- a/stacks-common/src/util/secp256k1.rs +++ b/stacks-common/src/util/secp256k1.rs @@ -346,6 +346,10 @@ impl Secp256k1PrivateKey { } to_hex(&bytes) } + + pub fn as_slice(&self) -> &[u8; 32] { + self.key.as_ref() + } } impl PrivateKey for Secp256k1PrivateKey { diff --git a/stacks-signer/src/client/stackerdb.rs b/stacks-signer/src/client/stackerdb.rs index b6a7accdc..6418b8a0b 100644 --- a/stacks-signer/src/client/stackerdb.rs +++ b/stacks-signer/src/client/stackerdb.rs @@ -14,18 +14,13 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . // -use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners; use blockstack_lib::chainstate::stacks::StacksTransaction; use blockstack_lib::net::api::poststackerdbchunk::StackerDBErrorCodes; -use blockstack_lib::util_lib::boot::boot_code_addr; -use clarity::vm::types::QualifiedContractIdentifier; -use clarity::vm::ContractName; use hashbrown::HashMap; -use libsigner::{SignerMessage, SignerSession, StackerDBSession, TRANSACTIONS_MSG_ID}; +use libsigner::{MessageSlotID, SignerMessage, SignerSession, StackerDBSession}; use libstackerdb::{StackerDBChunkAckData, StackerDBChunkData}; use slog::{slog_debug, slog_warn}; use stacks_common::codec::{read_next, StacksMessageCodec}; -use stacks_common::consts::SIGNER_SLOTS_PER_USER; use stacks_common::types::chainstate::StacksPrivateKey; use stacks_common::{debug, warn}; @@ -38,11 +33,11 @@ use crate::signer::SignerSlotID; pub struct StackerDB { /// The stacker-db sessions for each signer set and message type. /// Maps message ID to the DB session. 
- signers_message_stackerdb_sessions: HashMap, + signers_message_stackerdb_sessions: HashMap, /// The private key used in all stacks node communications stacks_private_key: StacksPrivateKey, /// A map of a message ID to last chunk version for each session - slot_versions: HashMap>, + slot_versions: HashMap>, /// The signer slot ID -- the index into the signer list for this signer daemon's signing key. signer_slot_id: SignerSlotID, /// The reward cycle of the connecting signer @@ -72,33 +67,16 @@ impl StackerDB { signer_slot_id: SignerSlotID, ) -> Self { let mut signers_message_stackerdb_sessions = HashMap::new(); - let stackerdb_issuer = boot_code_addr(is_mainnet); - for msg_id in 0..SIGNER_SLOTS_PER_USER { + for msg_id in MessageSlotID::ALL { signers_message_stackerdb_sessions.insert( - msg_id, - StackerDBSession::new( - host, - QualifiedContractIdentifier::new( - stackerdb_issuer.into(), - ContractName::from( - NakamotoSigners::make_signers_db_name(reward_cycle, msg_id).as_str(), - ), - ), - ), + *msg_id, + StackerDBSession::new(host, msg_id.stacker_db_contract(is_mainnet, reward_cycle)), ); } let next_transaction_session = StackerDBSession::new( host, - QualifiedContractIdentifier::new( - stackerdb_issuer.into(), - ContractName::from( - NakamotoSigners::make_signers_db_name( - reward_cycle.wrapping_add(1), - TRANSACTIONS_MSG_ID, - ) - .as_str(), - ), - ), + MessageSlotID::Transactions + .stacker_db_contract(is_mainnet, reward_cycle.wrapping_add(1)), ); Self { @@ -116,11 +94,21 @@ impl StackerDB { &mut self, message: SignerMessage, ) -> Result { - let message_bytes = message.serialize_to_vec(); let msg_id = message.msg_id(); + let message_bytes = message.serialize_to_vec(); + self.send_message_bytes_with_retry(&msg_id, message_bytes) + } + + /// Sends message (as a raw msg ID and bytes) to the .signers stacker-db with an + /// exponential backoff retry + pub fn send_message_bytes_with_retry( + &mut self, + msg_id: &MessageSlotID, + message_bytes: Vec, + ) -> Result { let slot_id = self.signer_slot_id; loop { - let mut slot_version = if let Some(versions) = self.slot_versions.get_mut(&msg_id) { + let mut slot_version = if let Some(versions) = self.slot_versions.get_mut(msg_id) { if let Some(version) = versions.get(&slot_id) { *version } else { @@ -130,14 +118,14 @@ impl StackerDB { } else { let mut versions = HashMap::new(); versions.insert(slot_id, 0); - self.slot_versions.insert(msg_id, versions); + self.slot_versions.insert(*msg_id, versions); 1 }; let mut chunk = StackerDBChunkData::new(slot_id.0, slot_version, message_bytes.clone()); chunk.sign(&self.stacks_private_key)?; - let Some(session) = self.signers_message_stackerdb_sessions.get_mut(&msg_id) else { + let Some(session) = self.signers_message_stackerdb_sessions.get_mut(msg_id) else { panic!("FATAL: would loop forever trying to send a message with ID {}, for which we don't have a session", msg_id); }; @@ -149,7 +137,7 @@ impl StackerDB { let send_request = || session.put_chunk(&chunk).map_err(backoff::Error::transient); let chunk_ack: StackerDBChunkAckData = retry_with_exponential_backoff(send_request)?; - if let Some(versions) = self.slot_versions.get_mut(&msg_id) { + if let Some(versions) = self.slot_versions.get_mut(msg_id) { // NOTE: per the above, this is always executed versions.insert(slot_id, slot_version.saturating_add(1)); } else { @@ -171,7 +159,7 @@ impl StackerDB { } else { warn!("Failed to send message to stackerdb due to wrong version number. Attempted {}. Expected unkown version number. 
Incrementing and retrying...", slot_version); } - if let Some(versions) = self.slot_versions.get_mut(&msg_id) { + if let Some(versions) = self.slot_versions.get_mut(msg_id) { // NOTE: per the above, this is always executed versions.insert(slot_id, slot_version.saturating_add(1)); } else { @@ -241,7 +229,7 @@ impl StackerDB { ) -> Result, ClientError> { let Some(transactions_session) = self .signers_message_stackerdb_sessions - .get_mut(&TRANSACTIONS_MSG_ID) + .get_mut(&MessageSlotID::Transactions) else { return Err(ClientError::NotConnected); }; diff --git a/stacks-signer/src/client/stacks_client.rs b/stacks-signer/src/client/stacks_client.rs index 1cf142e13..c3db54132 100644 --- a/stacks-signer/src/client/stacks_client.rs +++ b/stacks-signer/src/client/stacks_client.rs @@ -375,6 +375,22 @@ impl StacksClient { Ok(blocks_mined / reward_cycle_length) } + /// Get the current reward cycle and whether the prepare phase has started for the next cycle + pub fn get_current_reward_cycle_and_prepare_status(&self) -> Result<(u64, bool), ClientError> { + let pox_data = self.get_pox_data()?; + let blocks_mined = pox_data + .current_burnchain_block_height + .saturating_sub(pox_data.first_burnchain_block_height); + let reward_cycle_length = pox_data + .reward_phase_block_length + .saturating_add(pox_data.prepare_phase_block_length); + let reward_phase_length = pox_data.reward_phase_block_length; + let reward_cycle = blocks_mined / reward_cycle_length; + let reward_cycle_index = blocks_mined % reward_cycle_length; + let in_prepare_for_next = reward_cycle_index >= reward_phase_length; + Ok((reward_cycle, in_prepare_for_next)) + } + /// Helper function to retrieve the account info from the stacks node for a specific address fn get_account_entry( &self, diff --git a/stacks-signer/src/config.rs b/stacks-signer/src/config.rs index e3e647c3d..d2b2de905 100644 --- a/stacks-signer/src/config.rs +++ b/stacks-signer/src/config.rs @@ -361,7 +361,13 @@ pub fn build_signer_config_tomls( let mut signer_config_tomls = vec![]; let mut port = 30000; - for stacks_private_key in stacks_private_keys { + let run_stamp = rand::random::(); + let db_dir = format!( + "/tmp/stacks-node-tests/integrations-signers/{:#X}", + run_stamp, + ); + fs::create_dir_all(&db_dir).unwrap(); + for (ix, stacks_private_key) in stacks_private_keys.iter().enumerate() { let endpoint = format!("localhost:{}", port); port += 1; let stacks_private_key = stacks_private_key.to_hex(); @@ -372,7 +378,7 @@ node_host = "{node_host}" endpoint = "{endpoint}" network = "{network}" auth_password = "{password}" -db_path = ":memory:" +db_path = "{db_dir}/{ix}.sqlite" "# ); diff --git a/stacks-signer/src/main.rs b/stacks-signer/src/main.rs index e9c0af22f..b86941d29 100644 --- a/stacks-signer/src/main.rs +++ b/stacks-signer/src/main.rs @@ -104,8 +104,8 @@ fn process_dkg_result(dkg_res: &[OperationResult]) { assert!(dkg_res.len() == 1, "Received unexpected number of results"); let dkg = dkg_res.first().unwrap(); match dkg { - OperationResult::Dkg(point) => { - println!("Received aggregate group key: {point}"); + OperationResult::Dkg(aggregate_key) => { + println!("Received aggregate group key: {aggregate_key}"); } OperationResult::Sign(signature) => { panic!( @@ -133,8 +133,8 @@ fn process_sign_result(sign_res: &[OperationResult]) { assert!(sign_res.len() == 1, "Received unexpected number of results"); let sign = sign_res.first().unwrap(); match sign { - OperationResult::Dkg(point) => { - panic!("Received unexpected aggregate group key: {point}"); + 
OperationResult::Dkg(aggregate_key) => { + panic!("Received unexpected aggregate group key: {aggregate_key}"); } OperationResult::Sign(signature) => { panic!( diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index 0d76a36ee..a70340c93 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -277,9 +277,14 @@ impl RunLoop { /// Refresh the signer configuration by retrieving the necessary information from the stacks node /// Note: this will trigger DKG if required - fn refresh_signers(&mut self, current_reward_cycle: u64) -> Result<(), ClientError> { + fn refresh_signers( + &mut self, + current_reward_cycle: u64, + _in_prepare_phase: bool, + ) -> Result<(), ClientError> { let next_reward_cycle = current_reward_cycle.saturating_add(1); self.refresh_signer_config(current_reward_cycle, true); + // don't try to refresh the next reward cycle's signer state if there's no state for that cycle yet. self.refresh_signer_config(next_reward_cycle, false); // TODO: do not use an empty consensus hash let pox_consensus_hash = ConsensusHash::empty(); @@ -309,7 +314,7 @@ impl RunLoop { if signer.approved_aggregate_public_key.is_none() { retry_with_exponential_backoff(|| { signer - .update_dkg(&self.stacks_client) + .update_dkg(&self.stacks_client, current_reward_cycle) .map_err(backoff::Error::transient) })?; } @@ -355,16 +360,16 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { self.commands.push_back(cmd); } // TODO: queue events and process them potentially after initialization success (similar to commands)? - let Ok(current_reward_cycle) = retry_with_exponential_backoff(|| { + let Ok((current_reward_cycle, in_prepare_phase)) = retry_with_exponential_backoff(|| { self.stacks_client - .get_current_reward_cycle() + .get_current_reward_cycle_and_prepare_status() .map_err(backoff::Error::transient) }) else { error!("Failed to retrieve current reward cycle"); warn!("Ignoring event: {event:?}"); return None; }; - if let Err(e) = self.refresh_signers(current_reward_cycle) { + if let Err(e) = self.refresh_signers(current_reward_cycle, in_prepare_phase) { if self.state == State::Uninitialized { // If we were never actually initialized, we cannot process anything. Just return. warn!("Failed to initialize signers. Are you sure this signer is correctly registered for the current or next reward cycle?"); @@ -382,7 +387,7 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2), // Block proposal events do have reward cycles, but each proposal has its own cycle, // and the vec could be heterogenous, so, don't differentiate. 
- Some(SignerEvent::ProposedBlocks(_)) => None, + Some(SignerEvent::ProposedBlocks(..)) => None, Some(SignerEvent::SignerMessages(msg_parity, ..)) => { Some(u64::from(msg_parity) % 2) } @@ -421,7 +426,7 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { } } // After processing event, run the next command for each signer - signer.process_next_command(&self.stacks_client); + signer.process_next_command(&self.stacks_client, current_reward_cycle); } None } diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 65c32dc1c..599a87526 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -25,7 +25,8 @@ use blockstack_lib::chainstate::stacks::StacksTransaction; use blockstack_lib::net::api::postblock_proposal::BlockValidateResponse; use hashbrown::HashSet; use libsigner::{ - BlockProposalSigners, BlockRejection, BlockResponse, RejectCode, SignerEvent, SignerMessage, + BlockProposalSigners, BlockRejection, BlockResponse, MessageSlotID, RejectCode, SignerEvent, + SignerMessage, }; use serde_derive::{Deserialize, Serialize}; use slog::{slog_debug, slog_error, slog_info, slog_warn}; @@ -164,6 +165,8 @@ pub struct Signer { pub coordinator_selector: CoordinatorSelector, /// The approved key registered to the contract pub approved_aggregate_public_key: Option, + /// The current active miner's key (if we know it!) + pub miner_key: Option, /// Signer DB path pub db_path: PathBuf, /// SignerDB for state management @@ -182,6 +185,28 @@ impl std::fmt::Display for Signer { } } +impl Signer { + /// Return the current coordinator. If in the active reward cycle, this is the miner, + /// so the first element of the tuple will be None (because the miner does not have a signer index). + fn get_coordinator(&self, current_reward_cycle: u64) -> (Option, PublicKey) { + if self.reward_cycle == current_reward_cycle { + let Some(ref cur_miner) = self.miner_key else { + error!( + "Signer #{}: Could not lookup current miner while in active reward cycle", + self.signer_id + ); + let selected = self.coordinator_selector.get_coordinator(); + return (Some(selected.0), selected.1); + }; + // coordinator is the current miner. + (None, cur_miner.clone()) + } else { + let selected = self.coordinator_selector.get_coordinator(); + return (Some(selected.0), selected.1); + } + } +} + impl From for Signer { fn from(signer_config: SignerConfig) -> Self { let stackerdb = StackerDB::from(&signer_config); @@ -249,6 +274,7 @@ impl From for Signer { tx_fee_ustx: signer_config.tx_fee_ustx, coordinator_selector, approved_aggregate_public_key: None, + miner_key: None, db_path: signer_config.db_path.clone(), signer_db, } @@ -355,11 +381,15 @@ impl Signer { } /// Attempt to process the next command in the queue, and update state accordingly - pub fn process_next_command(&mut self, stacks_client: &StacksClient) { - let coordinator_id = self.coordinator_selector.get_coordinator().0; + pub fn process_next_command( + &mut self, + stacks_client: &StacksClient, + current_reward_cycle: u64, + ) { + let coordinator_id = self.get_coordinator(current_reward_cycle).0; match &self.state { State::Idle => { - if coordinator_id != self.signer_id { + if coordinator_id != Some(self.signer_id) { debug!( "{self}: Coordinator is {coordinator_id:?}. 
Will not process any commands...", ); @@ -387,7 +417,9 @@ impl Signer { stacks_client: &StacksClient, block_validate_response: &BlockValidateResponse, res: Sender>, + current_reward_cycle: u64, ) { + let coordinator_id = self.get_coordinator(current_reward_cycle).0; let mut block_info = match block_validate_response { BlockValidateResponse::Ok(block_validate_ok) => { let signer_signature_hash = block_validate_ok.signer_signature_hash; @@ -458,12 +490,11 @@ impl Signer { msg: Message::NonceRequest(nonce_request), sig: vec![], }; - self.handle_packets(stacks_client, res, &[packet]); + self.handle_packets(stacks_client, res, &[packet], current_reward_cycle); } else { - let coordinator_id = self.coordinator_selector.get_coordinator().0; if block_info.valid.unwrap_or(false) && !block_info.signed_over - && coordinator_id == self.signer_id + && coordinator_id == Some(self.signer_id) { // We are the coordinator. Trigger a signing round for this block debug!( @@ -497,19 +528,22 @@ impl Signer { stacks_client: &StacksClient, res: Sender>, messages: &[SignerMessage], + current_reward_cycle: u64, ) { - let coordinator_pubkey = self.coordinator_selector.get_coordinator().1; + let coordinator_pubkey = self.get_coordinator(current_reward_cycle).1; let packets: Vec = messages .iter() .filter_map(|msg| match msg { - SignerMessage::BlockResponse(_) | SignerMessage::Transactions(_) => None, + SignerMessage::DkgResults { .. } + | SignerMessage::BlockResponse(_) + | SignerMessage::Transactions(_) => None, // TODO: if a signer tries to trigger DKG and we already have one set in the contract, ignore the request. SignerMessage::Packet(packet) => { self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey) } }) .collect(); - self.handle_packets(stacks_client, res, &packets); + self.handle_packets(stacks_client, res, &packets, current_reward_cycle); } /// Handle proposed blocks submitted by the miners to stackerdb @@ -576,6 +610,7 @@ impl Signer { stacks_client: &StacksClient, res: Sender>, packets: &[Packet], + current_reward_cycle: u64, ) { let signer_outbound_messages = self .signing_round @@ -586,13 +621,18 @@ impl Signer { }); // Next process the message as the coordinator - let (coordinator_outbound_messages, operation_results) = self - .coordinator - .process_inbound_messages(packets) - .unwrap_or_else(|e| { - error!("{self}: Failed to process inbound messages as a coordinator: {e:?}"); - (vec![], vec![]) - }); + let (coordinator_outbound_messages, operation_results) = if self.reward_cycle + != current_reward_cycle + { + self.coordinator + .process_inbound_messages(packets) + .unwrap_or_else(|e| { + error!("{self}: Failed to process inbound messages as a coordinator: {e:?}"); + (vec![], vec![]) + }) + } else { + (vec![], vec![]) + }; if !operation_results.is_empty() { // We have finished a signing or DKG round, either successfully or due to error. @@ -667,47 +707,42 @@ impl Signer { &mut self, stacks_client: &StacksClient, nonce_request: &mut NonceRequest, - ) -> bool { - let Some(block): Option = read_next(&mut &nonce_request.message[..]).ok() + ) -> Option { + let Some(block) = + NakamotoBlock::consensus_deserialize(&mut nonce_request.message.as_slice()).ok() else { // We currently reject anything that is not a block - debug!("{self}: Received a nonce request for an unknown message stream. Reject it.",); - return false; + warn!("{self}: Received a nonce request for an unknown message stream. 
Reject it.",); + return None; }; let signer_signature_hash = block.header.signer_signature_hash(); - let mut block_info = match self + let Some(mut block_info) = self .signer_db .block_lookup(self.reward_cycle, &signer_signature_hash) .expect("Failed to connect to signer DB") - { - Some(block_info) => block_info, - None => { - debug!("{self}: We have received a block sign request for a block we have not seen before. Cache the nonce request and submit the block for validation..."); - let block_info = BlockInfo::new_with_request(block.clone(), nonce_request.clone()); - self.signer_db - .insert_block(self.reward_cycle, &block_info) - .expect(&format!("{self}: Failed to insert block in DB")); - stacks_client - .submit_block_for_validation(block) - .unwrap_or_else(|e| { - warn!("{self}: Failed to submit block for validation: {e:?}",); - }); - return false; - } + else { + debug!( + "{self}: We have received a block sign request for a block we have not seen before. Cache the nonce request and submit the block for validation..."; + "signer_sighash" => %block.header.signer_signature_hash(), + ); + let block_info = BlockInfo::new_with_request(block.clone(), nonce_request.clone()); + stacks_client + .submit_block_for_validation(block) + .unwrap_or_else(|e| { + warn!("{self}: Failed to submit block for validation: {e:?}",); + }); + return Some(block_info); }; if block_info.valid.is_none() { // We have not yet received validation from the stacks node. Cache the request and wait for validation debug!("{self}: We have yet to receive validation from the stacks node for a nonce request. Cache the nonce request and wait for block validation..."); block_info.nonce_request = Some(nonce_request.clone()); - return false; + return Some(block_info); } self.determine_vote(&mut block_info, nonce_request); - self.signer_db - .insert_block(self.reward_cycle, &block_info) - .expect(&format!("{self}: Failed to insert block in DB")); - true + Some(block_info) } /// Verify the transactions in a block are as expected @@ -854,7 +889,18 @@ impl Signer { } } Message::NonceRequest(request) => { - if !self.validate_nonce_request(stacks_client, request) { + let Some(updated_block_info) = + self.validate_nonce_request(stacks_client, request) + else { + warn!("Failed to validate and parse nonce request"); + return None; + }; + self.signer_db + .insert_block(self.reward_cycle, &updated_block_info) + .expect(&format!("{self}: Failed to insert block in DB")); + let process_request = updated_block_info.vote.is_some(); + if !process_request { + debug!("Failed to validate nonce request"); return None; } } @@ -889,8 +935,8 @@ impl Signer { OperationResult::SignTaproot(_) => { debug!("{self}: Received a signature result for a taproot signature. 
Nothing to broadcast as we currently sign blocks with a FROST signature."); } - OperationResult::Dkg(dkg_public_key) => { - self.process_dkg(stacks_client, dkg_public_key); + OperationResult::Dkg(aggregate_key) => { + self.process_dkg(stacks_client, aggregate_key); } OperationResult::SignError(e) => { warn!("{self}: Received a Sign error: {e:?}"); @@ -906,6 +952,25 @@ impl Signer { /// Process a dkg result by broadcasting a vote to the stacks node fn process_dkg(&mut self, stacks_client: &StacksClient, dkg_public_key: &Point) { + let mut dkg_results_bytes = vec![]; + if let Err(e) = SignerMessage::serialize_dkg_result( + &mut dkg_results_bytes, + dkg_public_key, + self.coordinator.party_polynomials.iter(), + true, + ) { + error!("{}: Failed to serialize DKGResults message for StackerDB, will continue operating.", self.signer_id; + "error" => %e); + } else { + if let Err(e) = self + .stackerdb + .send_message_bytes_with_retry(&MessageSlotID::DkgResults, dkg_results_bytes) + { + error!("{}: Failed to send DKGResults message to StackerDB, will continue operating.", self.signer_id; + "error" => %e); + } + } + let epoch = retry_with_exponential_backoff(|| { stacks_client .get_node_epoch() @@ -1121,7 +1186,11 @@ impl Signer { } /// Update the DKG for the provided signer info, triggering it if required - pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> { + pub fn update_dkg( + &mut self, + stacks_client: &StacksClient, + current_reward_cycle: u64, + ) -> Result<(), ClientError> { let reward_cycle = self.reward_cycle; self.approved_aggregate_public_key = stacks_client.get_approved_aggregate_key(reward_cycle)?; @@ -1138,8 +1207,8 @@ impl Signer { ); return Ok(()); }; - let coordinator_id = self.coordinator_selector.get_coordinator().0; - if self.signer_id == coordinator_id && self.state == State::Idle { + let coordinator_id = self.get_coordinator(current_reward_cycle).0; + if Some(self.signer_id) == coordinator_id && self.state == State::Idle { debug!("{self}: Checking if old vote transaction exists in StackerDB..."); // Have I already voted and have a pending transaction? 
Check stackerdb for the same round number and reward cycle vote transaction // Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes @@ -1197,7 +1266,12 @@ impl Signer { match event { Some(SignerEvent::BlockValidationResponse(block_validate_response)) => { debug!("{self}: Received a block proposal result from the stacks node..."); - self.handle_block_validate_response(stacks_client, block_validate_response, res) + self.handle_block_validate_response( + stacks_client, + block_validate_response, + res, + current_reward_cycle, + ) } Some(SignerEvent::SignerMessages(signer_set, messages)) => { if *signer_set != self.stackerdb.get_signer_set() { @@ -1208,18 +1282,26 @@ impl Signer { "{self}: Received {} messages from the other signers...", messages.len() ); - self.handle_signer_messages(stacks_client, res, messages); + self.handle_signer_messages(stacks_client, res, messages, current_reward_cycle); } - Some(SignerEvent::ProposedBlocks(blocks)) => { + Some(SignerEvent::ProposedBlocks(blocks, messages, miner_key)) => { + if let Some(miner_key) = miner_key { + let miner_key = PublicKey::try_from(miner_key.to_bytes_compressed().as_slice()) + .expect("FATAL: could not convert from StacksPublicKey to PublicKey"); + self.miner_key = Some(miner_key); + }; if current_reward_cycle != self.reward_cycle { // There is not point in processing blocks if we are not the current reward cycle (we can never actually contribute to signing these blocks) debug!("{self}: Received a proposed block, but this signer's reward cycle is not the current one ({current_reward_cycle}). Ignoring..."); return Ok(()); } debug!( - "{self}: Received {} block proposals from the miners...", - blocks.len() + "{self}: Received {} block proposals and {} messages from the miner", + blocks.len(), + messages.len(); + "miner_key" => ?miner_key, ); + self.handle_signer_messages(stacks_client, res, messages, current_reward_cycle); self.handle_proposed_blocks(stacks_client, blocks); } Some(SignerEvent::StatusCheck) => { diff --git a/stackslib/src/burnchains/mod.rs b/stackslib/src/burnchains/mod.rs index aa3c83323..ef1474dd0 100644 --- a/stackslib/src/burnchains/mod.rs +++ b/stackslib/src/burnchains/mod.rs @@ -534,6 +534,11 @@ impl PoxConstants { first_block_height + reward_cycle * u64::from(self.reward_cycle_length) + 1 } + pub fn reward_cycle_index(&self, first_block_height: u64, burn_height: u64) -> Option { + let effective_height = burn_height.checked_sub(first_block_height)?; + Some(effective_height % u64::from(self.reward_cycle_length)) + } + pub fn block_height_to_reward_cycle( &self, first_block_height: u64, diff --git a/stackslib/src/chainstate/burn/db/sortdb.rs b/stackslib/src/chainstate/burn/db/sortdb.rs index a18b0355e..7ff0ed7fb 100644 --- a/stackslib/src/chainstate/burn/db/sortdb.rs +++ b/stackslib/src/chainstate/burn/db/sortdb.rs @@ -3468,6 +3468,96 @@ impl SortitionDB { Ok(()) } + pub fn find_first_prepare_phase_sortition( + &self, + from_tip: &SortitionId, + ) -> Result, db_error> { + let from_tip = + SortitionDB::get_block_snapshot(self.conn(), &from_tip)?.ok_or_else(|| { + error!( + "Could not find snapshot for sortition"; + "sortition_id" => %from_tip, + ); + db_error::NotFoundError + })?; + let mut cursor = from_tip; + let mut last = None; + while self + .pox_constants + .is_in_prepare_phase(self.first_block_height, cursor.block_height) + { + let parent = cursor.parent_sortition_id; + last = Some(cursor.sortition_id); + cursor = SortitionDB::get_block_snapshot(self.conn(), 
&parent)?.ok_or_else(|| { + error!( + "Could not find snapshot for sortition"; + "sortition_id" => %parent, + ); + db_error::NotFoundError + })?; + } + Ok(last) + } + + /// Figure out the reward cycle for `tip` and lookup the preprocessed + /// reward set (if it exists) for the active reward cycle during `tip` + pub fn get_preprocessed_reward_set_of( + &self, + tip: &SortitionId, + ) -> Result, db_error> { + let tip_sn = SortitionDB::get_block_snapshot(self.conn(), tip)?.ok_or_else(|| { + error!( + "Could not find snapshot for sortition while fetching reward set"; + "tip_sortition_id" => %tip, + ); + db_error::NotFoundError + })?; + + let reward_cycle_id = self + .pox_constants + .block_height_to_reward_cycle(self.first_block_height, tip_sn.block_height) + .expect("FATAL: stored snapshot with block height < first_block_height"); + + let prepare_phase_end = self + .pox_constants + .reward_cycle_to_block_height(self.first_block_height, reward_cycle_id) + .saturating_sub(1); + + // find the sortition at height + let prepare_phase_end = + get_ancestor_sort_id(&self.index_conn(), prepare_phase_end, &tip_sn.sortition_id)? + .ok_or_else(|| { + error!( + "Could not find prepare phase end ancestor while fetching reward set"; + "tip_sortition_id" => %tip, + "reward_cycle_id" => reward_cycle_id, + "prepare_phase_end_height" => prepare_phase_end + ); + db_error::NotFoundError + })?; + + let first_sortition = self + .find_first_prepare_phase_sortition(&prepare_phase_end)? + .ok_or_else(|| { + error!( + "Could not find the first prepare phase sortition for the active reward cycle"; + "tip_sortition_id" => %tip, + "reward_cycle_id" => reward_cycle_id, + "prepare_phase_end_sortition_id" => %prepare_phase_end, + ); + db_error::NotFoundError + })?; + + info!("Fetching preprocessed reward set"; + "tip_sortition_id" => %tip, + "reward_cycle_id" => reward_cycle_id, + "prepare_phase_end_sortition_id" => %prepare_phase_end, + "prepare_phase_start_sortition_id" => %first_sortition, + ); + + Self::get_preprocessed_reward_set(self.conn(), &first_sortition) + } + /// Get a pre-processed reawrd set. /// `sortition_id` is the first sortition ID of the prepare phase. 
pub fn get_preprocessed_reward_set( diff --git a/stackslib/src/chainstate/nakamoto/miner.rs b/stackslib/src/chainstate/nakamoto/miner.rs index 961fd32db..33ee26536 100644 --- a/stackslib/src/chainstate/nakamoto/miner.rs +++ b/stackslib/src/chainstate/nakamoto/miner.rs @@ -60,7 +60,7 @@ use crate::chainstate::stacks::db::{ }; use crate::chainstate::stacks::events::{StacksTransactionEvent, StacksTransactionReceipt}; use crate::chainstate::stacks::miner::{ - BlockBuilder, BlockBuilderSettings, BlockLimitFunction, TransactionError, + BlockBuilder, BlockBuilderSettings, BlockLimitFunction, TransactionError, TransactionEvent, TransactionProblematic, TransactionResult, TransactionSkipped, }; use crate::chainstate::stacks::{Error, StacksBlockHeader, *}; @@ -406,7 +406,7 @@ impl NakamotoBlockBuilder { settings: BlockBuilderSettings, event_observer: Option<&dyn MemPoolEventDispatcher>, signer_transactions: Vec, - ) -> Result<(NakamotoBlock, ExecutionCost, u64), Error> { + ) -> Result<(NakamotoBlock, ExecutionCost, u64, Vec), Error> { let (tip_consensus_hash, tip_block_hash, tip_height) = ( parent_stacks_header.consensus_hash.clone(), parent_stacks_header.anchored_header.block_hash(), @@ -485,16 +485,6 @@ impl NakamotoBlockBuilder { let ts_end = get_epoch_time_ms(); - if let Some(observer) = event_observer { - observer.mined_nakamoto_block_event( - SortitionDB::get_canonical_burn_chain_tip(burn_dbconn.conn())?.block_height + 1, - &block, - size, - &consumed, - tx_events, - ); - } - set_last_mined_block_transaction_count(block.txs.len() as u64); set_last_mined_execution_cost_observed(&consumed, &block_limit); @@ -511,7 +501,7 @@ impl NakamotoBlockBuilder { "assembly_time_ms" => ts_end.saturating_sub(ts_start), ); - Ok((block, consumed, size)) + Ok((block, consumed, size, tx_events)) } pub fn get_bytes_so_far(&self) -> u64 { @@ -533,10 +523,13 @@ impl NakamotoBlockBuilder { miners_contract_id: &QualifiedContractIdentifier, ) -> Result, Error> { let miner_pubkey = StacksPublicKey::from_private(&miner_privkey); - let Some(slot_id) = NakamotoChainState::get_miner_slot(sortdb, tip, &miner_pubkey)? else { + let Some(slot_range) = NakamotoChainState::get_miner_slot(sortdb, tip, &miner_pubkey)? + else { // No slot exists for this miner return Ok(None); }; + // proposal slot is the first slot. + let slot_id = slot_range.start; // Get the LAST slot version number written to the DB. If not found, use 0. // Add 1 to get the NEXT version number // Note: we already check above for the slot's existence diff --git a/stackslib/src/chainstate/nakamoto/mod.rs b/stackslib/src/chainstate/nakamoto/mod.rs index fb6b3fea3..2316aaaaf 100644 --- a/stackslib/src/chainstate/nakamoto/mod.rs +++ b/stackslib/src/chainstate/nakamoto/mod.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs; -use std::ops::{Deref, DerefMut}; +use std::ops::{Deref, DerefMut, Range}; use std::path::PathBuf; use clarity::vm::ast::ASTRules; @@ -3154,13 +3154,13 @@ impl NakamotoChainState { let signers = miner_key_hash160s .into_iter() .map(|hash160| - // each miner gets one slot + // each miner gets two slots ( StacksAddress { version: 1, // NOTE: the version is ignored in stackerdb; we only care about the hashbytes bytes: hash160 }, - 1 + 2 )) .collect(); @@ -3174,36 +3174,34 @@ impl NakamotoChainState { }) } - /// Get the slot number for the given miner's public key. - /// Returns Some(u32) if the miner is in the StackerDB config. + /// Get the slot range for the given miner's public key. 
+ /// Returns Some(Range) if the miner is in the StackerDB config, where the range of slots for the miner is [start, end). + /// i.e., inclusive of `start`, exclusive of `end`. /// Returns None if the miner is not in the StackerDB config. /// Returns an error if the miner is in the StackerDB config but the slot number is invalid. pub fn get_miner_slot( sortdb: &SortitionDB, tip: &BlockSnapshot, miner_pubkey: &StacksPublicKey, - ) -> Result, ChainstateError> { + ) -> Result>, ChainstateError> { let miner_hash160 = Hash160::from_node_public_key(&miner_pubkey); let stackerdb_config = Self::make_miners_stackerdb_config(sortdb, &tip)?; // find out which slot we're in - let Some(slot_id_res) = - stackerdb_config - .signers - .iter() - .enumerate() - .find_map(|(i, (addr, _))| { - if addr.bytes == miner_hash160 { - Some(u32::try_from(i).map_err(|_| { - CodecError::OverflowError( - "stackerdb config slot ID cannot fit into u32".into(), - ) - })) - } else { - None - } - }) - else { + let mut slot_index = 0; + let mut slot_id_result = None; + for (addr, slot_count) in stackerdb_config.signers.iter() { + if addr.bytes == miner_hash160 { + slot_id_result = Some(Range { + start: slot_index, + end: slot_index + slot_count, + }); + break; + } + slot_index += slot_count; + } + + let Some(slot_id_range) = slot_id_result else { // miner key does not match any slot warn!("Miner is not in the miners StackerDB config"; "miner" => %miner_hash160, @@ -3211,7 +3209,7 @@ impl NakamotoChainState { return Ok(None); }; - Ok(Some(slot_id_res?)) + Ok(Some(slot_id_range)) } /// Boot code instantiation for the aggregate public key. diff --git a/stackslib/src/net/rpc.rs b/stackslib/src/net/rpc.rs index e2f93d728..275c26de7 100644 --- a/stackslib/src/net/rpc.rs +++ b/stackslib/src/net/rpc.rs @@ -553,7 +553,7 @@ impl ConversationHttp { self.handle_request(req, node) })?; - info!("Handled StacksHTTPRequest"; + debug!("Handled StacksHTTPRequest"; "verb" => %verb, "path" => %request_path, "processing_time_ms" => start_time.elapsed().as_millis(), diff --git a/stackslib/src/net/stackerdb/db.rs b/stackslib/src/net/stackerdb/db.rs index a1b0db94e..6cdebb69d 100644 --- a/stackslib/src/net/stackerdb/db.rs +++ b/stackslib/src/net/stackerdb/db.rs @@ -28,6 +28,7 @@ use stacks_common::util::get_epoch_time_secs; use stacks_common::util::hash::Sha512Trunc256Sum; use stacks_common::util::secp256k1::MessageSignature; +use super::StackerDBEventDispatcher; use crate::chainstate::stacks::address::PoxAddress; use crate::net::stackerdb::{StackerDBConfig, StackerDBTx, StackerDBs, STACKERDB_INV_MAX}; use crate::net::{Error as net_error, StackerDBChunkData, StackerDBHandshakeData}; @@ -387,6 +388,20 @@ impl<'a> StackerDBTx<'a> { Ok(()) } + /// Try to upload a chunk to the StackerDB instance, notifying + /// and subscribed listeners via the `dispatcher` + pub fn put_chunk( + self, + contract: &QualifiedContractIdentifier, + chunk: StackerDBChunkData, + dispatcher: &ED, + ) -> Result<(), net_error> { + self.try_replace_chunk(contract, &chunk.get_slot_metadata(), &chunk.data)?; + self.commit()?; + dispatcher.new_stackerdb_chunks(contract.clone(), vec![chunk]); + Ok(()) + } + /// Add or replace a chunk for a given reward cycle, if it is valid /// Otherwise, this errors out with Error::StaleChunk pub fn try_replace_chunk( diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 7b8e4108c..ffc9c6df7 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ 
b/testnet/stacks-node/src/event_dispatcher.rs @@ -1,5 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::Mutex; use std::thread::sleep; use std::time::Duration; @@ -10,6 +12,7 @@ use clarity::vm::costs::ExecutionCost; use clarity::vm::events::{FTEventType, NFTEventType, STXEventType}; use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier, Value}; use http_types::{Method, Request, Url}; +use lazy_static::lazy_static; use serde_json::json; use stacks::burnchains::{PoxConstants, Txid}; use stacks::chainstate::burn::operations::BlockstackOperationType; @@ -20,7 +23,7 @@ use stacks::chainstate::stacks::address::PoxAddress; use stacks::chainstate::stacks::boot::RewardSetData; use stacks::chainstate::stacks::db::accounts::MinerReward; use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState; -use stacks::chainstate::stacks::db::{MinerRewardInfo, StacksHeaderInfo}; +use stacks::chainstate::stacks::db::{MinerRewardInfo, StacksBlockHeaderTypes, StacksHeaderInfo}; use stacks::chainstate::stacks::events::{ StackerDBChunksEvent, StacksBlockEventData, StacksTransactionEvent, StacksTransactionReceipt, TransactionOrigin, @@ -39,7 +42,8 @@ use stacks::net::stackerdb::StackerDBEventDispatcher; use stacks_common::bitvec::BitVec; use stacks_common::codec::StacksMessageCodec; use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId}; -use stacks_common::util::hash::bytes_to_hex; +use stacks_common::util::hash::{bytes_to_hex, Sha512Trunc256Sum}; +use stacks_common::util::secp256k1::MessageSignature; use super::config::{EventKeyType, EventObserverConfig}; @@ -74,6 +78,21 @@ pub const PATH_BLOCK_PROCESSED: &str = "new_block"; pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new"; pub const PATH_PROPOSAL_RESPONSE: &str = "proposal_response"; +lazy_static! { + pub static ref STACKER_DB_CHANNEL: StackerDBChannel = StackerDBChannel::new(); +} + +/// This struct receives StackerDB event callbacks without registering +/// over the JSON/RPC interface. To ensure that any event observer +/// uses the same channel, we use a lazy_static global for the channel. +/// +/// This channel (currently) only supports receiving events on the +/// boot .signers-* contracts. +pub struct StackerDBChannel { + pub receiver: Mutex>>, + pub sender: Sender, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MinedBlockEvent { pub target_burn_height: u64, @@ -102,10 +121,46 @@ pub struct MinedNakamotoBlockEvent { pub stacks_height: u64, pub block_size: u64, pub cost: ExecutionCost, + pub miner_signature: MessageSignature, + pub signer_signature_hash: Sha512Trunc256Sum, pub tx_events: Vec, pub signer_bitvec: String, } +impl StackerDBChannel { + pub fn new() -> Self { + let (sender, recv_channel) = std::sync::mpsc::channel(); + Self { + receiver: Mutex::new(Some(recv_channel)), + sender, + } + } + + pub fn replace_receiver(&self, receiver: Receiver) { + let mut guard = self + .receiver + .lock() + .expect("FATAL: poisoned StackerDBChannel lock"); + guard.replace(receiver); + } + + pub fn take_receiver(&self) -> Option> { + self.receiver + .lock() + .expect("FATAL: poisoned StackerDBChannel lock") + .take() + } + + /// Is there a thread holding the receiver? + pub fn is_active(&self) -> bool { + // if the receiver field is empty (i.e., None), then a thread must have taken it. 
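+        // In this patch, the taker is the miner's `SignCoordinator`: `SignCoordinator::new()`
+        // takes the receiver out of `STACKER_DB_CHANNEL`, and its `Drop` impl hands it back via
+        // `replace_receiver()`. So `None` here means a signing round is in flight, and
+        // `process_new_stackerdb_chunks` should forward events over the channel's sender.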
+ self.receiver + .lock() + .expect("FATAL: poisoned StackerDBChannel lock") + .is_none() + } +} + impl EventObserver { pub fn send_payload(&self, payload: &serde_json::Value, path: &str) { let body = match serde_json::to_vec(&payload) { @@ -436,24 +491,45 @@ impl EventObserver { "pox_v3_unlock_height": pox_constants.v3_unlock_height, }); + let as_object_mut = payload.as_object_mut().unwrap(); + if let Some(signer_bitvec) = signer_bitvec_opt { - payload.as_object_mut().unwrap().insert( + as_object_mut.insert( "signer_bitvec".to_string(), serde_json::to_value(signer_bitvec).unwrap_or_default(), ); } if let Some(reward_set_data) = reward_set_data { - payload.as_object_mut().unwrap().insert( + as_object_mut.insert( "reward_set".to_string(), serde_json::to_value(&reward_set_data.reward_set).unwrap_or_default(), ); - payload.as_object_mut().unwrap().insert( + as_object_mut.insert( "cycle_number".to_string(), serde_json::to_value(reward_set_data.cycle_number).unwrap_or_default(), ); } + if let StacksBlockHeaderTypes::Nakamoto(ref header) = &metadata.anchored_header { + as_object_mut.insert( + "signer_signature_hash".into(), + format!("0x{}", header.signer_signature_hash()).into(), + ); + as_object_mut.insert( + "miner_signature".into(), + format!("0x{}", &header.miner_signature).into(), + ); + as_object_mut.insert( + "signer_signature".into(), + format!("0x{}", &header.signer_signature).into(), + ); + } + payload } } @@ -1051,6 +1127,8 @@ impl EventDispatcher { block_size: block_size_bytes, cost: consumed.clone(), tx_events, + miner_signature: block.header.miner_signature.clone(), + signer_signature_hash: block.header.signer_signature_hash(), signer_bitvec, }) .unwrap(); @@ -1065,19 +1143,30 @@ impl EventDispatcher { pub fn process_new_stackerdb_chunks( &self, contract_id: QualifiedContractIdentifier, - new_chunks: Vec, + modified_slots: Vec, ) { let interested_observers = self.filter_observers(&self.stackerdb_observers_lookup, false); - if interested_observers.len() < 1 { + let interested_receiver = STACKER_DB_CHANNEL.is_active(); + if interested_observers.is_empty() && !interested_receiver { return; } - let payload = serde_json::to_value(StackerDBChunksEvent { + let event = StackerDBChunksEvent { contract_id, - modified_slots: new_chunks, - }) - .expect("FATAL: failed to serialize StackerDBChunksEvent to JSON"); + modified_slots, + }; + let payload = serde_json::to_value(&event) + .expect("FATAL: failed to serialize StackerDBChunksEvent to JSON"); + + if interested_receiver { + if let Err(send_err) = STACKER_DB_CHANNEL.sender.send(event) { + error!( + "Failed to send StackerDB event to WSTS coordinator channel.
Miner thread may have crashed."; + "err" => ?send_err + ); + } + } for observer in interested_observers.iter() { observer.send_stackerdb_chunks(&payload); diff --git a/testnet/stacks-node/src/nakamoto_node.rs b/testnet/stacks-node/src/nakamoto_node.rs index 302382f17..7b7fb32a6 100644 --- a/testnet/stacks-node/src/nakamoto_node.rs +++ b/testnet/stacks-node/src/nakamoto_node.rs @@ -39,6 +39,7 @@ use crate::run_loop::RegisteredKey; pub mod miner; pub mod peer; pub mod relayer; +pub mod sign_coordinator; use self::peer::PeerThread; use self::relayer::{RelayerDirective, RelayerThread}; @@ -94,7 +95,11 @@ pub enum Error { CannotSelfSign, MiningFailure(ChainstateError), MinerSignatureError(&'static str), - SignerSignatureError(&'static str), + SignerSignatureError(String), + /// A failure occurred while configuring the miner thread + MinerConfigurationFailed(&'static str), + /// An error occurred while operating as the signing coordinator + SigningCoordinatorFailure(String), // The thread that we tried to send to has closed ChannelClosed, } diff --git a/testnet/stacks-node/src/nakamoto_node/miner.rs b/testnet/stacks-node/src/nakamoto_node/miner.rs index 088299083..faede01c7 100644 --- a/testnet/stacks-node/src/nakamoto_node/miner.rs +++ b/testnet/stacks-node/src/nakamoto_node/miner.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation // Copyright (C) 2020-2023 Stacks Open Internet Foundation // @@ -14,6 +13,7 @@ use std::collections::HashMap; // // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use std::collections::HashMap; use std::thread; use std::thread::JoinHandle; use std::time::{Duration, Instant}; @@ -22,16 +22,13 @@ use clarity::boot_util::boot_code_id; use clarity::vm::clarity::ClarityConnection; use clarity::vm::types::{PrincipalData, QualifiedContractIdentifier}; use hashbrown::HashSet; -use libsigner::{ - BlockProposalSigners, BlockResponse, RejectCode, SignerMessage, SignerSession, - StackerDBSession, BLOCK_MSG_ID, TRANSACTIONS_MSG_ID, -}; +use libsigner::{BlockProposalSigners, MessageSlotID, SignerMessage}; use stacks::burnchains::{Burnchain, BurnchainParameters}; use stacks::chainstate::burn::db::sortdb::SortitionDB; use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash}; use stacks::chainstate::nakamoto::miner::{NakamotoBlockBuilder, NakamotoTenureInfo}; use stacks::chainstate::nakamoto::signer_set::NakamotoSigners; -use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockVote, NakamotoChainState}; +use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState}; use stacks::chainstate::stacks::boot::MINERS_NAME; use stacks::chainstate::stacks::db::{StacksChainState, StacksHeaderInfo}; use stacks::chainstate::stacks::{ @@ -39,15 +36,17 @@ use stacks::chainstate::stacks::{ TenureChangeCause, TenureChangePayload, ThresholdSignature, TransactionAnchorMode, TransactionPayload, TransactionVersion, }; -use stacks::net::stackerdb::StackerDBs; -use stacks_common::codec::{read_next, StacksMessageCodec}; +use stacks::net::stackerdb::{StackerDBConfig, StackerDBs}; +use stacks_common::codec::read_next; use stacks_common::types::chainstate::{StacksAddress, StacksBlockId}; use stacks_common::types::{PrivateKey, StacksEpochId}; -use stacks_common::util::hash::{Hash160, Sha512Trunc256Sum}; +use stacks_common::util::hash::Hash160; use stacks_common::util::vrf::VRFProof; use wsts::curve::point::Point; +use wsts::curve::scalar::Scalar; use 
super::relayer::RelayerThread; +use super::sign_coordinator::SignCoordinator; use super::{Config, Error as NakamotoNodeError, EventDispatcher, Keychain}; use crate::nakamoto_node::VRF_MOCK_MINER_KEY; use crate::run_loop::nakamoto::Globals; @@ -59,6 +58,7 @@ use crate::{neon_node, ChainTip}; const ABORT_TRY_AGAIN_MS: u64 = 200; /// If the signers have not responded to a block proposal, how long should /// the miner thread sleep before trying again? +#[allow(unused)] const WAIT_FOR_SIGNERS_MS: u64 = 200; pub enum MinerDirective { @@ -138,6 +138,36 @@ impl BlockMinerThread { globals.unblock_miner(); } + fn make_miners_stackerdb_config( + &mut self, + stackerdbs: &mut StackerDBs, + ) -> Result { + let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) + .expect("FATAL: could not open chainstate DB"); + let burn_db_path = self.config.get_burn_db_file_path(); + let sort_db = SortitionDB::open(&burn_db_path, true, self.burnchain.pox_constants.clone()) + .expect("FATAL: could not open sortition DB"); + let mut stacker_db_configs = HashMap::with_capacity(1); + let miner_contract = boot_code_id(MINERS_NAME, self.config.is_mainnet()); + stacker_db_configs.insert(miner_contract.clone(), StackerDBConfig::noop()); + let mut miners_only_config = stackerdbs + .create_or_reconfigure_stackerdbs(&mut chain_state, &sort_db, stacker_db_configs) + .map_err(|e| { + error!( + "Failed to configure .miners stackerdbs"; + "err" => ?e, + ); + NakamotoNodeError::MinerConfigurationFailed( + "Could not setup .miners stackerdbs configuration", + ) + })?; + miners_only_config.remove(&miner_contract).ok_or_else(|| { + NakamotoNodeError::MinerConfigurationFailed( + "Did not return .miners stackerdb configuration after setup", + ) + }) + } + pub fn run_miner(mut self, prior_miner: Option>) { // when starting a new tenure, block the mining thread if its currently running. 
// the new mining thread will join it (so that the new mining thread stalls, not the relayer) @@ -150,13 +180,10 @@ impl BlockMinerThread { if let Some(prior_miner) = prior_miner { Self::stop_miner(&self.globals, prior_miner); } - let miners_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet()); - let stackerdbs = StackerDBs::connect(&self.config.get_stacker_db_file_path(), true) + let mut stackerdbs = StackerDBs::connect(&self.config.get_stacker_db_file_path(), true) .expect("FATAL: failed to connect to stacker DB"); - let Some(miner_privkey) = self.config.miner.mining_key else { - warn!("No mining key configured, cannot mine"); - return; - }; + + let mut attempts = 0; // now, actually run this tenure loop { let new_block = loop { @@ -182,67 +209,35 @@ impl BlockMinerThread { } }; - let sort_db = SortitionDB::open( - &self.config.get_burn_db_file_path(), - true, - self.burnchain.pox_constants.clone(), - ) - .expect("FATAL: could not open sortition DB"); - let tip = SortitionDB::get_canonical_burn_chain_tip(sort_db.conn()) - .expect("FATAL: could not retrieve chain tip"); - let reward_cycle = self - .burnchain - .pox_constants - .block_height_to_reward_cycle( - self.burnchain.first_block_height, - self.burn_block.block_height, - ) - .expect("FATAL: building on a burn block that is before the first burn block"); - if let Some(new_block) = new_block { - let proposal_msg = BlockProposalSigners { - block: new_block.clone(), - burn_height: self.burn_block.block_height, - reward_cycle, - }; - let proposal = match NakamotoBlockBuilder::make_stackerdb_block_proposal( - &sort_db, - &tip, - &stackerdbs, - &proposal_msg, - &miner_privkey, - &miners_contract_id, - ) { - Ok(Some(chunk)) => chunk, - Ok(None) => { - warn!("Failed to propose block to stackerdb: no slot available"); - continue; - } - Err(e) => { - warn!("Failed to propose block to stackerdb: {e:?}"); - continue; - } + if let Some(mut new_block) = new_block { + let Ok(stackerdb_config) = self.make_miners_stackerdb_config(&mut stackerdbs) + else { + warn!("Failed to setup stackerdb to propose block, will try mining again"); + continue; }; - // Propose the block to the observing signers through the .miners stackerdb instance - let miner_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet()); - let mut miners_stackerdb = - StackerDBSession::new(&self.config.node.rpc_bind, miner_contract_id); - match miners_stackerdb.put_chunk(&proposal) { - Ok(ack) => { - info!("Proposed block to stackerdb: {ack:?}"); - } - Err(e) => { - warn!("Failed to propose block to stackerdb {e:?}"); - return; - } + if let Err(e) = self.propose_block(&new_block, &mut stackerdbs, &stackerdb_config) { + error!("Unrecoverable error while proposing block to signer set: {e:?}. Ending tenure."); + return; } - self.globals.counters.bump_naka_proposed_blocks(); + let (aggregate_public_key, signers_signature) = match self.coordinate_signature( + &new_block, + &mut stackerdbs, + &stackerdb_config, + &mut attempts, + ) { + Ok(x) => x, + Err(e) => { + error!("Unrecoverable error while proposing block to signer set: {e:?}. Ending tenure."); + return; + } + }; - if let Err(e) = - self.wait_for_signer_signature_and_broadcast(&stackerdbs, new_block.clone()) - { - warn!("Error broadcasting block: {e:?}"); + new_block.header.signer_signature = signers_signature; + if let Err(e) = self.broadcast(new_block.clone(), &aggregate_public_key) { + warn!("Error accepting own block: {e:?}. 
Will try mining again."); + continue; } else { info!( "Miner: Block signed by signer set and broadcasted"; @@ -263,6 +258,12 @@ impl BlockMinerThread { self.mined_blocks.push(new_block); } + let sort_db = SortitionDB::open( + &self.config.get_burn_db_file_path(), + true, + self.burnchain.pox_constants.clone(), + ) + .expect("FATAL: could not open sortition DB"); let wait_start = Instant::now(); while wait_start.elapsed() < self.config.miner.wait_on_interim_blocks { thread::sleep(Duration::from_millis(ABORT_TRY_AGAIN_MS)); @@ -273,24 +274,226 @@ impl BlockMinerThread { } } + fn coordinate_signature( + &mut self, + new_block: &NakamotoBlock, + stackerdbs: &mut StackerDBs, + stackerdb_config: &StackerDBConfig, + attempts: &mut u64, + ) -> Result<(Point, ThresholdSignature), NakamotoNodeError> { + let Some(miner_privkey) = self.config.miner.mining_key else { + return Err(NakamotoNodeError::MinerConfigurationFailed( + "No mining key configured, cannot mine", + )); + }; + let sort_db = SortitionDB::open( + &self.config.get_burn_db_file_path(), + true, + self.burnchain.pox_constants.clone(), + ) + .expect("FATAL: could not open sortition DB"); + let tip = SortitionDB::get_block_snapshot_consensus( + sort_db.conn(), + &new_block.header.consensus_hash, + ) + .expect("FATAL: could not retrieve chain tip") + .expect("FATAL: could not retrieve chain tip"); + let reward_cycle = self + .burnchain + .pox_constants + .block_height_to_reward_cycle( + self.burnchain.first_block_height, + self.burn_block.block_height, + ) + .expect("FATAL: building on a burn block that is before the first burn block"); + + let reward_info = match sort_db.get_preprocessed_reward_set_of(&tip.sortition_id) { + Ok(Some(x)) => x, + Ok(None) => { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "No reward set found. Cannot initialize miner coordinator.".into(), + )); + } + Err(e) => { + return Err(NakamotoNodeError::SigningCoordinatorFailure(format!( + "Failure while fetching reward set. Cannot initialize miner coordinator. {e:?}" + ))); + } + }; + + let Some(reward_set) = reward_info.known_selected_anchor_block_owned() else { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "Current reward cycle did not select a reward set. Cannot mine!".into(), + )); + }; + + let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) + .expect("FATAL: could not open chainstate DB"); + let sortition_handle = sort_db.index_handle_at_tip(); + let Ok(aggregate_public_key) = NakamotoChainState::get_aggregate_public_key( + &mut chain_state, + &sort_db, + &sortition_handle, + &new_block, + ) else { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "Failed to obtain the active aggregate public key. Cannot mine!".into(), + )); + }; + + #[cfg(test)] + { + // In test mode, short-circuit spinning up the SignCoordinator if the TEST_SIGNING + // channel has been created. This allows integration tests for the stacks-node + // independent of the stacks-signer. + let mut signer = crate::tests::nakamoto_integrations::TEST_SIGNING + .lock() + .unwrap(); + if signer.as_ref().is_some() { + let sign_channels = signer.as_mut().unwrap(); + let recv = sign_channels.recv.take().unwrap(); + drop(signer); // drop signer so we don't hold the lock while receiving. 
+ let signature = recv.recv_timeout(Duration::from_secs(30)).unwrap(); + let overwritten = crate::tests::nakamoto_integrations::TEST_SIGNING + .lock() + .unwrap() + .as_mut() + .unwrap() + .recv + .replace(recv); + assert!(overwritten.is_none()); + return Ok((aggregate_public_key, signature)); + } + } + + let miner_privkey_as_scalar = Scalar::from(miner_privkey.as_slice().clone()); + let mut coordinator = SignCoordinator::new( + &reward_set, + reward_cycle, + miner_privkey_as_scalar, + aggregate_public_key, + self.config.is_mainnet(), + &stackerdbs, + stackerdb_config.clone(), + &self.config, + ) + .map_err(|e| { + NakamotoNodeError::SigningCoordinatorFailure(format!( + "Failed to initialize the signing coordinator. Cannot mine! {e:?}" + )) + })?; + + *attempts += 1; + let signature = coordinator.begin_sign( + new_block, + *attempts, + &tip, + &self.burnchain, + &sort_db, + stackerdbs, + &self.event_dispatcher, + )?; + + Ok((aggregate_public_key, signature)) + } + + fn propose_block( + &mut self, + new_block: &NakamotoBlock, + stackerdbs: &mut StackerDBs, + stackerdb_config: &StackerDBConfig, + ) -> Result<(), NakamotoNodeError> { + let miners_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet()); + let Some(miner_privkey) = self.config.miner.mining_key else { + return Err(NakamotoNodeError::MinerConfigurationFailed( + "No mining key configured, cannot mine", + )); + }; + let sort_db = SortitionDB::open( + &self.config.get_burn_db_file_path(), + true, + self.burnchain.pox_constants.clone(), + ) + .expect("FATAL: could not open sortition DB"); + let tip = SortitionDB::get_block_snapshot_consensus( + sort_db.conn(), + &new_block.header.consensus_hash, + ) + .expect("FATAL: could not retrieve chain tip") + .expect("FATAL: could not retrieve chain tip"); + let reward_cycle = self + .burnchain + .pox_constants + .block_height_to_reward_cycle( + self.burnchain.first_block_height, + self.burn_block.block_height, + ) + .expect("FATAL: building on a burn block that is before the first burn block"); + + let proposal_msg = BlockProposalSigners { + block: new_block.clone(), + burn_height: self.burn_block.block_height, + reward_cycle, + }; + let proposal = match NakamotoBlockBuilder::make_stackerdb_block_proposal( + &sort_db, + &tip, + &stackerdbs, + &proposal_msg, + &miner_privkey, + &miners_contract_id, + ) { + Ok(Some(chunk)) => chunk, + Ok(None) => { + warn!("Failed to propose block to stackerdb: no slot available"); + return Ok(()); + } + Err(e) => { + warn!("Failed to propose block to stackerdb: {e:?}"); + return Ok(()); + } + }; + + // Propose the block to the observing signers through the .miners stackerdb instance + let miner_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet()); + let Ok(stackerdb_tx) = stackerdbs.tx_begin(stackerdb_config.clone()) else { + warn!("Failed to begin stackerdbs transaction to write block proposal, will try mining again"); + return Ok(()); + }; + + match stackerdb_tx.put_chunk(&miner_contract_id, proposal, &self.event_dispatcher) { + Ok(()) => { + info!( + "Proposed block to stackerdb"; + "signer_sighash" => %new_block.header.signer_signature_hash() + ); + } + Err(e) => { + return Err(NakamotoNodeError::SigningCoordinatorFailure(format!( + "Failed to propose block to stackerdb {e:?}" + ))); + } + } + + self.globals.counters.bump_naka_proposed_blocks(); + Ok(()) + } + fn get_stackerdb_contract_and_slots( &self, stackerdbs: &StackerDBs, - msg_id: u32, + msg_id: &MessageSlotID, reward_cycle: u64, ) -> Result<(QualifiedContractIdentifier, 
HashMap), NakamotoNodeError> { let stackerdb_contracts = stackerdbs .get_stackerdb_contract_ids() .expect("FATAL: could not get the stacker DB contract ids"); - let signers_contract_id = NakamotoSigners::make_signers_db_contract_id( - reward_cycle, - msg_id, - self.config.is_mainnet(), - ); + let signers_contract_id = + msg_id.stacker_db_contract(self.config.is_mainnet(), reward_cycle); if !stackerdb_contracts.contains(&signers_contract_id) { return Err(NakamotoNodeError::SignerSignatureError( - "No signers contract found, cannot wait for signers", + "No signers contract found, cannot wait for signers".into(), )); }; // Get the slots for every signer @@ -325,7 +528,7 @@ impl BlockMinerThread { .wrapping_add(1); let (signers_contract_id, slot_ids_addresses) = self.get_stackerdb_contract_and_slots( stackerdbs, - TRANSACTIONS_MSG_ID, + &MessageSlotID::Transactions, next_reward_cycle, )?; let slot_ids = slot_ids_addresses.keys().cloned().collect::>(); @@ -389,135 +592,10 @@ impl BlockMinerThread { Ok(filtered_transactions.into_values().collect()) } - fn wait_for_signer_signature( + fn broadcast( &self, - sortdb: &SortitionDB, - stackerdbs: &StackerDBs, + block: NakamotoBlock, aggregate_public_key: &Point, - signer_signature_hash: &Sha512Trunc256Sum, - signer_weights: HashMap, - reward_cycle: u64, - ) -> Result { - let (signers_contract_id, slot_ids_addresses) = - self.get_stackerdb_contract_and_slots(stackerdbs, BLOCK_MSG_ID, reward_cycle)?; - let slot_ids = slot_ids_addresses.keys().cloned().collect::>(); - // If more than a threshold percentage of the signers reject the block, we should not wait any further - let weights: u64 = signer_weights.values().sum(); - let rejection_threshold: u64 = (weights as f64 * 7_f64 / 10_f64).ceil() as u64; - let mut rejections = HashSet::new(); - let mut rejections_weight: u64 = 0; - let now = Instant::now(); - debug!("Miner: waiting for block response from reward cycle {reward_cycle } signers..."); - while now.elapsed() < self.config.miner.wait_on_signers { - if self.check_burn_tip_changed(&sortdb).is_err() { - info!("Miner: burnchain tip changed while waiting for signer signature."); - return Err(NakamotoNodeError::BurnchainTipChanged); - } - // Get the block responses from the signers for the block we just proposed - debug!("Miner: retreiving latest signer messsages"; - "signers_contract_id" => %signers_contract_id, - "slot_ids" => ?slot_ids, - ); - let signer_chunks = stackerdbs - .get_latest_chunks(&signers_contract_id, &slot_ids) - .expect("FATAL: could not get latest chunks from stacker DB"); - let signer_messages: Vec<(u32, SignerMessage)> = slot_ids - .iter() - .zip(signer_chunks.into_iter()) - .filter_map(|(slot_id, chunk)| { - chunk.and_then(|chunk| { - read_next::(&mut &chunk[..]) - .ok() - .map(|msg| (*slot_id, msg)) - }) - }) - .collect(); - debug!("Miner: retrieved {} signer messages", signer_messages.len()); - for (signer_id, signer_message) in signer_messages { - match signer_message { - SignerMessage::BlockResponse(BlockResponse::Accepted((hash, signature))) => { - // First check that this signature is for the block we proposed and that it is valid - if hash == *signer_signature_hash - && signature - .0 - .verify(aggregate_public_key, &signer_signature_hash.0) - { - // The signature is valid across the signer signature hash of the original proposed block - // Immediately return and update the block with this new signature before appending it to the chain - info!("Miner: received a signature accross the proposed block's signer signature hash 
({signer_signature_hash:?}): {signature:?}"); - return Ok(signature); - } - // We received an accepted block for some unknown block hash...Useless! Ignore it. - // Keep waiting for a threshold number of signers to either reject the proposed block - // or return valid signature to show up across the proposed block - debug!("Miner: received a signature for an unknown block hash: {hash:?}. Ignoring it."); - } - SignerMessage::BlockResponse(BlockResponse::Rejected(block_rejection)) => { - // First check that this block rejection is for the block we proposed - if block_rejection.signer_signature_hash != *signer_signature_hash { - // This rejection is not for the block we proposed, so we can ignore it - continue; - } - if let RejectCode::SignedRejection(signature) = block_rejection.reason_code - { - let block_vote = NakamotoBlockVote { - signer_signature_hash: *signer_signature_hash, - rejected: true, - }; - let message = block_vote.serialize_to_vec(); - if signature.0.verify(aggregate_public_key, &message) { - // A threshold number of signers signed a denial of the proposed block - // Miner will NEVER get a signed block from the signers for this particular block - // Immediately return and attempt to mine a new block - return Err(NakamotoNodeError::SignerSignatureError( - "Signers signed a rejection of the proposed block", - )); - } - } else { - if rejections.contains(&signer_id) { - // We have already received a rejection from this signer - continue; - } - - // We received a rejection that is not signed. We will keep waiting for a threshold number of rejections. - // Ensure that we do not double count a rejection from the same signer. - rejections.insert(signer_id); - rejections_weight = rejections_weight.saturating_add( - *signer_weights - .get( - &slot_ids_addresses - .get(&signer_id) - .expect("FATAL: signer not found in slot ids"), - ) - .expect("FATAL: signer not found in signer weights"), - ); - if rejections_weight > rejection_threshold { - // A threshold number of signers rejected the proposed block. - // Miner will likely never get a signed block from the signers for this particular block - // Return and attempt to mine a new block - return Err(NakamotoNodeError::SignerSignatureError( - "Threshold number of signers rejected the proposed block", - )); - } - } - } - _ => {} // Any other message is ignored - } - } - // We have not received a signed block or enough information to reject the proposed block. Wait a bit and try again. - thread::sleep(Duration::from_millis(WAIT_FOR_SIGNERS_MS)); - } - // We have waited for the signers for too long: stop waiting so we can propose a new block - debug!("Miner: exceeded signer signature timeout. 
Will propose a new block"); - Err(NakamotoNodeError::SignerSignatureError( - "Timed out waiting for signers", - )) - } - - fn wait_for_signer_signature_and_broadcast( - &self, - stackerdbs: &StackerDBs, - mut block: NakamotoBlock, ) -> Result<(), ChainstateError> { let mut chain_state = neon_node::open_chainstate_with_faults(&self.config) .expect("FATAL: could not open chainstate DB"); @@ -528,37 +606,8 @@ impl BlockMinerThread { self.burnchain.pox_constants.clone(), ) .expect("FATAL: could not open sortition DB"); - let mut sortition_handle = sort_db.index_handle_at_tip(); - let aggregate_public_key = NakamotoChainState::get_aggregate_public_key( - &mut chain_state, - &sort_db, - &sortition_handle, - &block, - )?; - let reward_cycle = self - .burnchain - .block_height_to_reward_cycle(self.burn_block.block_height) - .expect("FATAL: no reward cycle for burn block"); - let signer_weights = NakamotoSigners::get_signers_weights( - &mut chain_state, - &sort_db, - &self.parent_tenure_id, - reward_cycle, - )?; - let signature = self - .wait_for_signer_signature( - &sort_db, - &stackerdbs, - &aggregate_public_key, - &block.header.signer_signature_hash(), - signer_weights, - reward_cycle, - ) - .map_err(|e| { - ChainstateError::InvalidStacksBlock(format!("Invalid Nakamoto block: {e:?}")) - })?; - block.header.signer_signature = signature; + let mut sortition_handle = sort_db.index_handle_at_tip(); let (headers_conn, staging_tx) = chain_state.headers_conn_and_staging_tx_begin()?; NakamotoChainState::accept_block( &chainstate_config, @@ -820,7 +869,7 @@ impl BlockMinerThread { self.get_signer_transactions(&mut chain_state, &burn_db, &stackerdbs)?; // build the block itself - let (mut block, _, _) = NakamotoBlockBuilder::build_nakamoto_block( + let (mut block, consumed, size, tx_events) = NakamotoBlockBuilder::build_nakamoto_block( &chain_state, &burn_db.index_conn(), &mut mem_pool, @@ -833,6 +882,8 @@ impl BlockMinerThread { false, self.globals.get_miner_status(), ), + // we'll invoke the event dispatcher ourselves so that it calculates the + // correct signer_sighash for `process_mined_nakamoto_block_event` Some(&self.event_dispatcher), signer_transactions, ) @@ -866,6 +917,14 @@ impl BlockMinerThread { "signer_sighash" => %block.header.signer_signature_hash(), ); + self.event_dispatcher.process_mined_nakamoto_block_event( + self.burn_block.block_height, + &block, + size, + &consumed, + tx_events, + ); + // last chance -- confirm that the stacks tip is unchanged (since it could have taken long // enough to build this block that another block could have arrived), and confirm that all // Stacks blocks with heights higher than the canoincal tip are processed. diff --git a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs new file mode 100644 index 000000000..54895ab08 --- /dev/null +++ b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs @@ -0,0 +1,517 @@ +// Copyright (C) 2024 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::sync::mpsc::Receiver; +use std::time::{Duration, Instant}; + +use hashbrown::{HashMap, HashSet}; +use libsigner::{MessageSlotID, SignerEvent, SignerMessage}; +use stacks::burnchains::Burnchain; +use stacks::chainstate::burn::db::sortdb::SortitionDB; +use stacks::chainstate::burn::BlockSnapshot; +use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState}; +use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, MINERS_NAME, SIGNERS_NAME}; +use stacks::chainstate::stacks::events::StackerDBChunksEvent; +use stacks::chainstate::stacks::{Error as ChainstateError, ThresholdSignature}; +use stacks::libstackerdb::StackerDBChunkData; +use stacks::net::stackerdb::{StackerDBConfig, StackerDBs}; +use stacks::util_lib::boot::boot_code_id; +use stacks_common::codec::StacksMessageCodec; +use stacks_common::types::chainstate::{StacksPrivateKey, StacksPublicKey}; +use wsts::common::PolyCommitment; +use wsts::curve::ecdsa; +use wsts::curve::point::{Compressed, Point}; +use wsts::curve::scalar::Scalar; +use wsts::state_machine::coordinator::fire::Coordinator as FireCoordinator; +use wsts::state_machine::coordinator::{Config as CoordinatorConfig, Coordinator}; +use wsts::state_machine::PublicKeys; +use wsts::v2::Aggregator; + +use super::Error as NakamotoNodeError; +use crate::event_dispatcher::STACKER_DB_CHANNEL; +use crate::{Config, EventDispatcher}; + +/// The `SignCoordinator` struct represents a WSTS FIRE coordinator whose +/// sole function is to serve as the coordinator for Nakamoto block signing. +/// This coordinator does not operate as a DKG coordinator. Rather, this struct +/// is used by Nakamoto miners to act as the coordinator for the blocks they +/// produce. 
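+// A rough usage sketch, mirroring how `BlockMinerThread::coordinate_signature` in
+// `nakamoto_node/miner.rs` drives this coordinator (local names approximated, error
+// handling elided):
+//
+//     let mut coordinator = SignCoordinator::new(
+//         &reward_set,
+//         reward_cycle,
+//         miner_privkey_as_scalar,
+//         aggregate_public_key,
+//         is_mainnet,
+//         &stackerdbs,
+//         stackerdb_config.clone(),
+//         &config,
+//     )?;
+//     // Runs one WSTS signing round over the proposed block and returns the group signature.
+//     let signature = coordinator.begin_sign(
+//         &block, attempt, &burn_tip, &burnchain, &sort_db, &mut stackerdbs, &event_dispatcher,
+//     )?;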
+pub struct SignCoordinator { + coordinator: FireCoordinator, + receiver: Option>, + message_key: Scalar, + wsts_public_keys: PublicKeys, + is_mainnet: bool, + miners_db_config: StackerDBConfig, + signing_round_timeout: Duration, +} + +pub struct NakamotoSigningParams { + /// total number of signers + pub num_signers: u32, + /// total number of keys + pub num_keys: u32, + /// threshold of keys needed to form a valid signature + pub threshold: u32, + /// map of signer_id to controlled key_ids + pub signer_key_ids: HashMap>, + /// ECDSA public keys as Point objects indexed by signer_id + pub signer_public_keys: HashMap, + pub wsts_public_keys: PublicKeys, +} + +impl Drop for SignCoordinator { + fn drop(&mut self) { + STACKER_DB_CHANNEL.replace_receiver(self.receiver.take().expect( + "FATAL: lost possession of the StackerDB channel before dropping SignCoordinator", + )); + } +} + +impl From<&[NakamotoSignerEntry]> for NakamotoSigningParams { + fn from(reward_set: &[NakamotoSignerEntry]) -> Self { + let mut weight_end = 1; + let mut signer_key_ids = HashMap::with_capacity(reward_set.len()); + let mut signer_public_keys = HashMap::with_capacity(reward_set.len()); + let mut wsts_signers = HashMap::new(); + let mut wsts_key_ids = HashMap::new(); + for (i, entry) in reward_set.iter().enumerate() { + let signer_id = u32::try_from(i).expect("FATAL: number of signers exceeds u32::MAX"); + let ecdsa_pk = ecdsa::PublicKey::try_from(entry.signing_key.as_slice()) + .map_err(|e| format!("Failed to convert signing key to ecdsa::PublicKey: {e}")) + .unwrap_or_else(|err| { + panic!("FATAL: failed to convert signing key to Point: {err}") + }); + let signer_public_key = Point::try_from(&Compressed::from(ecdsa_pk.to_bytes())) + .map_err(|e| format!("Failed to convert signing key to wsts::Point: {e}")) + .unwrap_or_else(|err| { + panic!("FATAL: failed to convert signing key to Point: {err}") + }); + + signer_public_keys.insert(signer_id, signer_public_key); + let weight_start = weight_end; + weight_end = weight_start + entry.weight; + let key_ids: HashSet = (weight_start..weight_end).collect(); + for key_id in key_ids.iter() { + wsts_key_ids.insert(*key_id, ecdsa_pk.clone()); + } + signer_key_ids.insert(signer_id, key_ids); + wsts_signers.insert(signer_id, ecdsa_pk); + } + + let num_keys = weight_end - 1; + let threshold = (num_keys * 70) / 100; + let num_signers = reward_set + .len() + .try_into() + .expect("FATAL: more than u32::max() signers in the reward set"); + + NakamotoSigningParams { + num_signers, + threshold, + num_keys, + signer_key_ids, + signer_public_keys, + wsts_public_keys: PublicKeys { + signers: wsts_signers, + key_ids: wsts_key_ids, + }, + } + } +} + +fn get_signer_commitments( + is_mainnet: bool, + reward_set: &[NakamotoSignerEntry], + stackerdbs: &StackerDBs, + reward_cycle: u64, + expected_aggregate_key: &Point, +) -> Result, ChainstateError> { + let commitment_contract = + MessageSlotID::DkgResults.stacker_db_contract(is_mainnet, reward_cycle); + let signer_set_len = u32::try_from(reward_set.len()) + .map_err(|_| ChainstateError::InvalidStacksBlock("Reward set length exceeds u32".into()))?; + for signer_id in 0..signer_set_len { + let Some(signer_data) = stackerdbs.get_latest_chunk(&commitment_contract, signer_id)? 
+ else { + warn!( + "Failed to fetch DKG result, will look for results from other signers."; + "signer_id" => signer_id + ); + continue; + }; + let Ok(SignerMessage::DkgResults { + aggregate_key, + party_polynomials, + }) = SignerMessage::consensus_deserialize(&mut signer_data.as_slice()) + else { + warn!( + "Failed to parse DKG result, will look for results from other signers."; + "signer_id" => signer_id, + ); + continue; + }; + + if &aggregate_key != expected_aggregate_key { + warn!( + "Aggregate key in DKG results does not match expected, will look for results from other signers."; + "expected" => %expected_aggregate_key, + "reported" => %aggregate_key, + ); + continue; + } + let computed_key = party_polynomials + .iter() + .fold(Point::default(), |s, (_, comm)| s + comm.poly[0]); + + if expected_aggregate_key != &computed_key { + warn!( + "Aggregate key computed from DKG results does not match expected, will look for results from other signers."; + "expected" => %expected_aggregate_key, + "computed" => %computed_key, + ); + continue; + } + + return Ok(party_polynomials); + } + error!( + "No valid DKG results found for the active signing set, cannot coordinate a group signature"; + "reward_cycle" => reward_cycle, + ); + Err(ChainstateError::InvalidStacksBlock( + "Failed to fetch DKG results for the active signer set".into(), + )) +} + +impl SignCoordinator { + /// * `reward_set` - the active reward set data, used to construct the signer + /// set parameters. + /// * `message_key` - the signing key that the coordinator will use to sign messages + /// broadcasted to the signer set. this should be the miner's registered key. + /// * `aggregate_public_key` - the active aggregate key for this cycle + pub fn new( + reward_set: &RewardSet, + reward_cycle: u64, + message_key: Scalar, + aggregate_public_key: Point, + is_mainnet: bool, + stackerdb_conn: &StackerDBs, + miners_db_config: StackerDBConfig, + config: &Config, + ) -> Result { + let Some(ref reward_set_signers) = reward_set.signers else { + error!("Could not initialize WSTS coordinator for reward set without signer"); + return Err(ChainstateError::NoRegisteredSigners(0)); + }; + + let Some(receiver) = STACKER_DB_CHANNEL + .receiver + .lock() + .expect("FATAL: StackerDBChannel lock is poisoned") + .take() + else { + error!("Could not obtain handle for the StackerDBChannel"); + return Err(ChainstateError::ChannelClosed( + "WSTS coordinator requires a handle to the StackerDBChannel".into(), + )); + }; + + let NakamotoSigningParams { + num_signers, + num_keys, + threshold, + signer_key_ids, + signer_public_keys, + wsts_public_keys, + } = NakamotoSigningParams::from(reward_set_signers.as_slice()); + debug!( + "Initializing miner/coordinator"; + "num_signers" => num_signers, + "num_keys" => num_keys, + "threshold" => threshold, + "signer_key_ids" => ?signer_key_ids, + "signer_public_keys" => ?signer_public_keys, + "wsts_public_keys" => ?wsts_public_keys, + ); + let coord_config = CoordinatorConfig { + num_signers, + num_keys, + threshold, + signer_key_ids, + signer_public_keys, + dkg_threshold: threshold, + message_private_key: message_key.clone(), + ..Default::default() + }; + + let mut coordinator: FireCoordinator = FireCoordinator::new(coord_config); + let party_polynomials = get_signer_commitments( + is_mainnet, + reward_set_signers.as_slice(), + stackerdb_conn, + reward_cycle, + &aggregate_public_key, + )?; + if let Err(e) = coordinator + .set_key_and_party_polynomials(aggregate_public_key.clone(), party_polynomials) + { + warn!("Failed 
to set a valid set of party polynomials"; "error" => %e); + }; + + Ok(Self { + coordinator, + message_key, + receiver: Some(receiver), + wsts_public_keys, + is_mainnet, + miners_db_config, + signing_round_timeout: config.miner.wait_on_signers.clone(), + }) + } + + fn get_sign_id(burn_block_height: u64, burnchain: &Burnchain) -> u64 { + burnchain + .pox_constants + .reward_cycle_index(burnchain.first_block_height, burn_block_height) + .expect("FATAL: tried to initialize WSTS coordinator before first burn block height") + } + + fn send_signers_message( + message_key: &Scalar, + sortdb: &SortitionDB, + tip: &BlockSnapshot, + stackerdbs: &mut StackerDBs, + message: SignerMessage, + is_mainnet: bool, + miners_db_config: &StackerDBConfig, + event_dispatcher: &EventDispatcher, + ) -> Result<(), String> { + let mut miner_sk = StacksPrivateKey::from_slice(&message_key.to_bytes()).unwrap(); + miner_sk.set_compress_public(true); + let miner_pubkey = StacksPublicKey::from_private(&miner_sk); + let Some(slot_range) = NakamotoChainState::get_miner_slot(sortdb, tip, &miner_pubkey) + .map_err(|e| format!("Failed to read miner slot information: {e:?}"))? + else { + return Err("No slot for miner".into()); + }; + let target_slot = 1; + let slot_id = slot_range.start + target_slot; + if !slot_range.contains(&slot_id) { + return Err("Not enough slots for miner messages".into()); + } + // Get the LAST slot version number written to the DB. If not found, use 0. + // Add 1 to get the NEXT version number + // Note: we already check above for the slot's existence + let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet); + let slot_version = stackerdbs + .get_slot_version(&miners_contract_id, slot_id) + .map_err(|e| format!("Failed to read slot version: {e:?}"))? + .unwrap_or(0) + .saturating_add(1); + let mut chunk = StackerDBChunkData::new(slot_id, slot_version, message.serialize_to_vec()); + chunk + .sign(&miner_sk) + .map_err(|_| "Failed to sign StackerDB chunk")?; + + let stackerdb_tx = stackerdbs.tx_begin(miners_db_config.clone()).map_err(|e| { + warn!("Failed to begin stackerdbs transaction to write .miners message"; "err" => ?e); + "Failed to begin StackerDBs transaction" + })?; + + match stackerdb_tx.put_chunk(&miners_contract_id, chunk, event_dispatcher) { + Ok(()) => { + debug!("Wrote message to stackerdb: {message:?}"); + Ok(()) + } + Err(e) => { + warn!("Failed to write message to stackerdb {e:?}"); + Err("Failed to write message to stackerdb".into()) + } + } + } + + pub fn begin_sign( + &mut self, + block: &NakamotoBlock, + block_attempt: u64, + burn_tip: &BlockSnapshot, + burnchain: &Burnchain, + sortdb: &SortitionDB, + stackerdbs: &mut StackerDBs, + event_dispatcher: &EventDispatcher, + ) -> Result { + let sign_id = Self::get_sign_id(burn_tip.block_height, burnchain); + let sign_iter_id = block_attempt; + let reward_cycle_id = burnchain + .block_height_to_reward_cycle(burn_tip.block_height) + .expect("FATAL: tried to initialize coordinator before first burn block height"); + self.coordinator.current_sign_id = sign_id; + self.coordinator.current_sign_iter_id = sign_iter_id; + + let block_bytes = block.serialize_to_vec(); + let nonce_req_msg = self + .coordinator + .start_signing_round(&block_bytes, false, None) + .map_err(|e| { + NakamotoNodeError::SigningCoordinatorFailure(format!( + "Failed to start signing round in FIRE coordinator: {e:?}" + )) + })?; + Self::send_signers_message( + &self.message_key, + sortdb, + burn_tip, + stackerdbs, + nonce_req_msg.into(), + self.is_mainnet, + 
&self.miners_db_config, + event_dispatcher, + ) + .map_err(NakamotoNodeError::SigningCoordinatorFailure)?; + + let Some(ref mut receiver) = self.receiver else { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "Failed to obtain the StackerDB event receiver".into(), + )); + }; + + let start_ts = Instant::now(); + while start_ts.elapsed() <= self.signing_round_timeout { + let event = match receiver.recv_timeout(Duration::from_millis(50)) { + Ok(event) => event, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + continue; + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + return Err(NakamotoNodeError::SigningCoordinatorFailure( + "StackerDB event receiver disconnected".into(), + )) + } + }; + + let is_signer_event = event.contract_id.name.starts_with(SIGNERS_NAME) + && event.contract_id.issuer.1 == [0; 20]; + if !is_signer_event { + debug!("Ignoring StackerDB event for non-signer contract"; "contract" => %event.contract_id); + continue; + } + let Ok(signer_event) = SignerEvent::try_from(event).map_err(|e| { + warn!("Failure parsing StackerDB event into signer event. Ignoring message."; "err" => ?e); + }) else { + continue; + }; + let SignerEvent::SignerMessages(signer_set, messages) = signer_event else { + debug!("Received signer event other than a signer message. Ignoring."); + continue; + }; + if signer_set != u32::try_from(reward_cycle_id % 2).unwrap() { + debug!("Received signer event for other reward cycle. Ignoring."); + continue; + }; + debug!("Miner/Coordinator: Received messages from signers"; "count" => messages.len()); + let coordinator_pk = ecdsa::PublicKey::new(&self.message_key).map_err(|_e| { + NakamotoNodeError::MinerSignatureError("Bad signing key for the FIRE coordinator") + })?; + let packets: Vec<_> = messages + .into_iter() + .filter_map(|msg| match msg { + SignerMessage::DkgResults { .. } + | SignerMessage::BlockResponse(_) + | SignerMessage::Transactions(_) => None, + SignerMessage::Packet(packet) => { + debug!("Received signers packet: {packet:?}"); + if !packet.verify(&self.wsts_public_keys, &coordinator_pk) { + warn!("Failed to verify StackerDB packet: {packet:?}"); + None + } else { + Some(packet) + } + } + }) + .collect(); + let (outbound_msgs, op_results) = self + .coordinator + .process_inbound_messages(&packets) + .unwrap_or_else(|e| { + error!( + "Miner/Coordinator: Failed to process inbound message packets"; + "err" => ?e + ); + (vec![], vec![]) + }); + for operation_result in op_results.into_iter() { + match operation_result { + wsts::state_machine::OperationResult::Dkg { .. } + | wsts::state_machine::OperationResult::SignTaproot(_) + | wsts::state_machine::OperationResult::DkgError(_) => { + debug!("Ignoring unrelated operation result"); + } + wsts::state_machine::OperationResult::Sign(signature) => { + // check if the signature actually corresponds to our block? + let block_sighash = block.header.signer_signature_hash(); + let verified = signature.verify( + self.coordinator.aggregate_public_key.as_ref().unwrap(), + &block_sighash.0, + ); + let signature = ThresholdSignature(signature); + if !verified { + warn!( + "Processed signature but didn't validate over the expected block. 
Returning error."; + "signature" => %signature, + "block_signer_signature_hash" => %block_sighash + ); + return Err(NakamotoNodeError::SignerSignatureError( + "Signature failed to validate over the expected block".into(), + )); + } else { + return Ok(signature); + } + } + wsts::state_machine::OperationResult::SignError(e) => { + return Err(NakamotoNodeError::SignerSignatureError(format!( + "Signing failed: {e:?}" + ))) + } + } + } + for msg in outbound_msgs { + match Self::send_signers_message( + &self.message_key, + sortdb, + burn_tip, + stackerdbs, + msg.into(), + self.is_mainnet, + &self.miners_db_config, + event_dispatcher, + ) { + Ok(()) => { + debug!("Miner/Coordinator: sent outbound message."); + } + Err(e) => { + warn!( + "Miner/Coordinator: Failed to send message to StackerDB instance: {e:?}." + ); + } + }; + } + } + + Err(NakamotoNodeError::SignerSignatureError( + "Timed out waiting for group signature".into(), + )) + } +} diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 49064d497..66bd5c1d9 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -516,6 +516,7 @@ pub(crate) struct BlockMinerThread { burn_block: BlockSnapshot, /// Handle to the node's event dispatcher event_dispatcher: EventDispatcher, + failed_to_submit_last_attempt: bool, } /// State representing the microblock miner. @@ -1020,6 +1021,7 @@ impl BlockMinerThread { registered_key, burn_block, event_dispatcher: rt.event_dispatcher.clone(), + failed_to_submit_last_attempt: false, } } @@ -1543,7 +1545,9 @@ impl BlockMinerThread { Self::find_inflight_mined_blocks(self.burn_block.block_height, &self.last_mined_blocks); // has the tip changed from our previously-mined block for this epoch? - let (attempt, max_txs) = if last_mined_blocks.len() <= 1 { + let should_unconditionally_mine = last_mined_blocks.is_empty() + || (last_mined_blocks.len() == 1 && !self.failed_to_submit_last_attempt); + let (attempt, max_txs) = if should_unconditionally_mine { // always mine if we've not mined a block for this epoch yet, or // if we've mined just one attempt, unconditionally try again (so we // can use `subsequent_miner_time_ms` in this attempt) @@ -2482,12 +2486,15 @@ impl BlockMinerThread { let res = bitcoin_controller.submit_operation(target_epoch_id, op, &mut op_signer, attempt); if res.is_none() { + self.failed_to_submit_last_attempt = true; if !self.config.node.mock_mining { warn!("Relayer: Failed to submit Bitcoin transaction"); return None; } else { debug!("Relayer: Mock-mining enabled; not sending Bitcoin transaction"); } + } else { + self.failed_to_submit_last_attempt = false; } Some(MinerThreadResult::Block( diff --git a/testnet/stacks-node/src/tests/mod.rs b/testnet/stacks-node/src/tests/mod.rs index 7dbabae3e..0e8b818b1 100644 --- a/testnet/stacks-node/src/tests/mod.rs +++ b/testnet/stacks-node/src/tests/mod.rs @@ -59,7 +59,7 @@ mod epoch_23; mod epoch_24; mod integrations; mod mempool; -mod nakamoto_integrations; +pub mod nakamoto_integrations; pub mod neon_integrations; mod signer; mod stackerdb; diff --git a/testnet/stacks-node/src/tests/nakamoto_integrations.rs b/testnet/stacks-node/src/tests/nakamoto_integrations.rs index 4ffed3b97..2c22950ea 100644 --- a/testnet/stacks-node/src/tests/nakamoto_integrations.rs +++ b/testnet/stacks-node/src/tests/nakamoto_integrations.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . 
use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::time::{Duration, Instant}; @@ -25,7 +26,7 @@ use clarity::vm::costs::ExecutionCost; use clarity::vm::types::{PrincipalData, QualifiedContractIdentifier}; use http_types::headers::AUTHORIZATION; use lazy_static::lazy_static; -use libsigner::{BlockResponse, SignerMessage, SignerSession, StackerDBSession}; +use libsigner::{SignerSession, StackerDBSession}; use stacks::burnchains::{MagicBytes, Txid}; use stacks::chainstate::burn::db::sortdb::SortitionDB; use stacks::chainstate::burn::operations::{ @@ -33,7 +34,6 @@ use stacks::chainstate::burn::operations::{ }; use stacks::chainstate::coordinator::comm::CoordinatorChannels; use stacks::chainstate::nakamoto::miner::NakamotoBlockBuilder; -use stacks::chainstate::nakamoto::signer_set::NakamotoSigners; use stacks::chainstate::nakamoto::test_signers::TestSigners; use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState}; use stacks::chainstate::stacks::address::PoxAddress; @@ -49,7 +49,7 @@ use stacks::core::{ PEER_VERSION_EPOCH_2_1, PEER_VERSION_EPOCH_2_2, PEER_VERSION_EPOCH_2_3, PEER_VERSION_EPOCH_2_4, PEER_VERSION_EPOCH_2_5, PEER_VERSION_EPOCH_3_0, }; -use stacks::libstackerdb::{SlotMetadata, StackerDBChunkData}; +use stacks::libstackerdb::SlotMetadata; use stacks::net::api::callreadonly::CallReadOnlyRequestBody; use stacks::net::api::getstackers::GetStackersResponse; use stacks::net::api::postblock_proposal::{ @@ -154,6 +154,27 @@ lazy_static! { ]; } +pub static TEST_SIGNING: Mutex<Option<TestSigningChannel>> = Mutex::new(None); + +pub struct TestSigningChannel { + pub recv: Option<Receiver<ThresholdSignature>>, + pub send: Sender<ThresholdSignature>, +} + +impl TestSigningChannel { + /// Setup the TestSigningChannel as a singleton using TEST_SIGNING, + /// returning an owned Sender to the channel.
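+ /// Panics if a channel has already been instantiated: TEST_SIGNING holds at most one channel at a time.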
+ pub fn instantiate() -> Sender<ThresholdSignature> { + let (send, recv) = channel(); + let existed = TEST_SIGNING.lock().unwrap().replace(Self { + recv: Some(recv), + send: send.clone(), + }); + assert!(existed.is_none()); + send + } +} + pub fn get_stacker_set(http_origin: &str, cycle: u64) -> GetStackersResponse { let client = reqwest::blocking::Client::new(); let path = format!("{http_origin}/v2/stacker_set/{cycle}"); @@ -216,13 +237,12 @@ pub fn add_initial_balances( pub fn blind_signer( conf: &Config, signers: &TestSigners, - signer: &Secp256k1PrivateKey, proposals_count: RunLoopCounter, ) -> JoinHandle<()> { + let sender = TestSigningChannel::instantiate(); let mut signed_blocks = HashSet::new(); let conf = conf.clone(); let signers = signers.clone(); - let signer = signer.clone(); let mut last_count = proposals_count.load(Ordering::SeqCst); thread::spawn(move || loop { thread::sleep(Duration::from_millis(100)); @@ -231,7 +251,7 @@ pub fn blind_signer( continue; } last_count = cur_count; - match read_and_sign_block_proposal(&conf, &signers, &signer, &signed_blocks) { + match read_and_sign_block_proposal(&conf, &signers, &signed_blocks, &sender) { Ok(signed_block) => { if signed_blocks.contains(&signed_block) { continue; @@ -249,8 +269,8 @@ pub fn read_and_sign_block_proposal( conf: &Config, signers: &TestSigners, - signer: &Secp256k1PrivateKey, signed_blocks: &HashSet<Sha512Trunc256Sum>, + channel: &Sender<ThresholdSignature>, ) -> Result<Sha512Trunc256Sum, String> { let burnchain = conf.get_burnchain(); let sortdb = burnchain.open_sortition_db(true).unwrap(); @@ -267,12 +287,13 @@ pub fn read_and_sign_block_proposal( let miner_contract_id = boot_code_id(MINERS_NAME, false); let mut miners_stackerdb = StackerDBSession::new(&conf.node.rpc_bind, miner_contract_id); miners_stackerdb - .get_latest(miner_slot_id) + .get_latest(miner_slot_id.start) .map_err(|_| "Failed to get latest chunk from the miner slot ID")? .ok_or("No chunk found")? }; let proposed_block_hash = format!("0x{}", proposed_block.header.block_hash()); let signer_sig_hash = proposed_block.header.signer_signature_hash(); + if signed_blocks.contains(&signer_sig_hash) { // already signed off on this block, don't sign again.
return Ok(signer_sig_hash); @@ -288,35 +309,10 @@ pub fn read_and_sign_block_proposal( .clone() .sign_nakamoto_block(&mut proposed_block, reward_cycle); - let signer_message = SignerMessage::BlockResponse(BlockResponse::Accepted(( - signer_sig_hash.clone(), - proposed_block.header.signer_signature.clone(), - ))); - - let signers_contract_id = - NakamotoSigners::make_signers_db_contract_id(reward_cycle, libsigner::BLOCK_MSG_ID, false); - - let http_origin = format!("http://{}", &conf.node.rpc_bind); - let signers_info = get_stacker_set(&http_origin, reward_cycle); - let signer_index = get_signer_index(&signers_info, &Secp256k1PublicKey::from_private(signer)) - .unwrap() - .try_into() + channel + .send(proposed_block.header.signer_signature) .unwrap(); - - let next_version = get_stackerdb_slot_version(&http_origin, &signers_contract_id, signer_index) - .map(|x| x + 1) - .unwrap_or(0); - let mut signers_contract_sess = StackerDBSession::new(&conf.node.rpc_bind, signers_contract_id); - let mut chunk_to_put = StackerDBChunkData::new( - u32::try_from(signer_index).unwrap(), - next_version, - signer_message.serialize_to_vec(), - ); - chunk_to_put.sign(signer).unwrap(); - signers_contract_sess - .put_chunk(&chunk_to_put) - .map_err(|e| e.to_string())?; - Ok(signer_sig_hash) + return Ok(signer_sig_hash); } /// Return a working nakamoto-neon config and the miner's bitcoin address to fund @@ -991,7 +987,7 @@ fn simple_neon_integration() { } info!("Nakamoto miner started..."); - blind_signer(&naka_conf, &signers, &sender_signer_sk, proposals_submitted); + blind_signer(&naka_conf, &signers, proposals_submitted); // first block wakes up the run loop, wait until a key registration has been submitted. next_block_and(&mut btc_regtest_controller, 60, || { @@ -1220,7 +1216,7 @@ fn mine_multiple_per_tenure_integration() { .stacks_block_height; info!("Nakamoto miner started..."); - blind_signer(&naka_conf, &signers, &sender_signer_sk, proposals_submitted); + blind_signer(&naka_conf, &signers, proposals_submitted); // first block wakes up the run loop, wait until a key registration has been submitted. next_block_and(&mut btc_regtest_controller, 60, || { @@ -1526,7 +1522,7 @@ fn correct_burn_outs() { ); info!("Bootstrapped to Epoch-3.0 boundary, Epoch2x miner should stop"); - blind_signer(&naka_conf, &signers, &sender_signer_sk, proposals_submitted); + blind_signer(&naka_conf, &signers, proposals_submitted); // we should already be able to query the stacker set via RPC let burnchain = naka_conf.get_burnchain(); @@ -1732,7 +1728,7 @@ fn block_proposal_api_endpoint() { ); info!("Bootstrapped to Epoch-3.0 boundary, starting nakamoto miner"); - blind_signer(&conf, &signers, &sender_signer_sk, proposals_submitted); + blind_signer(&conf, &signers, proposals_submitted); let burnchain = conf.get_burnchain(); let sortdb = burnchain.open_sortition_db(true).unwrap(); @@ -2100,7 +2096,7 @@ fn miner_writes_proposed_block_to_stackerdb() { ); info!("Nakamoto miner started..."); - blind_signer(&naka_conf, &signers, &sender_signer_sk, proposals_submitted); + blind_signer(&naka_conf, &signers, proposals_submitted); // first block wakes up the run loop, wait until a key registration has been submitted. 
next_block_and(&mut btc_regtest_controller, 60, || { let vrf_count = vrfs_submitted.load(Ordering::SeqCst); @@ -2137,7 +2133,7 @@ fn miner_writes_proposed_block_to_stackerdb() { let mut miners_stackerdb = StackerDBSession::new(&naka_conf.node.rpc_bind, miner_contract_id); miners_stackerdb - .get_latest(slot_id) + .get_latest(slot_id.start) .expect("Failed to get latest chunk from the miner slot ID") .expect("No chunk found") }; @@ -2254,7 +2250,7 @@ fn vote_for_aggregate_key_burn_op() { .unwrap(); info!("Nakamoto miner started..."); - blind_signer(&naka_conf, &signers, &signer_sk, proposals_submitted); + blind_signer(&naka_conf, &signers, proposals_submitted); // first block wakes up the run loop, wait until a key registration has been submitted. next_block_and(&mut btc_regtest_controller, 60, || { let vrf_count = vrfs_submitted.load(Ordering::SeqCst); diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index fb867db0a..650069943 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -9,8 +9,8 @@ use std::{env, thread}; use clarity::boot_util::boot_code_id; use clarity::vm::Value; use libsigner::{ - BlockResponse, RejectCode, RunningSigner, Signer, SignerEventReceiver, SignerMessage, - BLOCK_MSG_ID, + BlockResponse, MessageSlotID, RejectCode, RunningSigner, Signer, SignerEventReceiver, + SignerMessage, }; use rand::thread_rng; use rand_core::RngCore; @@ -21,6 +21,7 @@ use stacks::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader, NakamotoB use stacks::chainstate::stacks::boot::{ SIGNERS_NAME, SIGNERS_VOTING_FUNCTION_NAME, SIGNERS_VOTING_NAME, }; +use stacks::chainstate::stacks::events::StackerDBChunksEvent; use stacks::chainstate::stacks::miner::TransactionEvent; use stacks::chainstate::stacks::{ StacksPrivateKey, StacksTransaction, ThresholdSignature, TransactionAnchorMode, @@ -31,13 +32,13 @@ use stacks::core::StacksEpoch; use stacks::net::api::postblock_proposal::BlockValidateResponse; use stacks::util_lib::strings::StacksString; use stacks_common::bitvec::BitVec; -use stacks_common::codec::{read_next, StacksMessageCodec}; +use stacks_common::codec::StacksMessageCodec; use stacks_common::consts::{CHAIN_ID_TESTNET, SIGNER_SLOTS_PER_USER}; use stacks_common::types::chainstate::{ ConsensusHash, StacksAddress, StacksBlockId, StacksPublicKey, TrieHash, }; use stacks_common::types::StacksEpochId; -use stacks_common::util::hash::{MerkleTree, Sha512Trunc256Sum}; +use stacks_common::util::hash::{hex_bytes, MerkleTree, Sha512Trunc256Sum}; use stacks_common::util::secp256k1::MessageSignature; use stacks_signer::client::{StackerDB, StacksClient}; use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network}; @@ -246,16 +247,10 @@ impl SignerTest { .btc_regtest_controller .get_headers_height() .saturating_add(nmb_blocks_to_mine_to_dkg); - info!("Mining {nmb_blocks_to_mine_to_dkg} Nakamoto block(s) to reach DKG calculation at block height {end_block_height}"); + info!("Mining {nmb_blocks_to_mine_to_dkg} bitcoin block(s) to reach DKG calculation at bitcoin height {end_block_height}"); for i in 1..=nmb_blocks_to_mine_to_dkg { - info!("Mining Nakamoto block #{i} of {nmb_blocks_to_mine_to_dkg}"); - self.mine_nakamoto_block(timeout); - let hash = self.wait_for_validate_ok_response(timeout); - let signatures = self.wait_for_frost_signatures(timeout); - // Verify the signers accepted the proposed block and are using the new DKG to sign it - for signature in &signatures { - 
assert!(signature.verify(&set_dkg, hash.0.as_slice())); - } + info!("Mining bitcoin block #{i} and nakamoto tenure of {nmb_blocks_to_mine_to_dkg}"); + self.mine_and_verify_confirmed_naka_block(&set_dkg, timeout); } if nmb_blocks_to_mine_to_dkg == 0 { None @@ -301,13 +296,7 @@ impl SignerTest { .get_approved_aggregate_key(curr_reward_cycle) .expect("Failed to get approved aggregate key") .expect("No approved aggregate key found"); - self.mine_nakamoto_block(timeout); - let hash = self.wait_for_validate_ok_response(timeout); - let signatures = self.wait_for_frost_signatures(timeout); - // Verify the signers accepted the proposed block and are using the new DKG to sign it - for signature in &signatures { - assert!(signature.verify(&set_dkg, hash.0.as_slice())); - } + self.mine_and_verify_confirmed_naka_block(&set_dkg, timeout); } total_nmb_blocks_to_mine -= nmb_blocks_to_reward_cycle; nmb_blocks_to_reward_cycle = 0; @@ -321,17 +310,23 @@ impl SignerTest { .get_approved_aggregate_key(curr_reward_cycle) .expect("Failed to get approved aggregate key") .expect("No approved aggregate key found"); - self.mine_nakamoto_block(timeout); - let hash = self.wait_for_validate_ok_response(timeout); - let signatures = self.wait_for_frost_signatures(timeout); - // Verify the signers accepted the proposed block and are using the new DKG to sign it - for signature in &signatures { - assert!(signature.verify(&set_dkg, hash.0.as_slice())); - } + self.mine_and_verify_confirmed_naka_block(&set_dkg, timeout); } points } + fn mine_and_verify_confirmed_naka_block( + &mut self, + agg_key: &Point, + timeout: Duration, + ) -> MinedNakamotoBlockEvent { + let new_block = self.mine_nakamoto_block(timeout); + let signer_sighash = new_block.signer_signature_hash.clone(); + let signature = self.wait_for_confirmed_block(&signer_sighash, timeout); + assert!(signature.0.verify(&agg_key, signer_sighash.as_bytes())); + new_block + } + fn mine_nakamoto_block(&mut self, timeout: Duration) -> MinedNakamotoBlockEvent { let commits_submitted = self.running_nodes.commits_submitted.clone(); let mined_block_time = Instant::now(); @@ -359,6 +354,41 @@ impl SignerTest { test_observer::get_mined_nakamoto_blocks().pop().unwrap() } + fn wait_for_confirmed_block( + &mut self, + block_signer_sighash: &Sha512Trunc256Sum, + timeout: Duration, + ) -> ThresholdSignature { + let t_start = Instant::now(); + while t_start.elapsed() <= timeout { + let blocks = test_observer::get_blocks(); + if let Some(signature) = blocks.iter().find_map(|block_json| { + let block_obj = block_json.as_object().unwrap(); + let sighash = block_obj + // use the try operator because non-nakamoto blocks + // do not supply this field + .get("signer_signature_hash")? 
+ .as_str() + .unwrap(); + if sighash != &format!("0x{block_signer_sighash}") { + return None; + } + let signer_signature_hex = + block_obj.get("signer_signature").unwrap().as_str().unwrap(); + let signer_signature_bytes = hex_bytes(&signer_signature_hex[2..]).unwrap(); + let signer_signature = ThresholdSignature::consensus_deserialize( + &mut signer_signature_bytes.as_slice(), + ) + .unwrap(); + Some(signer_signature) + }) { + return signature; + } + thread::sleep(Duration::from_millis(500)); + } + panic!("Timed out while waiting for confirmation of block with signer sighash = {block_signer_sighash}") + } + fn wait_for_validate_ok_response(&mut self, timeout: Duration) -> Sha512Trunc256Sum { // Wait for the block to show up in the test observer (Don't have to wait long as if we have received a mined block already, // we know that the signers have already received their block proposal events via their event observers) @@ -1084,32 +1114,18 @@ fn stackerdb_sign() { // Verify the signers rejected the proposed block let t_start = Instant::now(); - let mut chunk = None; - while chunk.is_none() { + let signer_message = loop { assert!( t_start.elapsed() < Duration::from_secs(30), "Timed out while waiting for signers block response stacker db event" ); let nakamoto_blocks = test_observer::get_stackerdb_chunks(); - for event in nakamoto_blocks { - // Only care about the miners block slot - if event.contract_id.name == format!("signers-1-{}", BLOCK_MSG_ID).as_str().into() - || event.contract_id.name == format!("signers-0-{}", BLOCK_MSG_ID).as_str().into() - { - for slot in event.modified_slots { - chunk = Some(slot.data); - break; - } - if chunk.is_some() { - break; - } - } + if let Some(message) = find_block_response(nakamoto_blocks) { + break message; } thread::sleep(Duration::from_secs(1)); - } - let chunk = chunk.unwrap(); - let signer_message = read_next::<SignerMessage, _>(&mut &chunk[..]).unwrap(); + }; if let SignerMessage::BlockResponse(BlockResponse::Rejected(rejection)) = signer_message { assert!(matches!( rejection.reason_code, @@ -1121,6 +1137,23 @@ fn stackerdb_sign() { info!("Sign Time Elapsed: {:.2?}", sign_elapsed); } +pub fn find_block_response(chunk_events: Vec<StackerDBChunksEvent>) -> Option<SignerMessage> { + for event in chunk_events.into_iter() { + if event.contract_id.name.as_str() + == &format!("signers-1-{}", MessageSlotID::BlockResponse.to_u8()) + || event.contract_id.name.as_str() + == &format!("signers-0-{}", MessageSlotID::BlockResponse.to_u8()) + { + let Some(data) = event.modified_slots.first() else { + continue; + }; + let msg = SignerMessage::consensus_deserialize(&mut data.data.as_slice()).unwrap(); + return Some(msg); + } + } + None +} + #[test] #[ignore] /// Test that a signer can respond to a miner's request for a signature on a block proposal @@ -1164,57 +1197,11 @@ fn stackerdb_block_proposal() { info!("------------------------- Test Block Signed -------------------------"); // Verify that the signers signed the proposed block - let frost_signatures = signer_test.wait_for_frost_signatures(short_timeout); - for signature in &frost_signatures { - assert!( - signature.verify(&key, proposed_signer_signature_hash.0.as_slice()), - "Signature verification failed" - ); - } - info!("------------------------- Test Signers Broadcast Block -------------------------"); - // Verify that the signers broadcasted a signed NakamotoBlock back to the .signers contract - let t_start = Instant::now(); - let mut chunk = None; - while chunk.is_none() { - assert!( - t_start.elapsed() < short_timeout, - "Timed out while waiting for signers
block response stacker db event" ); + let signature = signer_test.wait_for_confirmed_block(&proposed_signer_signature_hash, timeout); + assert!(signature + .0 + .verify(&key, proposed_signer_signature_hash.as_bytes())); - let nakamoto_blocks = test_observer::get_stackerdb_chunks(); - for event in nakamoto_blocks { - if event.contract_id.name == format!("signers-1-{}", BLOCK_MSG_ID).as_str().into() - || event.contract_id.name == format!("signers-0-{}", BLOCK_MSG_ID).as_str().into() - { - for slot in event.modified_slots { - chunk = Some(slot.data); - break; - } - if chunk.is_some() { - break; - } - } - if chunk.is_some() { - break; - } - } - thread::sleep(Duration::from_secs(1)); - } - let chunk = chunk.unwrap(); - let signer_message = read_next::<SignerMessage, _>(&mut &chunk[..]).unwrap(); - if let SignerMessage::BlockResponse(BlockResponse::Accepted(( - block_signer_signature_hash, - block_signature, - ))) = signer_message - { - assert_eq!(block_signer_signature_hash, proposed_signer_signature_hash); - assert_eq!( - block_signature, - ThresholdSignature(frost_signatures.first().expect("No signature").clone()) - ); - } else { - panic!("Received unexpected message"); - } signer_test.shutdown(); } @@ -1363,13 +1350,8 @@ fn stackerdb_filter_bad_transactions() { .expect("Failed to write expected transactions to stackerdb"); info!("------------------------- Verify Nakamoto Block Mined -------------------------"); - let mined_block_event = signer_test.mine_nakamoto_block(timeout); - let hash = signer_test.wait_for_validate_ok_response(timeout); - let signatures = signer_test.wait_for_frost_signatures(timeout); - // Verify the signers accepted the proposed block and are using the previously determined dkg to sign it - for signature in &signatures { - assert!(signature.verify(&current_signers_dkg, hash.0.as_slice())); - } + let mined_block_event = + signer_test.mine_and_verify_confirmed_naka_block(&current_signers_dkg, timeout); for tx_event in &mined_block_event.tx_events { let TransactionEvent::Success(tx_success) = tx_event else { panic!("Received unexpected transaction event");