Add spawned signer struct to v1 mod.rs

Signed-off-by: Jacinta Ferrant <jacinta@trustmachines.co>
This commit is contained in:
Jacinta Ferrant
2024-05-13 13:33:54 -07:00
parent d8475f1567
commit 764de0b48b
4 changed files with 108 additions and 120 deletions

View File

@@ -1,7 +1,6 @@
use std::collections::HashSet;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, thread};
@@ -9,7 +8,7 @@ use std::{env, thread};
use clarity::boot_util::boot_code_id;
use clarity::vm::Value;
use libsigner::v1::messages::{BlockResponse, MessageSlotID, RejectCode, SignerMessage};
use libsigner::{BlockProposal, RunningSigner, Signer, SignerEntries, SignerEventReceiver};
use libsigner::{BlockProposal, SignerEntries};
use rand::thread_rng;
use rand_core::RngCore;
use stacks::burnchains::Txid;
@@ -41,8 +40,8 @@ use stacks_common::util::secp256k1::MessageSignature;
use stacks_signer::client::{SignerSlotID, StackerDB, StacksClient};
use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network};
use stacks_signer::runloop::{RunLoopCommand, SignerCommand};
use stacks_signer::v1;
use stacks_signer::v1::coordinator::CoordinatorSelector;
use stacks_signer::v1::SpawnedSigner;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};
use wsts::curve::point::Point;
@@ -83,13 +82,8 @@ struct RunningNodes {
struct SignerTest {
// The stx and bitcoin nodes and their run loops
pub running_nodes: RunningNodes,
// The channels for sending commands to the signers
pub signer_cmd_senders: Vec<Sender<RunLoopCommand>>,
// The channels for receiving results from the signers
pub result_receivers: Vec<Receiver<Vec<OperationResult>>>,
// The running signer and its threads
pub running_signers:
Vec<RunningSigner<SignerEventReceiver<SignerMessage>, Vec<OperationResult>, SignerMessage>>,
// The spawned signers and their threads
pub spawned_signers: Vec<SpawnedSigner>,
// the private keys of the signers
pub signer_stacks_private_keys: Vec<StacksPrivateKey>,
// link to the stacks node
@@ -127,21 +121,15 @@ impl SignerTest {
Some(9000),
);
let mut running_signers = Vec::new();
let mut signer_cmd_senders = Vec::new();
let mut result_receivers = Vec::new();
for i in 0..num_signers {
let (cmd_send, cmd_recv) = channel();
let (res_send, res_recv) = channel();
info!("spawn signer");
running_signers.push(spawn_signer(
&signer_configs[i as usize],
cmd_recv,
res_send,
));
signer_cmd_senders.push(cmd_send);
result_receivers.push(res_recv);
}
let spawned_signers: Vec<_> = (0..num_signers)
.into_iter()
.map(|i| {
info!("spawning signer");
let signer_config =
SignerConfig::load_from_str(&signer_configs[i as usize]).unwrap();
SpawnedSigner::from(signer_config)
})
.collect();
// Setup the nodes and deploy the contract to it
let node = setup_stx_btc_node(naka_conf, &signer_stacks_private_keys, &signer_configs);
@@ -150,9 +138,7 @@ impl SignerTest {
Self {
running_nodes: node,
result_receivers,
signer_cmd_senders,
running_signers,
spawned_signers,
signer_stacks_private_keys,
stacks_client,
run_stamp,
@@ -430,10 +416,11 @@ impl SignerTest {
debug!("Waiting for DKG...");
let mut key = Point::default();
let dkg_now = Instant::now();
for recv in self.result_receivers.iter() {
for signer in self.spawned_signers.iter() {
let mut aggregate_public_key = None;
loop {
let results = recv
let results = signer
.res_recv
.recv_timeout(timeout)
.expect("failed to recv dkg results");
for result in results {
@@ -738,12 +725,10 @@ impl SignerTest {
/// # Panics
/// Panics if `signer_idx` is out of bounds
fn stop_signer(&mut self, signer_idx: usize) -> StacksPrivateKey {
let running_signer = self.running_signers.remove(signer_idx);
self.signer_cmd_senders.remove(signer_idx);
self.result_receivers.remove(signer_idx);
let spawned_signer = self.spawned_signers.remove(signer_idx);
let signer_key = self.signer_stacks_private_keys.remove(signer_idx);
running_signer.stop();
spawned_signer.stop();
signer_key
}
@@ -764,15 +749,10 @@ impl SignerTest {
.pop()
.unwrap();
let (cmd_send, cmd_recv) = channel();
let (res_send, res_recv) = channel();
info!("Restarting signer");
let signer = spawn_signer(&signer_config, cmd_recv, res_send);
self.result_receivers.insert(signer_idx, res_recv);
self.signer_cmd_senders.insert(signer_idx, cmd_send);
self.running_signers.insert(signer_idx, signer);
let config = SignerConfig::load_from_str(&signer_config).unwrap();
let signer = SpawnedSigner::from(config);
self.spawned_signers.insert(signer_idx, signer);
}
fn shutdown(self) {
@@ -786,38 +766,13 @@ impl SignerTest {
.run_loop_stopper
.store(false, Ordering::SeqCst);
// Stop the signers before the node to prevent hanging
for signer in self.running_signers {
for signer in self.spawned_signers {
assert!(signer.stop().is_none());
}
self.running_nodes.run_loop_thread.join().unwrap();
}
}
fn spawn_signer(
data: &str,
receiver: Receiver<RunLoopCommand>,
sender: Sender<Vec<OperationResult>>,
) -> RunningSigner<SignerEventReceiver<SignerMessage>, Vec<OperationResult>, SignerMessage> {
let config = SignerConfig::load_from_str(data).unwrap();
let ev = SignerEventReceiver::new(config.network.is_mainnet());
let endpoint = config.endpoint;
#[cfg(feature = "monitoring_prom")]
{
stacks_signer::monitoring::start_serving_monitoring_metrics(config.clone()).ok();
}
let runloop: stacks_signer::runloop::RunLoop<v1::signer::Signer, SignerMessage> =
stacks_signer::runloop::RunLoop::new(config);
let mut signer: Signer<
RunLoopCommand,
Vec<OperationResult>,
stacks_signer::runloop::RunLoop<v1::signer::Signer, SignerMessage>,
SignerEventReceiver<SignerMessage>,
SignerMessage,
> = Signer::new(runloop, ev, receiver, sender);
info!("Spawning signer on endpoint {}", endpoint);
signer.spawn(endpoint).unwrap()
}
fn setup_stx_btc_node(
mut naka_conf: NeonConfig,
signer_stacks_private_keys: &[StacksPrivateKey],
@@ -984,8 +939,9 @@ fn stackerdb_dkg() {
// Determine the coordinator of the current node height
info!("signer_runloop: spawn send commands to do dkg");
let dkg_now = Instant::now();
for sender in signer_test.signer_cmd_senders.iter() {
sender
for signer in signer_test.spawned_signers.iter() {
signer
.cmd_send
.send(RunLoopCommand {
reward_cycle,
command: SignerCommand::Dkg,
@@ -1104,11 +1060,13 @@ fn stackerdb_sign_request_rejected() {
merkle_root: None,
},
};
for sender in signer_test.signer_cmd_senders.iter() {
sender
for signer in signer_test.spawned_signers.iter() {
signer
.cmd_send
.send(sign_command.clone())
.expect("failed to send sign command");
sender
signer
.cmd_send
.send(sign_taproot_command.clone())
.expect("failed to send sign taproot command");
}