From b6f86693b42cc7a5a38d527628ebd859cdee5608 Mon Sep 17 00:00:00 2001 From: Aaron Blankstein Date: Tue, 2 Jan 2024 13:36:32 -0600 Subject: [PATCH] fix: do not panic async task on prometheus bind failure, pass prom thread to naka runloop --- testnet/stacks-node/src/monitoring/mod.rs | 19 ++++++++-- .../stacks-node/src/monitoring/prometheus.rs | 37 +++++++++++-------- .../stacks-node/src/run_loop/boot_nakamoto.rs | 4 +- testnet/stacks-node/src/run_loop/nakamoto.rs | 29 ++++++++++----- testnet/stacks-node/src/run_loop/neon.rs | 30 +++++++++------ .../src/tests/nakamoto_integrations.rs | 32 ++++++++++++++++ 6 files changed, 109 insertions(+), 42 deletions(-) diff --git a/testnet/stacks-node/src/monitoring/mod.rs b/testnet/stacks-node/src/monitoring/mod.rs index 165937b4a..4c254681e 100644 --- a/testnet/stacks-node/src/monitoring/mod.rs +++ b/testnet/stacks-node/src/monitoring/mod.rs @@ -5,8 +5,19 @@ pub use stacks::monitoring::{increment_errors_emitted_counter, increment_warning #[cfg(feature = "monitoring_prom")] mod prometheus; -pub fn start_serving_monitoring_metrics(bind_address: String) { - info!("Start serving prometheus metrics"); - #[cfg(feature = "monitoring_prom")] - prometheus::start_serving_prometheus_metrics(bind_address); +#[derive(Debug)] +pub enum MonitoringError { + AlreadyBound, + UnableToGetAddress, +} + +#[cfg(feature = "monitoring_prom")] +pub fn start_serving_monitoring_metrics(bind_address: String) -> Result<(), MonitoringError> { + prometheus::start_serving_prometheus_metrics(bind_address) +} + +#[cfg(not(feature = "monitoring_prom"))] +pub fn start_serving_monitoring_metrics(bind_address: String) -> Result<(), MonitoringError> { + warn!("Attempted to start monitoring service at bind_address = {bind_address}, but stacks-node was built without `monitoring_prom` feature."); + Ok(()) } diff --git a/testnet/stacks-node/src/monitoring/prometheus.rs b/testnet/stacks-node/src/monitoring/prometheus.rs index 65c427d2b..e9705142d 100644 --- a/testnet/stacks-node/src/monitoring/prometheus.rs +++ b/testnet/stacks-node/src/monitoring/prometheus.rs @@ -4,20 +4,26 @@ use async_std::task; use http_types::{Body, Response, StatusCode}; use stacks::prometheus::{gather, Encoder, TextEncoder}; -pub fn start_serving_prometheus_metrics(bind_address: String) { - let addr = bind_address.clone(); +use super::MonitoringError; - async_std::task::block_on(async { - let listener = TcpListener::bind(addr) +pub fn start_serving_prometheus_metrics(bind_address: String) -> Result<(), MonitoringError> { + task::block_on(async { + let listener = TcpListener::bind(bind_address) .await - .expect("Prometheus monitoring: unable to bind address"); - let addr = format!( - "http://{}", - listener + .map_err(|_| { + warn!("Prometheus monitoring: unable to bind address, will not spawn prometheus endpoint service."); + MonitoringError::AlreadyBound + })?; + let local_addr = listener .local_addr() - .expect("Prometheus monitoring: unable to get addr") + .map_err(|_| { + warn!("Prometheus monitoring: unable to get local bind address, will not spawn prometheus endpoint service."); + MonitoringError::UnableToGetAddress + })?; + info!( + "Prometheus monitoring: server listening on http://{}", + local_addr ); - info!("Prometheus monitoring: server listening on {}", addr); let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { @@ -25,21 +31,20 @@ pub fn start_serving_prometheus_metrics(bind_address: String) { Ok(stream) => stream, Err(err) => { error!( - "Prometheus monitoring: unable to open socket and serve metrics - {:?}", - err + "Prometheus monitoring: unable to open socket and serve metrics - {err:?}", ); continue; } }; - let addr = addr.clone(); - task::spawn(async { if let Err(err) = accept(stream).await { - eprintln!("{}", err); + error!("{err}"); } }); } - }); + + Ok::<_, MonitoringError>(()) + }) } async fn accept(stream: TcpStream) -> http_types::Result<()> { diff --git a/testnet/stacks-node/src/run_loop/boot_nakamoto.rs b/testnet/stacks-node/src/run_loop/boot_nakamoto.rs index e70784ce4..4485a4cac 100644 --- a/testnet/stacks-node/src/run_loop/boot_nakamoto.rs +++ b/testnet/stacks-node/src/run_loop/boot_nakamoto.rs @@ -54,7 +54,7 @@ impl BootRunLoop { InnerLoops::Epoch2(neon), ) } else { - let naka = NakaRunLoop::new(config.clone(), None, None); + let naka = NakaRunLoop::new(config.clone(), None, None, None); ( naka.get_coordinator_channel().unwrap(), InnerLoops::Epoch3(naka), @@ -122,6 +122,7 @@ impl BootRunLoop { .expect("FATAL: failed to spawn epoch-2/3-boot thread"); neon_loop.start(burnchain_opt.clone(), mine_start); + let monitoring_thread = neon_loop.take_monitoring_thread(); // did we exit because of the epoch-3.0 transition, or some other reason? let exited_for_transition = boot_thread .join() @@ -136,6 +137,7 @@ impl BootRunLoop { self.config.clone(), Some(termination_switch), Some(counters), + monitoring_thread, ); let new_coord_channels = naka .get_coordinator_channel() diff --git a/testnet/stacks-node/src/run_loop/nakamoto.rs b/testnet/stacks-node/src/run_loop/nakamoto.rs index 945e9fece..d5e57646a 100644 --- a/testnet/stacks-node/src/run_loop/nakamoto.rs +++ b/testnet/stacks-node/src/run_loop/nakamoto.rs @@ -37,7 +37,7 @@ use stx_genesis::GenesisData; use crate::burnchains::make_bitcoin_indexer; use crate::globals::Globals as GenericGlobals; -use crate::monitoring::start_serving_monitoring_metrics; +use crate::monitoring::{start_serving_monitoring_metrics, MonitoringError}; use crate::nakamoto_node::{self, StacksNode, BLOCK_PROCESSOR_STACK_SIZE, RELAYER_MAX_BUFFER}; use crate::node::{ get_account_balances, get_account_lockups, get_names, get_namespaces, @@ -69,6 +69,7 @@ pub struct RunLoop { /// NOTE: this is duplicated in self.globals, but it needs to be accessible before globals is /// instantiated (namely, so the test framework can access it). miner_status: Arc>, + monitoring_thread: Option>>, } impl RunLoop { @@ -77,6 +78,7 @@ impl RunLoop { config: Config, should_keep_running: Option>, counters: Option, + monitoring_thread: Option>>, ) -> Self { let channels = CoordinatorCommunication::instantiate(); let should_keep_running = @@ -103,6 +105,7 @@ impl RunLoop { burnchain: None, pox_watchdog_comms, miner_status, + monitoring_thread, } } @@ -333,16 +336,22 @@ impl RunLoop { /// Start Prometheus logging fn start_prometheus(&mut self) { - let prometheus_bind = self.config.node.prometheus_bind.clone(); - if let Some(prometheus_bind) = prometheus_bind { - thread::Builder::new() - .name("prometheus".to_string()) - .spawn(move || { - debug!("prometheus thread ID is {:?}", thread::current().id()); - start_serving_monitoring_metrics(prometheus_bind); - }) - .unwrap(); + if self.monitoring_thread.is_some() { + info!("Monitoring thread already running, nakamoto run-loop will not restart it"); + return; } + let Some(prometheus_bind) = self.config.node.prometheus_bind.clone() else { + return; + }; + let monitoring_thread = thread::Builder::new() + .name("prometheus".to_string()) + .spawn(move || { + debug!("prometheus thread ID is {:?}", thread::current().id()); + start_serving_monitoring_metrics(prometheus_bind) + }) + .expect("FATAL: failed to start monitoring thread"); + + self.monitoring_thread.replace(monitoring_thread); } /// Get the sortition DB's highest block height, aligned to a reward cycle boundary, and the diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs index f04874110..f86c2b48f 100644 --- a/testnet/stacks-node/src/run_loop/neon.rs +++ b/testnet/stacks-node/src/run_loop/neon.rs @@ -33,7 +33,7 @@ use stx_genesis::GenesisData; use super::RunLoopCallbacks; use crate::burnchains::make_bitcoin_indexer; use crate::globals::NeonGlobals as Globals; -use crate::monitoring::start_serving_monitoring_metrics; +use crate::monitoring::{start_serving_monitoring_metrics, MonitoringError}; use crate::neon_node::{StacksNode, BLOCK_PROCESSOR_STACK_SIZE, RELAYER_MAX_BUFFER}; use crate::node::{ get_account_balances, get_account_lockups, get_names, get_namespaces, @@ -176,6 +176,7 @@ pub struct RunLoop { /// NOTE: this is duplicated in self.globals, but it needs to be accessible before globals is /// instantiated (namely, so the test framework can access it). miner_status: Arc>, + monitoring_thread: Option>>, } /// Write to stderr in an async-safe manner. @@ -224,6 +225,7 @@ impl RunLoop { burnchain: None, pox_watchdog_comms, miner_status, + monitoring_thread: None, } } @@ -614,16 +616,22 @@ impl RunLoop { /// Start Prometheus logging fn start_prometheus(&mut self) { - let prometheus_bind = self.config.node.prometheus_bind.clone(); - if let Some(prometheus_bind) = prometheus_bind { - thread::Builder::new() - .name("prometheus".to_string()) - .spawn(move || { - debug!("prometheus thread ID is {:?}", thread::current().id()); - start_serving_monitoring_metrics(prometheus_bind); - }) - .unwrap(); - } + let Some(prometheus_bind) = self.config.node.prometheus_bind.clone() else { + return; + }; + let monitoring_thread = thread::Builder::new() + .name("prometheus".to_string()) + .spawn(move || { + debug!("prometheus thread ID is {:?}", thread::current().id()); + start_serving_monitoring_metrics(prometheus_bind) + }) + .expect("FATAL: failed to start monitoring thread"); + + self.monitoring_thread.replace(monitoring_thread); + } + + pub fn take_monitoring_thread(&mut self) -> Option>> { + self.monitoring_thread.take() } /// Get the sortition DB's highest block height, aligned to a reward cycle boundary, and the diff --git a/testnet/stacks-node/src/tests/nakamoto_integrations.rs b/testnet/stacks-node/src/tests/nakamoto_integrations.rs index 9825bc449..ca464cb10 100644 --- a/testnet/stacks-node/src/tests/nakamoto_integrations.rs +++ b/testnet/stacks-node/src/tests/nakamoto_integrations.rs @@ -370,6 +370,8 @@ fn simple_neon_integration() { } let (mut naka_conf, _miner_account) = naka_neon_integration_conf(None); + let prom_bind = format!("{}:{}", "127.0.0.1", 6000); + naka_conf.node.prometheus_bind = Some(prom_bind.clone()); naka_conf.miner.wait_on_interim_blocks = Duration::from_secs(1000); let sender_sk = Secp256k1PrivateKey::new(); // setup sender + recipient for a test stx transfer @@ -437,6 +439,21 @@ fn simple_neon_integration() { .unwrap() .stacks_block_height; + // query for prometheus metrics + #[cfg(feature = "monitoring_prom")] + { + let prom_http_origin = format!("http://{}", prom_bind); + let client = reqwest::blocking::Client::new(); + let res = client + .get(&prom_http_origin) + .send() + .unwrap() + .text() + .unwrap(); + let expected_result = format!("stacks_node_stacks_tip_height {block_height_pre_3_0}"); + assert!(res.contains(&expected_result)); + } + info!("Nakamoto miner started..."); // first block wakes up the run loop, wait until a key registration has been submitted. next_block_and(&mut btc_regtest_controller, 60, || { @@ -529,6 +546,21 @@ fn simple_neon_integration() { assert!(tip.anchored_header.as_stacks_nakamoto().is_some()); assert!(tip.stacks_block_height >= block_height_pre_3_0 + 30); + // make sure prometheus returns an updated height + #[cfg(feature = "monitoring_prom")] + { + let prom_http_origin = format!("http://{}", prom_bind); + let client = reqwest::blocking::Client::new(); + let res = client + .get(&prom_http_origin) + .send() + .unwrap() + .text() + .unwrap(); + let expected_result = format!("stacks_node_stacks_tip_height {}", tip.stacks_block_height); + assert!(res.contains(&expected_result)); + } + coord_channel .lock() .expect("Mutex poisoned")