diff --git a/testnet/stacks-node/src/config.rs b/testnet/stacks-node/src/config.rs
index 18640a5f4..1f05edd7a 100644
--- a/testnet/stacks-node/src/config.rs
+++ b/testnet/stacks-node/src/config.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
-use std::net::{SocketAddr, ToSocketAddrs};
+use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
 use std::path::PathBuf;
+use std::str::FromStr;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use std::{fs, thread};
@@ -1764,6 +1765,18 @@ impl Default for NodeConfig {
 }
 
 impl NodeConfig {
+    /// Get a SocketAddr for this node's RPC endpoint which uses the loopback address
+    pub fn get_rpc_loopback(&self) -> Option<SocketAddr> {
+        let rpc_port = SocketAddr::from_str(&self.rpc_bind)
+            .or_else(|e| {
+                error!("Could not parse node.rpc_bind configuration setting as SocketAddr: {e}");
+                Err(())
+            })
+            .ok()?
+            .port();
+        Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port))
+    }
+
     pub fn add_signers_stackerdbs(&mut self, is_mainnet: bool) {
         for signer_set in 0..2 {
             for message_id in 0..SIGNER_SLOTS_PER_USER {
diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs
index 7660d2ada..aafaec99a 100644
--- a/testnet/stacks-node/src/event_dispatcher.rs
+++ b/testnet/stacks-node/src/event_dispatcher.rs
@@ -1,6 +1,6 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
-use std::sync::mpsc::{Receiver, Sender};
+use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::Mutex;
 use std::thread::sleep;
 use std::time::Duration;
@@ -12,7 +12,6 @@ use clarity::vm::costs::ExecutionCost;
 use clarity::vm::events::{FTEventType, NFTEventType, STXEventType};
 use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier, Value};
 use http_types::{Method, Request, Url};
-use lazy_static::lazy_static;
 use serde_json::json;
 use stacks::burnchains::{PoxConstants, Txid};
 use stacks::chainstate::burn::operations::BlockstackOperationType;
@@ -20,7 +19,7 @@ use stacks::chainstate::burn::ConsensusHash;
 use stacks::chainstate::coordinator::BlockEventDispatcher;
 use stacks::chainstate::nakamoto::NakamotoBlock;
 use stacks::chainstate::stacks::address::PoxAddress;
-use stacks::chainstate::stacks::boot::RewardSetData;
+use stacks::chainstate::stacks::boot::{RewardSetData, SIGNERS_NAME};
 use stacks::chainstate::stacks::db::accounts::MinerReward;
 use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;
 use stacks::chainstate::stacks::db::{MinerRewardInfo, StacksBlockHeaderTypes, StacksHeaderInfo};
@@ -78,9 +77,7 @@ pub const PATH_BLOCK_PROCESSED: &str = "new_block";
 pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new";
 pub const PATH_PROPOSAL_RESPONSE: &str = "proposal_response";
 
-lazy_static! {
-    pub static ref STACKER_DB_CHANNEL: StackerDBChannel = StackerDBChannel::new();
-}
+pub static STACKER_DB_CHANNEL: StackerDBChannel = StackerDBChannel::new();
 
 /// This struct receives StackerDB event callbacks without registering
 /// over the JSON/RPC interface. To ensure that any event observer
@@ -92,8 +89,17 @@ lazy_static! {
 /// bad idea) or listen for events. Registering for RPC callbacks
 /// seems bad. So instead, it uses a singleton sync channel.
 pub struct StackerDBChannel {
-    receiver: Mutex<Option<Receiver<StackerDBChunksEvent>>>,
+    sender_info: Mutex<Option<InnerStackerDBChannel>>,
+}
+
+#[derive(Clone)]
+struct InnerStackerDBChannel {
+    /// A channel for sending the chunk events to the listener
     sender: Sender<StackerDBChunksEvent>,
+    /// Does the listener want to receive `.signers` chunks?
+    interested_in_signers: bool,
+    /// Which StackerDB contracts is the listener interested in?
+    other_interests: Vec<QualifiedContractIdentifier>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -130,59 +136,84 @@ pub struct MinedNakamotoBlockEvent {
     pub signer_bitvec: String,
 }
 
-impl StackerDBChannel {
-    pub fn new() -> Self {
-        let (sender, recv_channel) = std::sync::mpsc::channel();
-        Self {
-            receiver: Mutex::new(Some(recv_channel)),
+impl InnerStackerDBChannel {
+    pub fn new_miner_receiver() -> (Receiver<StackerDBChunksEvent>, Self) {
+        let (sender, recv) = channel();
+        let sender_info = Self {
             sender,
+            interested_in_signers: true,
+            other_interests: vec![],
+        };
+
+        (recv, sender_info)
+    }
+}
+
+impl StackerDBChannel {
+    pub const fn new() -> Self {
+        Self {
+            sender_info: Mutex::new(None),
         }
     }
 
-    pub fn send(&self, event: StackerDBChunksEvent) {
-        if let Err(send_err) = self.sender.send(event) {
-            error!(
-                "Failed to send StackerDB event to WSTS coordinator channel. Miner thread may have crashed.";
-                "err" => ?send_err
-            );
-        }
-    }
-
-    /// Return the receiver to the StackerDBChannel. This must be done before
-    /// another interested thread can subscribe to events.
+    /// Consume the receiver for the StackerDBChannel and drop the senders. This should be done
+    /// before another interested thread can subscribe to events, but it is not absolutely necessary
+    /// to do so (it would just result in temporary over-use of memory while the prior channel is still
+    /// open).
     ///
     /// The StackerDBChnnel's receiver is guarded with a Mutex, so that ownership can
     /// be taken by different threads without unsafety.
     pub fn replace_receiver(&self, receiver: Receiver<StackerDBChunksEvent>) {
+        // not strictly necessary, but do this rather than mark the `receiver` argument as unused
+        // so that we're explicit about the fact that `replace_receiver` consumes.
+        drop(receiver);
         let mut guard = self
-            .receiver
+            .sender_info
             .lock()
             .expect("FATAL: poisoned StackerDBChannel lock");
-        guard.replace(receiver);
+        guard.take();
     }
 
-    /// Try to take ownership of the event receiver channel. If another thread
-    /// already has the channel (or failed to return it), this will return None.
+    /// Create a new event receiver channel for receiving events relevant to the miner coordinator,
+    /// dropping the old StackerDB event sender channels if they are still registered.
+    /// Returns the new receiver channel and a bool indicating whether or not sender channels were
+    /// still in place.
     ///
-    /// The StackerDBChnnel's receiver is guarded with a Mutex, so that ownership can
-    /// be taken by different threads without unsafety.
-    pub fn take_receiver(&self) -> Option<Receiver<StackerDBChunksEvent>> {
-        self.receiver
+    /// The StackerDBChannel senders are guarded by mutexes so that they can be replaced
+    /// by different threads without unsafety.
+    pub fn register_miner_coordinator(&self) -> (Receiver<StackerDBChunksEvent>, bool) {
+        let mut sender_info = self
+            .sender_info
             .lock()
-            .expect("FATAL: poisoned StackerDBChannel lock")
-            .take()
+            .expect("FATAL: poisoned StackerDBChannel lock");
+        let (recv, new_sender) = InnerStackerDBChannel::new_miner_receiver();
+        let replaced_receiver = sender_info.replace(new_sender).is_some();
+
+        (recv, replaced_receiver)
     }
 
-    /// Is there a thread holding the receiver?
-    ///
-    /// This method is used by the event dispatcher to decide whether or not to send a StackerDB
-    /// event to the channel.
-    pub fn is_active(&self) -> bool {
-        // if the receiver field is empty (i.e., None), then a thread must have taken it.
-        self.receiver
+    /// Is there a thread holding the receiver, and is it interested in chunk events from `stackerdb`?
+    /// Returns a sending channel to broadcast the event to if so, and `None` if not.
+    pub fn is_active(
+        &self,
+        stackerdb: &QualifiedContractIdentifier,
+    ) -> Option<Sender<StackerDBChunksEvent>> {
+        // if the receiver field is empty (i.e., None), then there is no listening thread, return None
+        let guard = self
+            .sender_info
             .lock()
-            .expect("FATAL: poisoned StackerDBChannel lock")
-            .is_none()
+            .expect("FATAL: poisoned StackerDBChannel lock");
+        let sender_info = guard.as_ref()?;
+        if sender_info.interested_in_signers
+            && stackerdb.issuer.1 == [0; 20]
+            && stackerdb.name.starts_with(SIGNERS_NAME)
+        {
+            return Some(sender_info.sender.clone());
+        }
+        if sender_info.other_interests.contains(stackerdb) {
+            return Some(sender_info.sender.clone());
+        }
+        None
     }
 }
 
@@ -1172,8 +1203,8 @@ impl EventDispatcher {
     ) {
         let interested_observers = self.filter_observers(&self.stackerdb_observers_lookup, false);
 
-        let interested_receiver = STACKER_DB_CHANNEL.is_active();
-        if interested_observers.is_empty() && !interested_receiver {
+        let interested_receiver = STACKER_DB_CHANNEL.is_active(&contract_id);
+        if interested_observers.is_empty() && interested_receiver.is_none() {
             return;
         }
 
@@ -1184,8 +1215,13 @@ impl EventDispatcher {
         let payload = serde_json::to_value(&event)
             .expect("FATAL: failed to serialize StackerDBChunksEvent to JSON");
 
-        if interested_receiver {
-            STACKER_DB_CHANNEL.send(event)
+        if let Some(channel) = interested_receiver {
+            if let Err(send_err) = channel.send(event) {
+                warn!(
+                    "Failed to send StackerDB event to WSTS coordinator channel. Miner thread may have exited.";
+                    "err" => ?send_err
+                );
+            }
         }
 
         for observer in interested_observers.iter() {
diff --git a/testnet/stacks-node/src/nakamoto_node/miner.rs b/testnet/stacks-node/src/nakamoto_node/miner.rs
index faede01c7..9450e11f0 100644
--- a/testnet/stacks-node/src/nakamoto_node/miner.rs
+++ b/testnet/stacks-node/src/nakamoto_node/miner.rs
@@ -22,7 +22,9 @@ use clarity::boot_util::boot_code_id;
 use clarity::vm::clarity::ClarityConnection;
 use clarity::vm::types::{PrincipalData, QualifiedContractIdentifier};
 use hashbrown::HashSet;
-use libsigner::{BlockProposalSigners, MessageSlotID, SignerMessage};
+use libsigner::{
+    BlockProposalSigners, MessageSlotID, SignerMessage, SignerSession, StackerDBSession,
+};
 use stacks::burnchains::{Burnchain, BurnchainParameters};
 use stacks::chainstate::burn::db::sortdb::SortitionDB;
 use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash};
@@ -36,7 +38,7 @@ use stacks::chainstate::stacks::{
     TenureChangeCause, TenureChangePayload, ThresholdSignature, TransactionAnchorMode,
     TransactionPayload, TransactionVersion,
 };
-use stacks::net::stackerdb::{StackerDBConfig, StackerDBs};
+use stacks::net::stackerdb::StackerDBs;
 use stacks_common::codec::read_next;
 use stacks_common::types::chainstate::{StacksAddress, StacksBlockId};
 use stacks_common::types::{PrivateKey, StacksEpochId};
@@ -138,36 +140,6 @@ impl BlockMinerThread {
         globals.unblock_miner();
     }
 
-    fn make_miners_stackerdb_config(
-        &mut self,
-        stackerdbs: &mut StackerDBs,
-    ) -> Result<StackerDBConfig, NakamotoNodeError> {
-        let mut chain_state = neon_node::open_chainstate_with_faults(&self.config)
-            .expect("FATAL: could not open chainstate DB");
-        let burn_db_path = self.config.get_burn_db_file_path();
-        let sort_db = SortitionDB::open(&burn_db_path, true, self.burnchain.pox_constants.clone())
-            .expect("FATAL: could not open sortition DB");
-        let mut stacker_db_configs = HashMap::with_capacity(1);
-        let miner_contract = boot_code_id(MINERS_NAME, self.config.is_mainnet());
-        stacker_db_configs.insert(miner_contract.clone(), StackerDBConfig::noop());
-        let mut miners_only_config = stackerdbs
-            .create_or_reconfigure_stackerdbs(&mut chain_state, &sort_db, stacker_db_configs)
-            .map_err(|e| {
-                error!(
-                    "Failed to configure .miners stackerdbs";
-                    "err" => ?e,
-                );
-                NakamotoNodeError::MinerConfigurationFailed(
-                    "Could not setup .miners stackerdbs configuration",
-                )
-            })?;
-        miners_only_config.remove(&miner_contract).ok_or_else(|| {
-            NakamotoNodeError::MinerConfigurationFailed(
-                "Did not return .miners stackerdb configuration after setup",
-            )
-        })
-    }
-
     pub fn run_miner(mut self, prior_miner: Option<JoinHandle<()>>) {
         // when starting a new tenure, block the mining thread if its currently running.
         // the new mining thread will join it (so that the new mining thread stalls, not the relayer)
@@ -210,13 +182,7 @@ impl BlockMinerThread {
             };
 
             if let Some(mut new_block) = new_block {
-                let Ok(stackerdb_config) = self.make_miners_stackerdb_config(&mut stackerdbs)
-                else {
-                    warn!("Failed to setup stackerdb to propose block, will try mining again");
-                    continue;
-                };
-
-                if let Err(e) = self.propose_block(&new_block, &mut stackerdbs, &stackerdb_config) {
+                if let Err(e) = self.propose_block(&new_block, &stackerdbs) {
                     error!("Unrecoverable error while proposing block to signer set: {e:?}. Ending tenure.");
                     return;
                 }
@@ -224,7 +190,6 @@ impl BlockMinerThread {
                 let (aggregate_public_key, signers_signature) = match self.coordinate_signature(
                     &new_block,
                     &mut stackerdbs,
-                    &stackerdb_config,
                     &mut attempts,
                 ) {
                     Ok(x) => x,
@@ -278,7 +243,6 @@ impl BlockMinerThread {
         &mut self,
         new_block: &NakamotoBlock,
         stackerdbs: &mut StackerDBs,
-        stackerdb_config: &StackerDBConfig,
         attempts: &mut u64,
     ) -> Result<(Point, ThresholdSignature), NakamotoNodeError> {
         let Some(miner_privkey) = self.config.miner.mining_key else {
@@ -374,7 +338,6 @@ impl BlockMinerThread {
             aggregate_public_key,
             self.config.is_mainnet(),
             &stackerdbs,
-            stackerdb_config.clone(),
             &self.config,
         )
         .map_err(|e| {
@@ -390,8 +353,7 @@ impl BlockMinerThread {
             &tip,
             &self.burnchain,
             &sort_db,
-            stackerdbs,
-            &self.event_dispatcher,
+            &stackerdbs,
         )?;
         Ok((aggregate_public_key, signature))
     }
@@ -400,10 +362,14 @@ impl BlockMinerThread {
     fn propose_block(
        &mut self,
         new_block: &NakamotoBlock,
-        stackerdbs: &mut StackerDBs,
-        stackerdb_config: &StackerDBConfig,
+        stackerdbs: &StackerDBs,
     ) -> Result<(), NakamotoNodeError> {
+        let rpc_socket = self.config.node.get_rpc_loopback().ok_or_else(|| {
+            NakamotoNodeError::MinerConfigurationFailed("Could not parse RPC bind")
+        })?;
         let miners_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet());
+        let mut miners_session =
+            StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id.clone());
         let Some(miner_privkey) = self.config.miner.mining_key else {
             return Err(NakamotoNodeError::MinerConfigurationFailed(
                 "No mining key configured, cannot mine",
@@ -455,17 +421,12 @@ impl BlockMinerThread {
         };
 
         // Propose the block to the observing signers through the .miners stackerdb instance
-        let miner_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet());
-        let Ok(stackerdb_tx) = stackerdbs.tx_begin(stackerdb_config.clone()) else {
-            warn!("Failed to begin stackerdbs transaction to write block proposal, will try mining again");
-            return Ok(());
-        };
-
-        match stackerdb_tx.put_chunk(&miner_contract_id, proposal, &self.event_dispatcher) {
-            Ok(()) => {
+        match miners_session.put_chunk(&proposal) {
+            Ok(ack) => {
                 info!(
                     "Proposed block to stackerdb";
-                    "signer_sighash" => %new_block.header.signer_signature_hash()
+                    "signer_sighash" => %new_block.header.signer_signature_hash(),
+                    "ack_msg" => ?ack,
                 );
             }
             Err(e) => {
diff --git a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs
index 5d1fcc675..e7b460bc3 100644
--- a/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs
+++ b/testnet/stacks-node/src/nakamoto_node/sign_coordinator.rs
@@ -17,7 +17,7 @@ use std::sync::mpsc::Receiver;
 use std::time::{Duration, Instant};
 
 use hashbrown::{HashMap, HashSet};
-use libsigner::{MessageSlotID, SignerEvent, SignerMessage};
+use libsigner::{MessageSlotID, SignerEvent, SignerMessage, SignerSession, StackerDBSession};
 use stacks::burnchains::Burnchain;
 use stacks::chainstate::burn::db::sortdb::SortitionDB;
 use stacks::chainstate::burn::BlockSnapshot;
@@ -26,7 +26,7 @@ use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, MINERS_NA
 use stacks::chainstate::stacks::events::StackerDBChunksEvent;
 use stacks::chainstate::stacks::{Error as ChainstateError, ThresholdSignature};
 use stacks::libstackerdb::StackerDBChunkData;
-use stacks::net::stackerdb::{StackerDBConfig, StackerDBs};
+use stacks::net::stackerdb::StackerDBs;
 use stacks::util_lib::boot::boot_code_id;
 use stacks_common::codec::StacksMessageCodec;
 use stacks_common::types::chainstate::{StacksPrivateKey, StacksPublicKey};
@@ -41,7 +41,7 @@ use wsts::v2::Aggregator;
 
 use super::Error as NakamotoNodeError;
 use crate::event_dispatcher::STACKER_DB_CHANNEL;
-use crate::{Config, EventDispatcher};
+use crate::Config;
 
 /// The `SignCoordinator` struct represents a WSTS FIRE coordinator whose
 /// sole function is to serve as the coordinator for Nakamoto block signing.
@@ -54,7 +54,7 @@ pub struct SignCoordinator {
     message_key: Scalar,
     wsts_public_keys: PublicKeys,
     is_mainnet: bool,
-    miners_db_config: StackerDBConfig,
+    miners_session: StackerDBSession,
     signing_round_timeout: Duration,
 }
 
@@ -209,7 +209,6 @@ impl SignCoordinator {
         aggregate_public_key: Point,
         is_mainnet: bool,
         stackerdb_conn: &StackerDBs,
-        miners_db_config: StackerDBConfig,
         config: &Config,
     ) -> Result<Self, ChainstateError> {
         let Some(ref reward_set_signers) = reward_set.signers else {
@@ -217,6 +216,13 @@ impl SignCoordinator {
             return Err(ChainstateError::NoRegisteredSigners(0));
         };
 
+        let rpc_socket = config
+            .node
+            .get_rpc_loopback()
+            .ok_or_else(|| ChainstateError::MinerAborted)?;
+        let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet);
+        let miners_session = StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id);
+
         let NakamotoSigningParams {
             num_signers,
             num_keys,
@@ -259,12 +265,10 @@ impl SignCoordinator {
             warn!("Failed to set a valid set of party polynomials"; "error" => %e);
         };
 
-        let Some(receiver) = STACKER_DB_CHANNEL.take_receiver() else {
-            error!("Could not obtain handle for the StackerDBChannel");
-            return Err(ChainstateError::ChannelClosed(
-                "WSTS coordinator requires a handle to the StackerDBChannel".into(),
-            ));
-        };
+        let (receiver, replaced_other) = STACKER_DB_CHANNEL.register_miner_coordinator();
+        if replaced_other {
+            warn!("Replaced the miner/coordinator receiver of a prior thread. Prior thread may have crashed.");
Prior thread may have crashed."); + } Ok(Self { coordinator, @@ -272,7 +276,7 @@ impl SignCoordinator { receiver: Some(receiver), wsts_public_keys, is_mainnet, - miners_db_config, + miners_session, signing_round_timeout: config.miner.wait_on_signers.clone(), }) } @@ -288,11 +292,10 @@ impl SignCoordinator { message_key: &Scalar, sortdb: &SortitionDB, tip: &BlockSnapshot, - stackerdbs: &mut StackerDBs, + stackerdbs: &StackerDBs, message: SignerMessage, is_mainnet: bool, - miners_db_config: &StackerDBConfig, - event_dispatcher: &EventDispatcher, + miners_session: &mut StackerDBSession, ) -> Result<(), String> { let mut miner_sk = StacksPrivateKey::from_slice(&message_key.to_bytes()).unwrap(); miner_sk.set_compress_public(true); @@ -321,14 +324,9 @@ impl SignCoordinator { .sign(&miner_sk) .map_err(|_| "Failed to sign StackerDB chunk")?; - let stackerdb_tx = stackerdbs.tx_begin(miners_db_config.clone()).map_err(|e| { - warn!("Failed to begin stackerdbs transaction to write .miners message"; "err" => ?e); - "Failed to begin StackerDBs transaction" - })?; - - match stackerdb_tx.put_chunk(&miners_contract_id, chunk, event_dispatcher) { - Ok(()) => { - debug!("Wrote message to stackerdb: {message:?}"); + match miners_session.put_chunk(&chunk) { + Ok(ack) => { + debug!("Wrote message to stackerdb: {ack:?}"); Ok(()) } Err(e) => { @@ -345,8 +343,7 @@ impl SignCoordinator { burn_tip: &BlockSnapshot, burnchain: &Burnchain, sortdb: &SortitionDB, - stackerdbs: &mut StackerDBs, - event_dispatcher: &EventDispatcher, + stackerdbs: &StackerDBs, ) -> Result { let sign_id = Self::get_sign_id(burn_tip.block_height, burnchain); let sign_iter_id = block_attempt; @@ -369,11 +366,10 @@ impl SignCoordinator { &self.message_key, sortdb, burn_tip, - stackerdbs, + &stackerdbs, nonce_req_msg.into(), self.is_mainnet, - &self.miners_db_config, - event_dispatcher, + &mut self.miners_session, ) .map_err(NakamotoNodeError::SigningCoordinatorFailure)?; @@ -490,8 +486,7 @@ impl SignCoordinator { stackerdbs, msg.into(), self.is_mainnet, - &self.miners_db_config, - event_dispatcher, + &mut self.miners_session, ) { Ok(()) => { debug!("Miner/Coordinator: sent outbound message.");
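Note on the new NodeConfig::get_rpc_loopback() helper in config.rs above: it only reuses the port from the node.rpc_bind setting and re-anchors it on 127.0.0.1, which is what lets both the miner thread and the SignCoordinator open a StackerDBSession against the node's own RPC interface without extra configuration. The following is a minimal standalone sketch of that derivation, not the node's actual implementation; the rpc_loopback name and the "0.0.0.0:20443" sample value are illustrative only.

use std::net::{Ipv4Addr, SocketAddr};
use std::str::FromStr;

/// Keep only the port of a configured bind address and re-anchor it on the loopback
/// interface. Mirrors the intent of `NodeConfig::get_rpc_loopback` in the diff above.
fn rpc_loopback(rpc_bind: &str) -> Option<SocketAddr> {
    // Parse the configured bind address (e.g. "0.0.0.0:20443"); yield None if it is
    // not a valid `SocketAddr`.
    let rpc_port = SocketAddr::from_str(rpc_bind).ok()?.port();
    // Same port, but on 127.0.0.1, so the node's own RPC endpoint is always reachable.
    Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port))
}

fn main() {
    assert_eq!(
        rpc_loopback("0.0.0.0:20443"),
        Some(SocketAddr::from(([127, 0, 0, 1], 20443)))
    );
    assert_eq!(rpc_loopback("not-an-address"), None);
}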
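Note on dropping lazy_static in event_dispatcher.rs above: STACKER_DB_CHANNEL can become a plain static because StackerDBChannel::new() is now a const fn, and std::sync::Mutex::new has been const-constructible since Rust 1.63. The sketch below shows the same register/notify shape with a const-initialized Mutex<Option<Sender<_>>> singleton; it uses String events and the names LISTENER, register_listener, and notify purely for illustration — they are not part of the stacks-node code.

use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// Const-initialized, process-wide slot for the active listener's sender half.
static LISTENER: Mutex<Option<Sender<String>>> = Mutex::new(None);

/// Register a new listener, dropping any previously registered sender.
/// Returns the receiving end plus a flag saying whether an older sender was replaced,
/// analogous to `StackerDBChannel::register_miner_coordinator` in the diff.
fn register_listener() -> (Receiver<String>, bool) {
    let (tx, rx) = channel();
    let replaced = LISTENER
        .lock()
        .expect("poisoned listener lock")
        .replace(tx)
        .is_some();
    (rx, replaced)
}

/// Send an event only if a listener is currently registered — the dispatcher-side
/// check that `is_active` performs before handing back a sender.
fn notify(event: &str) {
    if let Some(tx) = LISTENER.lock().expect("poisoned listener lock").as_ref() {
        // A send error only means the listener hung up; the dispatcher just logs that case.
        let _ = tx.send(event.to_string());
    }
}

fn main() {
    let (rx, replaced) = register_listener();
    assert!(!replaced);
    notify("hello");
    assert_eq!(rx.recv().unwrap(), "hello");

    // Registering again replaces (and drops) the first sender, so the old receiver
    // observes a disconnect once its buffered events are drained.
    let (_rx2, replaced) = register_listener();
    assert!(replaced);
    assert!(rx.recv().is_err());
}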