feat: signer monitoring server

This commit is contained in:
Hank Stoever
2024-04-25 10:38:41 -07:00
parent 41e57efb1d
commit a5bda55833
14 changed files with 578 additions and 4 deletions

5
Cargo.lock generated
View File

@@ -1921,9 +1921,11 @@ version = "0.0.1"
dependencies = [
"clarity",
"hashbrown 0.14.3",
"lazy_static",
"libc",
"libstackerdb",
"mutants",
"prometheus",
"rand 0.8.5",
"rand_core 0.6.4",
"secp256k1",
@@ -3451,10 +3453,12 @@ dependencies = [
"clap 4.5.0",
"clarity",
"hashbrown 0.14.3",
"lazy_static",
"libsigner",
"libstackerdb",
"num-traits",
"polynomial",
"prometheus",
"rand 0.8.5",
"rand_core 0.6.4",
"reqwest",
@@ -3470,6 +3474,7 @@ dependencies = [
"stacks-common",
"stackslib",
"thiserror",
"tiny_http",
"toml 0.5.11",
"tracing",
"tracing-subscriber",

View File

@@ -18,8 +18,10 @@ path = "./src/libsigner.rs"
[dependencies]
clarity = { path = "../clarity" }
hashbrown = { workspace = true }
lazy_static = "1.4.0"
libc = "0.2"
libstackerdb = { path = "../libstackerdb" }
prometheus = { version = "0.9", optional = true }
serde = "1"
serde_derive = "1"
serde_stacker = "0.1"
@@ -50,3 +52,6 @@ sha2 = { version = "0.10", features = ["asm"] }
[target.'cfg(any(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")), any(target_os = "windows")))'.dependencies]
sha2 = { version = "0.10" }
[features]
monitoring_prom = ["prometheus"]

View File

@@ -24,8 +24,10 @@ backoff = "0.4"
clarity = { path = "../clarity" }
clap = { version = "4.1.1", features = ["derive", "env"] }
hashbrown = { workspace = true }
lazy_static = "1.4.0"
libsigner = { path = "../libsigner" }
libstackerdb = { path = "../libstackerdb" }
prometheus = { version = "0.9", optional = true }
rand_core = "0.6"
reqwest = { version = "0.11.22", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde = "1"
@@ -37,6 +39,7 @@ slog-term = "2.6.0"
stacks-common = { path = "../stacks-common" }
stackslib = { path = "../stackslib" }
thiserror = "1.0"
tiny_http = { version = "0.12", optional = true }
toml = "0.5.6"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
@@ -60,3 +63,6 @@ features = ["arbitrary_precision", "unbounded_depth"]
[dependencies.secp256k1]
version = "0.24.3"
features = ["serde", "recovery"]
[features]
monitoring_prom = ["libsigner/monitoring_prom", "prometheus", "tiny_http"]

View File

@@ -458,7 +458,7 @@ impl StacksClient {
}
/// Helper function to retrieve the account info from the stacks node for a specific address
fn get_account_entry(
pub fn get_account_entry(
&self,
address: &StacksAddress,
) -> Result<AccountEntryResponse, ClientError> {

View File

@@ -186,6 +186,8 @@ pub struct GlobalConfig {
pub auth_password: String,
/// The path to the signer's database file
pub db_path: PathBuf,
/// Metrics endpoint
pub metrics_endpoint: Option<SocketAddr>,
}
/// Internal struct for loading up the config file
@@ -221,6 +223,8 @@ struct RawConfigFile {
pub auth_password: String,
/// The path to the signer's database file or :memory: for an in-memory database
pub db_path: String,
/// Metrics endpoint
pub metrics_endpoint: Option<String>,
}
impl RawConfigFile {
@@ -298,6 +302,19 @@ impl TryFrom<RawConfigFile> for GlobalConfig {
let sign_timeout = raw_data.sign_timeout_ms.map(Duration::from_millis);
let db_path = raw_data.db_path.into();
let metrics_endpoint = match raw_data.metrics_endpoint {
Some(endpoint) => Some(
endpoint
.to_socket_addrs()
.map_err(|_| ConfigError::BadField("endpoint".to_string(), endpoint.clone()))?
.next()
.ok_or_else(|| {
ConfigError::BadField("endpoint".to_string(), endpoint.clone())
})?,
),
None => None,
};
Ok(Self {
node_host: raw_data.node_host,
endpoint,
@@ -315,6 +332,7 @@ impl TryFrom<RawConfigFile> for GlobalConfig {
max_tx_fee_ustx: raw_data.max_tx_fee_ustx,
auth_password: raw_data.auth_password,
db_path,
metrics_endpoint,
})
}
}
@@ -345,6 +363,10 @@ impl GlobalConfig {
0 => "default".to_string(),
_ => (self.tx_fee_ustx as f64 / 1_000_000.0).to_string(),
};
let metrics_endpoint = match &self.metrics_endpoint {
Some(endpoint) => endpoint.to_string(),
None => "None".to_string(),
};
format!(
r#"
Stacks node host: {node_host}
@@ -354,6 +376,7 @@ Public key: {public_key}
Network: {network}
Database path: {db_path}
DKG transaction fee: {tx_fee} uSTX
Metrics endpoint: {metrics_endpoint}
"#,
node_host = self.node_host,
endpoint = self.endpoint,
@@ -361,7 +384,8 @@ DKG transaction fee: {tx_fee} uSTX
public_key = StacksPublicKey::from_private(&self.stacks_private_key).to_hex(),
network = self.network,
db_path = self.db_path.to_str().unwrap_or_default(),
tx_fee = tx_fee
tx_fee = tx_fee,
metrics_endpoint = metrics_endpoint,
)
}
}
@@ -384,6 +408,7 @@ pub fn build_signer_config_tomls(
mut port_start: usize,
max_tx_fee_ustx: Option<u64>,
tx_fee_ustx: Option<u64>,
mut metrics_port_start: Option<usize>,
) -> Vec<String> {
let mut signer_config_tomls = vec![];
@@ -438,6 +463,17 @@ tx_fee_ustx = {tx_fee_ustx}
)
}
if let Some(metrics_port) = metrics_port_start {
let metrics_endpoint = format!("localhost:{}", metrics_port);
signer_config_toml = format!(
r#"
{signer_config_toml}
metrics_endpoint = "{metrics_endpoint}"
"#
);
metrics_port_start = Some(metrics_port + 1);
}
signer_config_tomls.push(signer_config_toml);
}
@@ -469,6 +505,7 @@ mod tests {
3000,
None,
None,
Some(4000),
);
let config =
@@ -477,6 +514,7 @@ mod tests {
assert_eq!(config.auth_password, "melon");
assert!(config.max_tx_fee_ustx.is_none());
assert!(config.tx_fee_ustx.is_none());
assert_eq!(config.metrics_endpoint, Some("localhost:4000".to_string()));
}
#[test]
@@ -501,6 +539,7 @@ mod tests {
3000,
None,
None,
None,
);
let config =
@@ -526,6 +565,7 @@ mod tests {
3000,
max_tx_fee_ustx,
tx_fee_ustx,
None,
);
let config =
@@ -546,6 +586,7 @@ mod tests {
3000,
max_tx_fee_ustx,
None,
None,
);
let config =
@@ -570,6 +611,7 @@ mod tests {
3000,
None,
tx_fee_ustx,
None,
);
let config =

View File

@@ -34,3 +34,6 @@ pub mod runloop;
pub mod signer;
/// The state module for the signer
pub mod signerdb;
/// The monitoring server for the signer
pub mod monitoring;

View File

@@ -93,6 +93,10 @@ fn spawn_running_signer(path: &PathBuf) -> SpawnedSigner {
let (cmd_send, cmd_recv) = channel();
let (res_send, res_recv) = channel();
let ev = SignerEventReceiver::new(config.network.is_mainnet());
#[cfg(feature = "monitoring_prom")]
{
stacks_signer::monitoring::start_serving_monitoring_metrics(config.clone()).ok();
}
let runloop = RunLoop::from(config);
let mut signer: Signer<RunLoopCommand, Vec<OperationResult>, RunLoop, SignerEventReceiver> =
Signer::new(runloop, ev, cmd_recv, res_send);
@@ -305,6 +309,7 @@ fn handle_generate_files(args: GenerateFilesArgs) {
3000,
None,
None,
None,
);
debug!("Built {:?} signer config tomls.", signer_config_tomls.len());
for (i, file_contents) in signer_config_tomls.iter().enumerate() {

View File

@@ -0,0 +1,162 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#[cfg(feature = "monitoring_prom")]
use slog::slog_error;
#[cfg(not(feature = "monitoring_prom"))]
use slog::slog_warn;
#[cfg(feature = "monitoring_prom")]
use stacks_common::error;
#[cfg(not(feature = "monitoring_prom"))]
use stacks_common::warn;
use crate::config::GlobalConfig;
#[cfg(feature = "monitoring_prom")]
mod prometheus;
#[cfg(feature = "monitoring_prom")]
mod server;
/// Update stacks tip height guage
#[allow(unused_variables)]
pub fn update_stacks_tip_height(height: i64) {
#[cfg(feature = "monitoring_prom")]
prometheus::STACKS_TIP_HEIGHT_GUAGE.set(height);
}
/// Update the current reward cycle
#[allow(unused_variables)]
pub fn update_reward_cycle(reward_cycle: i64) {
#[cfg(feature = "monitoring_prom")]
prometheus::CURRENT_REWARD_CYCLE.set(reward_cycle);
}
/// Increment the block validation responses counter
#[allow(unused_variables)]
pub fn increment_block_validation_responses(accepted: bool) {
#[cfg(feature = "monitoring_prom")]
{
let label_value = if accepted { "accepted" } else { "rejected" };
prometheus::BLOCK_VALIDATION_RESPONSES
.with_label_values(&[label_value])
.inc();
}
}
/// Increment the block responses sent counter
#[allow(unused_variables)]
pub fn increment_block_responses_sent(accepted: bool) {
#[cfg(feature = "monitoring_prom")]
{
let label_value = if accepted { "accepted" } else { "rejected" };
prometheus::BLOCK_RESPONSES_SENT
.with_label_values(&[label_value])
.inc();
}
}
/// Increment the signer inbound messages counter
#[allow(unused_variables)]
pub fn increment_signer_inbound_messages(amount: i64) {
#[cfg(feature = "monitoring_prom")]
prometheus::SIGNER_INBOUND_MESSAGES.inc_by(amount);
}
/// Increment the coordinator inbound messages counter
#[allow(unused_variables)]
pub fn increment_coordinator_inbound_messages(amount: i64) {
#[cfg(feature = "monitoring_prom")]
prometheus::COORDINATOR_INBOUND_MESSAGES.inc_by(amount);
}
/// Increment the number of inbound packets received
#[allow(unused_variables)]
pub fn increment_inbound_packets(amount: i64) {
#[cfg(feature = "monitoring_prom")]
prometheus::INBOUND_PACKETS_RECEIVED.inc_by(amount);
}
/// Increment the number of commands processed
#[allow(unused_variables)]
pub fn increment_commands_processed(command_type: &str) {
#[cfg(feature = "monitoring_prom")]
prometheus::COMMANDS_PROCESSED
.with_label_values(&[command_type])
.inc();
}
/// Increment the number of DKG votes submitted
#[allow(unused_variables)]
pub fn increment_dkg_votes_submitted() {
#[cfg(feature = "monitoring_prom")]
prometheus::DGK_VOTES_SUBMITTED.inc();
}
/// Increment the number of commands processed
#[allow(unused_variables)]
pub fn increment_operation_results(operation_type: &str) {
#[cfg(feature = "monitoring_prom")]
prometheus::OPERATION_RESULTS
.with_label_values(&[operation_type])
.inc();
}
/// Increment the number of block proposals received
#[allow(unused_variables)]
pub fn increment_block_proposals_received() {
#[cfg(feature = "monitoring_prom")]
prometheus::BLOCK_PROPOSALS_RECEIVED.inc();
}
/// Update the stx balance of the signer
#[allow(unused_variables)]
pub fn update_signer_stx_balance(balance: i64) {
#[cfg(feature = "monitoring_prom")]
prometheus::SIGNER_STX_BALANCE.set(balance);
}
/// Update the signer nonce metric
#[allow(unused_variables)]
pub fn update_signer_nonce(nonce: u64) {
#[cfg(feature = "monitoring_prom")]
prometheus::SIGNER_NONCE.set(nonce as i64);
}
/// Start serving monitoring metrics.
/// This will only serve the metrics if the `monitoring_prom` feature is enabled.
#[allow(unused_variables)]
pub fn start_serving_monitoring_metrics(config: GlobalConfig) -> Result<(), String> {
#[cfg(feature = "monitoring_prom")]
{
if config.metrics_endpoint.is_none() {
return Ok(());
}
let thread = std::thread::Builder::new()
.name("signer_metrics".to_string())
.spawn(move || {
if let Err(monitoring_err) = server::MonitoringServer::start(&config) {
error!(
"Monitoring: Error starting metrics server: {:?}",
monitoring_err
);
}
});
}
#[cfg(not(feature = "monitoring_prom"))]
warn!("Not starting monitoring metrics server as the monitoring_prom feature is not enabled");
Ok(())
}

View File

@@ -0,0 +1,98 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use lazy_static::lazy_static;
use prometheus::{
gather, opts, register_int_counter, register_int_counter_vec, register_int_gauge, Encoder,
IntCounter, IntCounterVec, IntGauge, TextEncoder,
};
lazy_static! {
pub static ref STACKS_TIP_HEIGHT_GUAGE: IntGauge = register_int_gauge!(opts!(
"stacks_signer_stacks_node_height",
"The current height of the Stacks node"
))
.unwrap();
pub static ref BLOCK_VALIDATION_RESPONSES: IntCounterVec = register_int_counter_vec!(
"stacks_signer_block_validation_responses",
"The number of block validation responses. `response_type` is either 'accepted' or 'rejected'",
&["response_type"]
)
.unwrap();
pub static ref BLOCK_RESPONSES_SENT: IntCounterVec = register_int_counter_vec!(
"stacks_signer_block_responses_sent",
"The number of block responses sent. `response_type` is either 'accepted' or 'rejected'",
&["response_type"]
)
.unwrap();
pub static ref SIGNER_INBOUND_MESSAGES: IntCounter = register_int_counter!(opts!(
"stacks_signer_inbound_messages",
"The number of inbound messages received by the signer"
))
.unwrap();
pub static ref COORDINATOR_INBOUND_MESSAGES: IntCounter = register_int_counter!(opts!(
"stacks_signer_coordinator_inbound_messages",
"The number of inbound messages received as a coordinator"
))
.unwrap();
pub static ref INBOUND_PACKETS_RECEIVED: IntCounter = register_int_counter!(opts!(
"stacks_signer_inbound_packets_received",
"The number of inbound packets received by the signer"
))
.unwrap();
pub static ref COMMANDS_PROCESSED: IntCounterVec = register_int_counter_vec!(
"stacks_signer_commands_processed",
"The number of commands processed by the signer",
&["command_type"]
)
.unwrap();
pub static ref DGK_VOTES_SUBMITTED: IntCounter = register_int_counter!(opts!(
"stacks_signer_dgk_votes_submitted",
"The number of DGK votes submitted by the signer"
))
.unwrap();
pub static ref OPERATION_RESULTS: IntCounterVec = register_int_counter_vec!(
"stacks_signer_operation_results_dkg",
"The number of DKG operation results",
&["operation_type"]
)
.unwrap();
pub static ref BLOCK_PROPOSALS_RECEIVED: IntCounter = register_int_counter!(opts!(
"stacks_signer_block_proposals_received",
"The number of block proposals received by the signer"
))
.unwrap();
pub static ref CURRENT_REWARD_CYCLE: IntGauge = register_int_gauge!(opts!(
"stacks_signer_current_reward_cycle",
"The current reward cycle"
)).unwrap();
pub static ref SIGNER_STX_BALANCE: IntGauge = register_int_gauge!(opts!(
"stacks_signer_stx_balance",
"The current STX balance of the signer"
)).unwrap();
pub static ref SIGNER_NONCE: IntGauge = register_int_gauge!(opts!(
"stacks_signer_nonce",
"The current nonce of the signer"
)).unwrap();
}
pub fn gather_metrics_string() -> String {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metrics_families = gather();
encoder.encode(&metrics_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}

View File

@@ -0,0 +1,197 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2024 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::net::SocketAddr;
use std::time::Instant;
use clarity::util::hash::to_hex;
use clarity::util::secp256k1::Secp256k1PublicKey;
use slog::{slog_debug, slog_error, slog_info, slog_warn};
use stacks_common::{debug, error, info, warn};
use tiny_http::{Response as HttpResponse, Server as HttpServer};
use super::{update_reward_cycle, update_signer_stx_balance};
use crate::client::{ClientError, StacksClient};
use crate::config::{GlobalConfig, Network};
use crate::monitoring::prometheus::gather_metrics_string;
use crate::monitoring::{update_signer_nonce, update_stacks_tip_height};
#[derive(Debug)]
/// Monitoring server errors
pub enum MonitoringError {
/// Already bound to an address
AlreadyBound,
/// Server terminated
Terminated,
/// No endpoint configured
EndpointNotConfigured,
/// Error fetching metrics from stacks node
FetchError(ClientError),
}
/// Metrics and monitoring server
pub struct MonitoringServer {
http_server: HttpServer,
local_addr: SocketAddr,
stacks_client: StacksClient,
last_metrics_poll: Instant,
network: Network,
public_key: Secp256k1PublicKey,
}
impl MonitoringServer {
pub fn new(
http_server: HttpServer,
local_addr: SocketAddr,
stacks_client: StacksClient,
network: Network,
public_key: Secp256k1PublicKey,
) -> Self {
Self {
http_server,
local_addr,
stacks_client,
last_metrics_poll: Instant::now(),
network,
public_key,
}
}
/// Start and run the metrics server
pub fn start(config: &GlobalConfig) -> Result<(), MonitoringError> {
let Some(endpoint) = config.metrics_endpoint else {
return Err(MonitoringError::EndpointNotConfigured);
};
let stacks_client = StacksClient::from(config);
let http_server = HttpServer::http(endpoint).map_err(|_| MonitoringError::AlreadyBound)?;
let public_key = Secp256k1PublicKey::from_private(&config.stacks_private_key);
let mut server = MonitoringServer::new(
http_server,
endpoint,
stacks_client,
config.network.clone(),
public_key,
);
server.update_metrics()?;
server.main_loop()
}
// /// Start and run the metrics server
// pub fn run(endpoint: SocketAddr, stacks_client: StacksClient) -> Result<(), MonitoringError> {
// let http_server = HttpServer::http(endpoint).map_err(|_| MonitoringError::AlreadyBound)?;
// let mut server = PrometheusMetrics::new(http_server, endpoint, stacks_client);
// server.main_loop()
// }
/// Main listener loop of metrics server
pub fn main_loop(&mut self) -> Result<(), MonitoringError> {
info!("{}: Starting Prometheus metrics server", self);
loop {
if let Err(err) = self.refresh_metrics() {
error!("Monitoring: Error refreshing metrics: {:?}", err);
}
let request = match self.http_server.recv() {
Ok(request) => request,
Err(err) => {
error!("Monitoring: Error receiving request: {:?}", err);
return Err(MonitoringError::Terminated);
}
};
debug!("{}: received request {}", self, request.url());
if request.url() == "/metrics" {
let response = HttpResponse::from_string(gather_metrics_string());
request.respond(response).expect("Failed to send response");
continue;
}
// unknown request, return 200 ok
request
.respond(HttpResponse::from_string(self.get_info_response()))
.expect("Failed to respond to request");
}
}
/// Check to see if metrics need to be refreshed
fn refresh_metrics(&mut self) -> Result<(), MonitoringError> {
let now = Instant::now();
if now.duration_since(self.last_metrics_poll).as_secs() > 60 {
self.last_metrics_poll = now;
self.update_metrics()?;
}
Ok(())
}
/// Update metrics by making RPC calls to the Stacks node
fn update_metrics(&self) -> Result<(), MonitoringError> {
debug!("{}: Updating metrics", self);
let peer_info = self
.stacks_client
.get_peer_info()
.map_err(|e| MonitoringError::FetchError(e))?;
if let Ok(height) = i64::try_from(peer_info.stacks_tip_height) {
update_stacks_tip_height(height);
} else {
warn!(
"Failed to parse stacks tip height: {}",
peer_info.stacks_tip_height
);
}
let pox_info = self
.stacks_client
.get_pox_data()
.map_err(|e| MonitoringError::FetchError(e))?;
if let Ok(reward_cycle) = i64::try_from(pox_info.reward_cycle_id) {
update_reward_cycle(reward_cycle);
}
let signer_stx_addr = self.stacks_client.get_signer_address();
let account_entry = self
.stacks_client
.get_account_entry(&signer_stx_addr)
.map_err(|e| MonitoringError::FetchError(e))?;
let balance = i64::from_str_radix(&account_entry.balance[2..], 16).map_err(|e| {
MonitoringError::FetchError(ClientError::MalformedClarityValue(format!(
"Failed to parse balance: {} with err: {}",
&account_entry.balance, e,
)))
})?;
if let Ok(nonce) = u64::try_from(account_entry.nonce) {
update_signer_nonce(nonce);
} else {
warn!("Failed to parse nonce: {}", account_entry.nonce);
}
update_signer_stx_balance(balance);
Ok(())
}
/// Build a JSON response for non-metrics requests
fn get_info_response(&self) -> String {
// let public_key = Secp256k1PublicKey::from_private(&self.stacks_client.publ);
serde_json::to_string(&serde_json::json!({
"signerPublicKey": to_hex(&self.public_key.to_bytes_compressed()),
"network": self.network.to_string(),
"stxAddress": self.stacks_client.get_signer_address().to_string(),
}))
.expect("Failed to serialize JSON")
}
}
impl std::fmt::Display for MonitoringServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Signer monitoring server ({})", self.local_addr)
}
}

View File

@@ -413,6 +413,7 @@ impl Signer {
fn execute_command(&mut self, stacks_client: &StacksClient, command: &Command) {
match command {
Command::Dkg => {
crate::monitoring::increment_commands_processed(&"dkg");
if self.approved_aggregate_public_key.is_some() {
debug!("Reward cycle #{} Signer #{}: Already have an aggregate key. Ignoring DKG command.", self.reward_cycle, self.signer_id);
return;
@@ -449,6 +450,7 @@ impl Signer {
is_taproot,
merkle_root,
} => {
crate::monitoring::increment_commands_processed(&"sign");
if self.approved_aggregate_public_key.is_none() {
debug!("{self}: Cannot sign a block without an approved aggregate public key. Ignore it.");
return;
@@ -548,6 +550,7 @@ impl Signer {
) {
let mut block_info = match block_validate_response {
BlockValidateResponse::Ok(block_validate_ok) => {
crate::monitoring::increment_block_validation_responses(true);
let signer_signature_hash = block_validate_ok.signer_signature_hash;
// For mutability reasons, we need to take the block_info out of the map and add it back after processing
let mut block_info = match self
@@ -578,6 +581,7 @@ impl Signer {
block_info
}
BlockValidateResponse::Reject(block_validate_reject) => {
crate::monitoring::increment_block_validation_responses(false);
let signer_signature_hash = block_validate_reject.signer_signature_hash;
let mut block_info = match self
.signer_db
@@ -680,6 +684,9 @@ impl Signer {
packets: &[Packet],
current_reward_cycle: u64,
) {
if let Ok(packets_len) = packets.len().try_into() {
crate::monitoring::increment_inbound_packets(packets_len);
}
let signer_outbound_messages = self
.state_machine
.process_inbound_messages(packets)
@@ -1036,20 +1043,25 @@ impl Signer {
// Signers only every trigger non-taproot signing rounds over blocks. Ignore SignTaproot results
match operation_result {
OperationResult::Sign(signature) => {
crate::monitoring::increment_operation_results(&"sign");
debug!("{self}: Received signature result");
self.process_signature(signature);
}
OperationResult::SignTaproot(_) => {
crate::monitoring::increment_operation_results(&"sign_taproot");
debug!("{self}: Received a signature result for a taproot signature. Nothing to broadcast as we currently sign blocks with a FROST signature.");
}
OperationResult::Dkg(aggregate_key) => {
crate::monitoring::increment_operation_results(&"dkg");
self.process_dkg(stacks_client, aggregate_key);
}
OperationResult::SignError(e) => {
crate::monitoring::increment_operation_results(&"sign_error");
warn!("{self}: Received a Sign error: {e:?}");
self.process_sign_error(e);
}
OperationResult::DkgError(e) => {
crate::monitoring::increment_operation_results(&"dkg_error");
warn!("{self}: Received a DKG error: {e:?}");
// TODO: process these errors and track malicious signers to report
}
@@ -1200,6 +1212,7 @@ impl Signer {
debug!("{self}: Received a DKG result, but are in an unsupported epoch. Do not broadcast the transaction ({}).", new_transaction.txid());
return Ok(());
}
crate::monitoring::increment_dkg_votes_submitted();
// For all Pox-4 epochs onwards, broadcast the results also to stackerDB for other signers/miners to observe
signer_transactions.push(new_transaction);
let signer_message = SignerMessage::Transactions(signer_transactions);
@@ -1219,9 +1232,11 @@ impl Signer {
};
let block_submission = if block_vote.rejected {
crate::monitoring::increment_block_responses_sent(false);
// We signed a rejection message. Return a rejection message
BlockResponse::rejected(block_vote.signer_signature_hash, signature.clone())
} else {
crate::monitoring::increment_block_responses_sent(true);
// we agreed to sign the block hash. Return an approval message
BlockResponse::accepted(block_vote.signer_signature_hash, signature.clone())
};

View File

@@ -4,3 +4,4 @@ endpoint = "localhost:30000"
network = "testnet"
auth_password = "12345"
db_path = ":memory:"
metrics_endpoint = "0.0.0.0:9090"

View File

@@ -62,7 +62,7 @@ name = "stacks-events"
path = "src/stacks_events.rs"
[features]
monitoring_prom = ["stacks/monitoring_prom"]
monitoring_prom = ["stacks/monitoring_prom", "libsigner/monitoring_prom"]
slog_json = ["stacks/slog_json", "stacks-common/slog_json", "clarity/slog_json"]
prod-genesis-chainstate = []
default = []

View File

@@ -125,6 +125,7 @@ impl SignerTest {
3000,
Some(100_000),
None,
Some(9000),
);
let mut running_signers = Vec::new();
@@ -511,6 +512,23 @@ impl SignerTest {
entries.public_keys
}
fn get_signer_metrics(&self) -> String {
#[cfg(feature = "monitoring_prom")]
{
let client = reqwest::blocking::Client::new();
let res = client
.get("http://localhost:9000/metrics")
.send()
.unwrap()
.text()
.unwrap();
return res;
}
#[cfg(not(feature = "monitoring_prom"))]
return String::new();
}
fn generate_invalid_transactions(&self) -> Vec<StacksTransaction> {
let host = self
.running_nodes
@@ -741,6 +759,7 @@ impl SignerTest {
3000 + signer_idx,
Some(100_000),
None,
Some(9000 + signer_idx),
)
.pop()
.unwrap();
@@ -782,6 +801,10 @@ fn spawn_signer(
let config = SignerConfig::load_from_str(data).unwrap();
let ev = SignerEventReceiver::new(config.network.is_mainnet());
let endpoint = config.endpoint;
#[cfg(feature = "monitoring_prom")]
{
stacks_signer::monitoring::start_serving_monitoring_metrics(config.clone()).ok();
}
let runloop: stacks_signer::runloop::RunLoop = stacks_signer::runloop::RunLoop::from(config);
let mut signer: Signer<
RunLoopCommand,
@@ -1329,7 +1352,8 @@ fn stackerdb_block_proposal() {
.init();
info!("------------------------- Test Setup -------------------------");
let mut signer_test = SignerTest::new(5);
let num_signers = 5;
let mut signer_test = SignerTest::new(num_signers);
let timeout = Duration::from_secs(200);
let short_timeout = Duration::from_secs(30);
@@ -1347,6 +1371,17 @@ fn stackerdb_block_proposal() {
.0
.verify(&key, proposed_signer_signature_hash.as_bytes()));
// Test prometheus metrics response
#[cfg(feature = "monitoring_prom")]
{
let metrics_response = signer_test.get_signer_metrics();
// Because 5 signers are running in the same process, the prometheus metrics
// are incremented once for every signer. This is why we expect the metric to be
// `5`, even though there is only one block proposed.
let expected_result = format!("stacks_signer_block_proposals_received {}", num_signers);
assert!(metrics_response.contains(&expected_result));
}
signer_test.shutdown();
}