refactor: use RPC interface for stackerdb chunk writes (with explicit loopback socket), use mutex for send-side of stacker db event channel

This commit is contained in:
Aaron Blankstein
2024-03-18 09:49:57 -05:00
parent ac0c0873b2
commit b4439f0cd3
4 changed files with 137 additions and 132 deletions

View File

@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::net::{SocketAddr, ToSocketAddrs};
use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{fs, thread};
@@ -1764,6 +1765,18 @@ impl Default for NodeConfig {
}
impl NodeConfig {
/// Build the loopback socket address for this node's RPC endpoint.
///
/// Only the port is taken from the `rpc_bind` setting; the host part is always
/// replaced with the IPv4 loopback address (`Ipv4Addr::LOCALHOST`).
///
/// Returns `None`, after logging an error, when `rpc_bind` cannot be parsed as a
/// `SocketAddr`.
pub fn get_rpc_loopback(&self) -> Option<SocketAddr> {
    let bind_addr = match SocketAddr::from_str(&self.rpc_bind) {
        Ok(addr) => addr,
        Err(e) => {
            error!("Could not parse node.rpc_bind configuration setting as SocketAddr: {e}");
            return None;
        }
    };
    Some(SocketAddr::new(
        Ipv4Addr::LOCALHOST.into(),
        bind_addr.port(),
    ))
}
pub fn add_signers_stackerdbs(&mut self, is_mainnet: bool) {
for signer_set in 0..2 {
for message_id in 0..SIGNER_SLOTS_PER_USER {

View File

@@ -1,6 +1,6 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use std::thread::sleep;
use std::time::Duration;
@@ -12,7 +12,6 @@ use clarity::vm::costs::ExecutionCost;
use clarity::vm::events::{FTEventType, NFTEventType, STXEventType};
use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier, Value};
use http_types::{Method, Request, Url};
use lazy_static::lazy_static;
use serde_json::json;
use stacks::burnchains::{PoxConstants, Txid};
use stacks::chainstate::burn::operations::BlockstackOperationType;
@@ -20,7 +19,7 @@ use stacks::chainstate::burn::ConsensusHash;
use stacks::chainstate::coordinator::BlockEventDispatcher;
use stacks::chainstate::nakamoto::NakamotoBlock;
use stacks::chainstate::stacks::address::PoxAddress;
use stacks::chainstate::stacks::boot::RewardSetData;
use stacks::chainstate::stacks::boot::{RewardSetData, SIGNERS_NAME};
use stacks::chainstate::stacks::db::accounts::MinerReward;
use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;
use stacks::chainstate::stacks::db::{MinerRewardInfo, StacksBlockHeaderTypes, StacksHeaderInfo};
@@ -78,9 +77,7 @@ pub const PATH_BLOCK_PROCESSED: &str = "new_block";
pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new";
pub const PATH_PROPOSAL_RESPONSE: &str = "proposal_response";
lazy_static! {
pub static ref STACKER_DB_CHANNEL: StackerDBChannel = StackerDBChannel::new();
}
pub static STACKER_DB_CHANNEL: StackerDBChannel = StackerDBChannel::new();
/// This struct receives StackerDB event callbacks without registering
/// over the JSON/RPC interface. To ensure that any event observer
@@ -92,8 +89,17 @@ lazy_static! {
/// bad idea) or listen for events. Registering for RPC callbacks
/// seems bad. So instead, it uses a singleton sync channel.
pub struct StackerDBChannel {
receiver: Mutex<Option<Receiver<StackerDBChunksEvent>>>,
sender_info: Mutex<Option<InnerStackerDBChannel>>,
}
/// Send-side state held inside `StackerDBChannel`: the sender for the currently
/// registered listener together with the set of StackerDB contracts that
/// listener wants chunk events for.
#[derive(Clone)]
struct InnerStackerDBChannel {
/// A channel for sending the chunk events to the listener
sender: Sender<StackerDBChunksEvent>,
/// Does the listener want to receive `.signers` chunks?
interested_in_signers: bool,
/// Which StackerDB contracts is the listener interested in?
other_interests: Vec<QualifiedContractIdentifier>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -130,59 +136,84 @@ pub struct MinedNakamotoBlockEvent {
pub signer_bitvec: String,
}
impl StackerDBChannel {
pub fn new() -> Self {
let (sender, recv_channel) = std::sync::mpsc::channel();
Self {
receiver: Mutex::new(Some(recv_channel)),
impl InnerStackerDBChannel {
    /// Open a fresh mpsc channel for a miner listener.
    ///
    /// Returns the receiving half together with the send-side state that pairs
    /// with it. The state is marked as interested in `.signers` chunks and has
    /// no additional contracts of interest.
    pub fn new_miner_receiver() -> (Receiver<StackerDBChunksEvent>, Self) {
        let (event_tx, event_rx) = channel();
        (
            event_rx,
            Self {
                sender: event_tx,
                interested_in_signers: true,
                other_interests: Vec::new(),
            },
        )
    }
}
impl StackerDBChannel {
/// Create a channel with no registered listener: `sender_info` starts out as `None`.
///
/// This is a `const fn` so that it can be evaluated in a constant context, as is
/// done to initialize the `STACKER_DB_CHANNEL` static.
pub const fn new() -> Self {
Self {
sender_info: Mutex::new(None),
}
}
pub fn send(&self, event: StackerDBChunksEvent) {
if let Err(send_err) = self.sender.send(event) {
error!(
"Failed to send StackerDB event to WSTS coordinator channel. Miner thread may have crashed.";
"err" => ?send_err
);
}
}
/// Return the receiver to the StackerDBChannel. This must be done before
/// another interested thread can subscribe to events.
/// Consume the receiver for the StackerDBChannel and drop the senders. This should be done
/// before another interested thread can subscribe to events, but it is not absolutely necessary
/// to do so (it would just result in temporary over-use of memory while the prior channel is still
/// open).
///
/// The StackerDBChannel's receiver is guarded with a Mutex, so that ownership can
/// be taken by different threads without unsafety.
pub fn replace_receiver(&self, receiver: Receiver<StackerDBChunksEvent>) {
// not strictly necessary, but do this rather than mark the `receiver` argument as unused
// so that we're explicit about the fact that `replace_receiver` consumes.
drop(receiver);
let mut guard = self
.receiver
.sender_info
.lock()
.expect("FATAL: poisoned StackerDBChannel lock");
guard.replace(receiver);
guard.take();
}
/// Try to take ownership of the event receiver channel. If another thread
/// already has the channel (or failed to return it), this will return None.
/// Create a new event receiver channel for receiving events relevant to the miner coordinator,
/// dropping the old StackerDB event sender channels if they are still registered.
/// Returns the new receiver channel and a bool indicating whether or not sender channels were
/// still in place.
///
/// The StackerDBChannel's receiver is guarded with a Mutex, so that ownership can
/// be taken by different threads without unsafety.
pub fn take_receiver(&self) -> Option<Receiver<StackerDBChunksEvent>> {
self.receiver
/// The StackerDBChannel senders are guarded by mutexes so that they can be replaced
/// by different threads without unsafety.
pub fn register_miner_coordinator(&self) -> (Receiver<StackerDBChunksEvent>, bool) {
let mut sender_info = self
.sender_info
.lock()
.expect("FATAL: poisoned StackerDBChannel lock")
.take()
.expect("FATAL: poisoned StackerDBChannel lock");
let (recv, new_sender) = InnerStackerDBChannel::new_miner_receiver();
let replaced_receiver = sender_info.replace(new_sender).is_some();
(recv, replaced_receiver)
}
/// Is there a thread holding the receiver?
///
/// This method is used by the event dispatcher to decide whether or not to send a StackerDB
/// event to the channel.
pub fn is_active(&self) -> bool {
// if the receiver field is empty (i.e., None), then a thread must have taken it.
self.receiver
/// Is there a thread holding the receiver, and is it interested in chunks events from `stackerdb`?
/// Returns a sending channel to broadcast the event to if so, and `None` if not.
pub fn is_active(
&self,
stackerdb: &QualifiedContractIdentifier,
) -> Option<Sender<StackerDBChunksEvent>> {
// if the `sender_info` field is empty (i.e., None), then there is no listening thread, return None
let guard = self
.sender_info
.lock()
.expect("FATAL: poisoned StackerDBChannel lock")
.is_none()
.expect("FATAL: poisoned StackerDBChannel lock");
let sender_info = guard.as_ref()?;
if sender_info.interested_in_signers
&& stackerdb.issuer.1 == [0; 20]
&& stackerdb.name.starts_with(SIGNERS_NAME)
{
return Some(sender_info.sender.clone());
}
if sender_info.other_interests.contains(stackerdb) {
return Some(sender_info.sender.clone());
}
None
}
}
@@ -1172,8 +1203,8 @@ impl EventDispatcher {
) {
let interested_observers = self.filter_observers(&self.stackerdb_observers_lookup, false);
let interested_receiver = STACKER_DB_CHANNEL.is_active();
if interested_observers.is_empty() && !interested_receiver {
let interested_receiver = STACKER_DB_CHANNEL.is_active(&contract_id);
if interested_observers.is_empty() && interested_receiver.is_none() {
return;
}
@@ -1184,8 +1215,13 @@ impl EventDispatcher {
let payload = serde_json::to_value(&event)
.expect("FATAL: failed to serialize StackerDBChunksEvent to JSON");
if interested_receiver {
STACKER_DB_CHANNEL.send(event)
if let Some(channel) = interested_receiver {
if let Err(send_err) = channel.send(event) {
warn!(
"Failed to send StackerDB event to WSTS coordinator channel. Miner thread may have exited.";
"err" => ?send_err
);
}
}
for observer in interested_observers.iter() {

View File

@@ -22,7 +22,9 @@ use clarity::boot_util::boot_code_id;
use clarity::vm::clarity::ClarityConnection;
use clarity::vm::types::{PrincipalData, QualifiedContractIdentifier};
use hashbrown::HashSet;
use libsigner::{BlockProposalSigners, MessageSlotID, SignerMessage};
use libsigner::{
BlockProposalSigners, MessageSlotID, SignerMessage, SignerSession, StackerDBSession,
};
use stacks::burnchains::{Burnchain, BurnchainParameters};
use stacks::chainstate::burn::db::sortdb::SortitionDB;
use stacks::chainstate::burn::{BlockSnapshot, ConsensusHash};
@@ -36,7 +38,7 @@ use stacks::chainstate::stacks::{
TenureChangeCause, TenureChangePayload, ThresholdSignature, TransactionAnchorMode,
TransactionPayload, TransactionVersion,
};
use stacks::net::stackerdb::{StackerDBConfig, StackerDBs};
use stacks::net::stackerdb::StackerDBs;
use stacks_common::codec::read_next;
use stacks_common::types::chainstate::{StacksAddress, StacksBlockId};
use stacks_common::types::{PrivateKey, StacksEpochId};
@@ -138,36 +140,6 @@ impl BlockMinerThread {
globals.unblock_miner();
}
fn make_miners_stackerdb_config(
&mut self,
stackerdbs: &mut StackerDBs,
) -> Result<StackerDBConfig, NakamotoNodeError> {
let mut chain_state = neon_node::open_chainstate_with_faults(&self.config)
.expect("FATAL: could not open chainstate DB");
let burn_db_path = self.config.get_burn_db_file_path();
let sort_db = SortitionDB::open(&burn_db_path, true, self.burnchain.pox_constants.clone())
.expect("FATAL: could not open sortition DB");
let mut stacker_db_configs = HashMap::with_capacity(1);
let miner_contract = boot_code_id(MINERS_NAME, self.config.is_mainnet());
stacker_db_configs.insert(miner_contract.clone(), StackerDBConfig::noop());
let mut miners_only_config = stackerdbs
.create_or_reconfigure_stackerdbs(&mut chain_state, &sort_db, stacker_db_configs)
.map_err(|e| {
error!(
"Failed to configure .miners stackerdbs";
"err" => ?e,
);
NakamotoNodeError::MinerConfigurationFailed(
"Could not setup .miners stackerdbs configuration",
)
})?;
miners_only_config.remove(&miner_contract).ok_or_else(|| {
NakamotoNodeError::MinerConfigurationFailed(
"Did not return .miners stackerdb configuration after setup",
)
})
}
pub fn run_miner(mut self, prior_miner: Option<JoinHandle<()>>) {
// when starting a new tenure, block the mining thread if it's currently running.
// the new mining thread will join it (so that the new mining thread stalls, not the relayer)
@@ -210,13 +182,7 @@ impl BlockMinerThread {
};
if let Some(mut new_block) = new_block {
let Ok(stackerdb_config) = self.make_miners_stackerdb_config(&mut stackerdbs)
else {
warn!("Failed to setup stackerdb to propose block, will try mining again");
continue;
};
if let Err(e) = self.propose_block(&new_block, &mut stackerdbs, &stackerdb_config) {
if let Err(e) = self.propose_block(&new_block, &stackerdbs) {
error!("Unrecoverable error while proposing block to signer set: {e:?}. Ending tenure.");
return;
}
@@ -224,7 +190,6 @@ impl BlockMinerThread {
let (aggregate_public_key, signers_signature) = match self.coordinate_signature(
&new_block,
&mut stackerdbs,
&stackerdb_config,
&mut attempts,
) {
Ok(x) => x,
@@ -278,7 +243,6 @@ impl BlockMinerThread {
&mut self,
new_block: &NakamotoBlock,
stackerdbs: &mut StackerDBs,
stackerdb_config: &StackerDBConfig,
attempts: &mut u64,
) -> Result<(Point, ThresholdSignature), NakamotoNodeError> {
let Some(miner_privkey) = self.config.miner.mining_key else {
@@ -374,7 +338,6 @@ impl BlockMinerThread {
aggregate_public_key,
self.config.is_mainnet(),
&stackerdbs,
stackerdb_config.clone(),
&self.config,
)
.map_err(|e| {
@@ -390,8 +353,7 @@ impl BlockMinerThread {
&tip,
&self.burnchain,
&sort_db,
stackerdbs,
&self.event_dispatcher,
&stackerdbs,
)?;
Ok((aggregate_public_key, signature))
@@ -400,10 +362,14 @@ impl BlockMinerThread {
fn propose_block(
&mut self,
new_block: &NakamotoBlock,
stackerdbs: &mut StackerDBs,
stackerdb_config: &StackerDBConfig,
stackerdbs: &StackerDBs,
) -> Result<(), NakamotoNodeError> {
let rpc_socket = self.config.node.get_rpc_loopback().ok_or_else(|| {
NakamotoNodeError::MinerConfigurationFailed("Could not parse RPC bind")
})?;
let miners_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet());
let mut miners_session =
StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id.clone());
let Some(miner_privkey) = self.config.miner.mining_key else {
return Err(NakamotoNodeError::MinerConfigurationFailed(
"No mining key configured, cannot mine",
@@ -455,17 +421,12 @@ impl BlockMinerThread {
};
// Propose the block to the observing signers through the .miners stackerdb instance
let miner_contract_id = boot_code_id(MINERS_NAME, self.config.is_mainnet());
let Ok(stackerdb_tx) = stackerdbs.tx_begin(stackerdb_config.clone()) else {
warn!("Failed to begin stackerdbs transaction to write block proposal, will try mining again");
return Ok(());
};
match stackerdb_tx.put_chunk(&miner_contract_id, proposal, &self.event_dispatcher) {
Ok(()) => {
match miners_session.put_chunk(&proposal) {
Ok(ack) => {
info!(
"Proposed block to stackerdb";
"signer_sighash" => %new_block.header.signer_signature_hash()
"signer_sighash" => %new_block.header.signer_signature_hash(),
"ack_msg" => ?ack,
);
}
Err(e) => {

View File

@@ -17,7 +17,7 @@ use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};
use hashbrown::{HashMap, HashSet};
use libsigner::{MessageSlotID, SignerEvent, SignerMessage};
use libsigner::{MessageSlotID, SignerEvent, SignerMessage, SignerSession, StackerDBSession};
use stacks::burnchains::Burnchain;
use stacks::chainstate::burn::db::sortdb::SortitionDB;
use stacks::chainstate::burn::BlockSnapshot;
@@ -26,7 +26,7 @@ use stacks::chainstate::stacks::boot::{NakamotoSignerEntry, RewardSet, MINERS_NA
use stacks::chainstate::stacks::events::StackerDBChunksEvent;
use stacks::chainstate::stacks::{Error as ChainstateError, ThresholdSignature};
use stacks::libstackerdb::StackerDBChunkData;
use stacks::net::stackerdb::{StackerDBConfig, StackerDBs};
use stacks::net::stackerdb::StackerDBs;
use stacks::util_lib::boot::boot_code_id;
use stacks_common::codec::StacksMessageCodec;
use stacks_common::types::chainstate::{StacksPrivateKey, StacksPublicKey};
@@ -41,7 +41,7 @@ use wsts::v2::Aggregator;
use super::Error as NakamotoNodeError;
use crate::event_dispatcher::STACKER_DB_CHANNEL;
use crate::{Config, EventDispatcher};
use crate::Config;
/// The `SignCoordinator` struct represents a WSTS FIRE coordinator whose
/// sole function is to serve as the coordinator for Nakamoto block signing.
@@ -54,7 +54,7 @@ pub struct SignCoordinator {
message_key: Scalar,
wsts_public_keys: PublicKeys,
is_mainnet: bool,
miners_db_config: StackerDBConfig,
miners_session: StackerDBSession,
signing_round_timeout: Duration,
}
@@ -209,7 +209,6 @@ impl SignCoordinator {
aggregate_public_key: Point,
is_mainnet: bool,
stackerdb_conn: &StackerDBs,
miners_db_config: StackerDBConfig,
config: &Config,
) -> Result<Self, ChainstateError> {
let Some(ref reward_set_signers) = reward_set.signers else {
@@ -217,6 +216,13 @@ impl SignCoordinator {
return Err(ChainstateError::NoRegisteredSigners(0));
};
let rpc_socket = config
.node
.get_rpc_loopback()
.ok_or_else(|| ChainstateError::MinerAborted)?;
let miners_contract_id = boot_code_id(MINERS_NAME, is_mainnet);
let miners_session = StackerDBSession::new(&rpc_socket.to_string(), miners_contract_id);
let NakamotoSigningParams {
num_signers,
num_keys,
@@ -259,12 +265,10 @@ impl SignCoordinator {
warn!("Failed to set a valid set of party polynomials"; "error" => %e);
};
let Some(receiver) = STACKER_DB_CHANNEL.take_receiver() else {
error!("Could not obtain handle for the StackerDBChannel");
return Err(ChainstateError::ChannelClosed(
"WSTS coordinator requires a handle to the StackerDBChannel".into(),
));
};
let (receiver, replaced_other) = STACKER_DB_CHANNEL.register_miner_coordinator();
if replaced_other {
warn!("Replaced the miner/coordinator receiver of a prior thread. Prior thread may have crashed.");
}
Ok(Self {
coordinator,
@@ -272,7 +276,7 @@ impl SignCoordinator {
receiver: Some(receiver),
wsts_public_keys,
is_mainnet,
miners_db_config,
miners_session,
signing_round_timeout: config.miner.wait_on_signers.clone(),
})
}
@@ -288,11 +292,10 @@ impl SignCoordinator {
message_key: &Scalar,
sortdb: &SortitionDB,
tip: &BlockSnapshot,
stackerdbs: &mut StackerDBs,
stackerdbs: &StackerDBs,
message: SignerMessage,
is_mainnet: bool,
miners_db_config: &StackerDBConfig,
event_dispatcher: &EventDispatcher,
miners_session: &mut StackerDBSession,
) -> Result<(), String> {
let mut miner_sk = StacksPrivateKey::from_slice(&message_key.to_bytes()).unwrap();
miner_sk.set_compress_public(true);
@@ -321,14 +324,9 @@ impl SignCoordinator {
.sign(&miner_sk)
.map_err(|_| "Failed to sign StackerDB chunk")?;
let stackerdb_tx = stackerdbs.tx_begin(miners_db_config.clone()).map_err(|e| {
warn!("Failed to begin stackerdbs transaction to write .miners message"; "err" => ?e);
"Failed to begin StackerDBs transaction"
})?;
match stackerdb_tx.put_chunk(&miners_contract_id, chunk, event_dispatcher) {
Ok(()) => {
debug!("Wrote message to stackerdb: {message:?}");
match miners_session.put_chunk(&chunk) {
Ok(ack) => {
debug!("Wrote message to stackerdb: {ack:?}");
Ok(())
}
Err(e) => {
@@ -345,8 +343,7 @@ impl SignCoordinator {
burn_tip: &BlockSnapshot,
burnchain: &Burnchain,
sortdb: &SortitionDB,
stackerdbs: &mut StackerDBs,
event_dispatcher: &EventDispatcher,
stackerdbs: &StackerDBs,
) -> Result<ThresholdSignature, NakamotoNodeError> {
let sign_id = Self::get_sign_id(burn_tip.block_height, burnchain);
let sign_iter_id = block_attempt;
@@ -369,11 +366,10 @@ impl SignCoordinator {
&self.message_key,
sortdb,
burn_tip,
stackerdbs,
&stackerdbs,
nonce_req_msg.into(),
self.is_mainnet,
&self.miners_db_config,
event_dispatcher,
&mut self.miners_session,
)
.map_err(NakamotoNodeError::SigningCoordinatorFailure)?;
@@ -490,8 +486,7 @@ impl SignCoordinator {
stackerdbs,
msg.into(),
self.is_mainnet,
&self.miners_db_config,
event_dispatcher,
&mut self.miners_session,
) {
Ok(()) => {
debug!("Miner/Coordinator: sent outbound message.");