fix: do not panic async task on prometheus bind failure, pass prom thread to naka runloop

Aaron Blankstein
2024-01-02 13:36:32 -06:00
parent fa6a3a69b9
commit b6f86693b4
6 changed files with 109 additions and 42 deletions

View File

@@ -5,8 +5,19 @@ pub use stacks::monitoring::{increment_errors_emitted_counter, increment_warning
#[cfg(feature = "monitoring_prom")]
mod prometheus;
pub fn start_serving_monitoring_metrics(bind_address: String) {
info!("Start serving prometheus metrics");
#[cfg(feature = "monitoring_prom")]
prometheus::start_serving_prometheus_metrics(bind_address);
#[derive(Debug)]
pub enum MonitoringError {
AlreadyBound,
UnableToGetAddress,
}
#[cfg(feature = "monitoring_prom")]
pub fn start_serving_monitoring_metrics(bind_address: String) -> Result<(), MonitoringError> {
prometheus::start_serving_prometheus_metrics(bind_address)
}
#[cfg(not(feature = "monitoring_prom"))]
pub fn start_serving_monitoring_metrics(bind_address: String) -> Result<(), MonitoringError> {
warn!("Attempted to start monitoring service at bind_address = {bind_address}, but stacks-node was built without `monitoring_prom` feature.");
Ok(())
}
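For readers skimming the diff: the point of the new signature is that a bind failure is now reported to whoever spawned the metrics server instead of panicking inside it. A minimal, self-contained sketch of the intended calling pattern, using stand-in names rather than code from this commit:

```rust
use std::thread::{self, JoinHandle};

#[derive(Debug)]
enum MonitoringError {
    AlreadyBound,
    UnableToGetAddress,
}

// Stand-in for start_serving_monitoring_metrics: reports failure instead of panicking.
fn serve_metrics(bind_address: String) -> Result<(), MonitoringError> {
    let _ = bind_address;
    Err(MonitoringError::AlreadyBound) // pretend the port was already taken
}

fn main() {
    // Spawn the metrics server on a named thread and keep the JoinHandle around.
    let handle: JoinHandle<Result<(), MonitoringError>> = thread::Builder::new()
        .name("prometheus".to_string())
        .spawn(|| serve_metrics("127.0.0.1:9153".to_string()))
        .expect("failed to spawn monitoring thread");

    // A later owner of the handle (here, main) can observe how the server exited.
    match handle.join().expect("monitoring thread panicked") {
        Ok(()) => println!("metrics server exited cleanly"),
        Err(e) => println!("metrics server failed to start: {e:?}"),
    }
}
```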

View File

@@ -4,20 +4,26 @@ use async_std::task;
 use http_types::{Body, Response, StatusCode};
 
 use stacks::prometheus::{gather, Encoder, TextEncoder};
 
-pub fn start_serving_prometheus_metrics(bind_address: String) {
-    let addr = bind_address.clone();
+use super::MonitoringError;
 
-    async_std::task::block_on(async {
-        let listener = TcpListener::bind(addr)
+pub fn start_serving_prometheus_metrics(bind_address: String) -> Result<(), MonitoringError> {
+    task::block_on(async {
+        let listener = TcpListener::bind(bind_address)
             .await
-            .expect("Prometheus monitoring: unable to bind address");
-        let addr = format!(
-            "http://{}",
-            listener
-                .local_addr()
-                .expect("Prometheus monitoring: unable to get addr")
-        );
-        info!("Prometheus monitoring: server listening on {}", addr);
+            .map_err(|_| {
+                warn!("Prometheus monitoring: unable to bind address, will not spawn prometheus endpoint service.");
+                MonitoringError::AlreadyBound
+            })?;
+        let local_addr = listener
+            .local_addr()
+            .map_err(|_| {
+                warn!("Prometheus monitoring: unable to get local bind address, will not spawn prometheus endpoint service.");
+                MonitoringError::UnableToGetAddress
+            })?;
+        info!(
+            "Prometheus monitoring: server listening on http://{}",
+            local_addr
+        );
 
         let mut incoming = listener.incoming();
         while let Some(stream) = incoming.next().await {
@@ -25,21 +31,20 @@ pub fn start_serving_prometheus_metrics(bind_address: String) {
                 Ok(stream) => stream,
                 Err(err) => {
                     error!(
-                        "Prometheus monitoring: unable to open socket and serve metrics - {:?}",
-                        err
+                        "Prometheus monitoring: unable to open socket and serve metrics - {err:?}",
                     );
                     continue;
                 }
             };
-            let addr = addr.clone();
 
             task::spawn(async {
                 if let Err(err) = accept(stream).await {
-                    eprintln!("{}", err);
+                    error!("{err}");
                 }
             });
         }
-    });
+
+        Ok::<_, MonitoringError>(())
+    })
 }
 
 async fn accept(stream: TcpStream) -> http_types::Result<()> {
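One detail worth calling out: `block_on` simply returns the value of the async block, so the trailing `Ok::<_, MonitoringError>(())` pins the block's `Result` type and lets the `?` operators above compile. A stand-alone sketch of that pattern, with a hypothetical error type rather than the crate's own:

```rust
// Assumes async-std is available, as it is in this crate.
#[derive(Debug)]
enum SketchError {
    Bind,
}

fn fallible_bind() -> Result<u16, SketchError> {
    Err(SketchError::Bind) // pretend the bind failed
}

fn run() -> Result<(), SketchError> {
    // block_on returns the async block's value; the turbofish on the final Ok
    // fixes the block's error type so `?` has somewhere to propagate to.
    async_std::task::block_on(async {
        let port = fallible_bind()?;
        println!("bound on port {port}");
        Ok::<_, SketchError>(())
    })
}

fn main() {
    println!("{:?}", run()); // prints Err(Bind)
}
```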

View File

@@ -54,7 +54,7 @@ impl BootRunLoop {
                 InnerLoops::Epoch2(neon),
             )
         } else {
-            let naka = NakaRunLoop::new(config.clone(), None, None);
+            let naka = NakaRunLoop::new(config.clone(), None, None, None);
             (
                 naka.get_coordinator_channel().unwrap(),
                 InnerLoops::Epoch3(naka),
@@ -122,6 +122,7 @@ impl BootRunLoop {
.expect("FATAL: failed to spawn epoch-2/3-boot thread");
neon_loop.start(burnchain_opt.clone(), mine_start);
let monitoring_thread = neon_loop.take_monitoring_thread();
// did we exit because of the epoch-3.0 transition, or some other reason?
let exited_for_transition = boot_thread
.join()
@@ -136,6 +137,7 @@ impl BootRunLoop {
             self.config.clone(),
             Some(termination_switch),
             Some(counters),
+            monitoring_thread,
         );
         let new_coord_channels = naka
             .get_coordinator_channel()

View File

@@ -37,7 +37,7 @@ use stx_genesis::GenesisData;
 use crate::burnchains::make_bitcoin_indexer;
 use crate::globals::Globals as GenericGlobals;
-use crate::monitoring::start_serving_monitoring_metrics;
+use crate::monitoring::{start_serving_monitoring_metrics, MonitoringError};
 use crate::nakamoto_node::{self, StacksNode, BLOCK_PROCESSOR_STACK_SIZE, RELAYER_MAX_BUFFER};
 use crate::node::{
     get_account_balances, get_account_lockups, get_names, get_namespaces,
@@ -69,6 +69,7 @@ pub struct RunLoop {
     /// NOTE: this is duplicated in self.globals, but it needs to be accessible before globals is
     /// instantiated (namely, so the test framework can access it).
     miner_status: Arc<Mutex<MinerStatus>>,
+    monitoring_thread: Option<JoinHandle<Result<(), MonitoringError>>>,
 }
 
 impl RunLoop {
@@ -77,6 +78,7 @@ impl RunLoop {
         config: Config,
         should_keep_running: Option<Arc<AtomicBool>>,
         counters: Option<Counters>,
+        monitoring_thread: Option<JoinHandle<Result<(), MonitoringError>>>,
     ) -> Self {
         let channels = CoordinatorCommunication::instantiate();
         let should_keep_running =
@@ -103,6 +105,7 @@ impl RunLoop {
             burnchain: None,
             pox_watchdog_comms,
             miner_status,
+            monitoring_thread,
         }
     }
@@ -333,16 +336,22 @@ impl RunLoop {
     /// Start Prometheus logging
     fn start_prometheus(&mut self) {
-        let prometheus_bind = self.config.node.prometheus_bind.clone();
-        if let Some(prometheus_bind) = prometheus_bind {
-            thread::Builder::new()
-                .name("prometheus".to_string())
-                .spawn(move || {
-                    debug!("prometheus thread ID is {:?}", thread::current().id());
-                    start_serving_monitoring_metrics(prometheus_bind);
-                })
-                .unwrap();
+        if self.monitoring_thread.is_some() {
+            info!("Monitoring thread already running, nakamoto run-loop will not restart it");
+            return;
         }
+        let Some(prometheus_bind) = self.config.node.prometheus_bind.clone() else {
+            return;
+        };
+        let monitoring_thread = thread::Builder::new()
+            .name("prometheus".to_string())
+            .spawn(move || {
+                debug!("prometheus thread ID is {:?}", thread::current().id());
+                start_serving_monitoring_metrics(prometheus_bind)
+            })
+            .expect("FATAL: failed to start monitoring thread");
+        self.monitoring_thread.replace(monitoring_thread);
     }
 
     /// Get the sortition DB's highest block height, aligned to a reward cycle boundary, and the

View File

@@ -33,7 +33,7 @@ use stx_genesis::GenesisData;
 use super::RunLoopCallbacks;
 use crate::burnchains::make_bitcoin_indexer;
 use crate::globals::NeonGlobals as Globals;
-use crate::monitoring::start_serving_monitoring_metrics;
+use crate::monitoring::{start_serving_monitoring_metrics, MonitoringError};
 use crate::neon_node::{StacksNode, BLOCK_PROCESSOR_STACK_SIZE, RELAYER_MAX_BUFFER};
 use crate::node::{
     get_account_balances, get_account_lockups, get_names, get_namespaces,
@@ -176,6 +176,7 @@ pub struct RunLoop {
     /// NOTE: this is duplicated in self.globals, but it needs to be accessible before globals is
     /// instantiated (namely, so the test framework can access it).
     miner_status: Arc<Mutex<MinerStatus>>,
+    monitoring_thread: Option<JoinHandle<Result<(), MonitoringError>>>,
 }
 
 /// Write to stderr in an async-safe manner.
@@ -224,6 +225,7 @@ impl RunLoop {
             burnchain: None,
             pox_watchdog_comms,
             miner_status,
+            monitoring_thread: None,
         }
     }
@@ -614,16 +616,22 @@ impl RunLoop {
     /// Start Prometheus logging
     fn start_prometheus(&mut self) {
-        let prometheus_bind = self.config.node.prometheus_bind.clone();
-        if let Some(prometheus_bind) = prometheus_bind {
-            thread::Builder::new()
-                .name("prometheus".to_string())
-                .spawn(move || {
-                    debug!("prometheus thread ID is {:?}", thread::current().id());
-                    start_serving_monitoring_metrics(prometheus_bind);
-                })
-                .unwrap();
-        }
+        let Some(prometheus_bind) = self.config.node.prometheus_bind.clone() else {
+            return;
+        };
+        let monitoring_thread = thread::Builder::new()
+            .name("prometheus".to_string())
+            .spawn(move || {
+                debug!("prometheus thread ID is {:?}", thread::current().id());
+                start_serving_monitoring_metrics(prometheus_bind)
+            })
+            .expect("FATAL: failed to start monitoring thread");
+        self.monitoring_thread.replace(monitoring_thread);
     }
 
+    pub fn take_monitoring_thread(&mut self) -> Option<JoinHandle<Result<(), MonitoringError>>> {
+        self.monitoring_thread.take()
+    }
+
     /// Get the sortition DB's highest block height, aligned to a reward cycle boundary, and the
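Putting the run-loop pieces together: the neon loop now exposes its prometheus JoinHandle via `take_monitoring_thread()`, the boot loop forwards it into `NakaRunLoop::new`, and the nakamoto loop's `start_prometheus` declines to bind a second time when a handle is already present. A reduced, self-contained sketch of that ownership handoff (struct names and the error type are simplified stand-ins, not the actual run-loop types):

```rust
use std::thread::{self, JoinHandle};

// Error type simplified for the sketch; the real code uses MonitoringError.
type MonitoringHandle = JoinHandle<Result<(), String>>;

struct NeonLoop {
    monitoring_thread: Option<MonitoringHandle>,
}

impl NeonLoop {
    // Mirrors RunLoop::take_monitoring_thread(): hand the JoinHandle to the caller.
    fn take_monitoring_thread(&mut self) -> Option<MonitoringHandle> {
        self.monitoring_thread.take()
    }
}

struct NakaLoop {
    monitoring_thread: Option<MonitoringHandle>,
}

impl NakaLoop {
    // Mirrors the nakamoto start_prometheus(): skip binding when a handle was passed in.
    fn start_prometheus(&mut self, bind: Option<String>) {
        if self.monitoring_thread.is_some() {
            println!("monitoring thread already running, not restarting it");
            return;
        }
        let Some(bind) = bind else {
            return;
        };
        let handle = thread::Builder::new()
            .name("prometheus".to_string())
            .spawn(move || {
                println!("would serve metrics on {bind}");
                Ok(())
            })
            .expect("failed to spawn monitoring thread");
        self.monitoring_thread.replace(handle);
    }
}

fn main() {
    let mut neon = NeonLoop {
        monitoring_thread: thread::Builder::new()
            .name("prometheus".to_string())
            .spawn(|| Ok(()))
            .ok(),
    };
    // The boot run loop hands the neon loop's thread to the nakamoto loop...
    let mut naka = NakaLoop {
        monitoring_thread: neon.take_monitoring_thread(),
    };
    // ...so this call is a no-op instead of a second bind attempt.
    naka.start_prometheus(Some("127.0.0.1:9153".to_string()));
}
```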

View File

@@ -370,6 +370,8 @@ fn simple_neon_integration() {
     }
 
     let (mut naka_conf, _miner_account) = naka_neon_integration_conf(None);
+    let prom_bind = format!("{}:{}", "127.0.0.1", 6000);
+    naka_conf.node.prometheus_bind = Some(prom_bind.clone());
     naka_conf.miner.wait_on_interim_blocks = Duration::from_secs(1000);
     let sender_sk = Secp256k1PrivateKey::new();
     // setup sender + recipient for a test stx transfer
@@ -437,6 +439,21 @@ fn simple_neon_integration() {
         .unwrap()
         .stacks_block_height;
 
+    // query for prometheus metrics
+    #[cfg(feature = "monitoring_prom")]
+    {
+        let prom_http_origin = format!("http://{}", prom_bind);
+        let client = reqwest::blocking::Client::new();
+        let res = client
+            .get(&prom_http_origin)
+            .send()
+            .unwrap()
+            .text()
+            .unwrap();
+        let expected_result = format!("stacks_node_stacks_tip_height {block_height_pre_3_0}");
+        assert!(res.contains(&expected_result));
+    }
+
     info!("Nakamoto miner started...");
     // first block wakes up the run loop, wait until a key registration has been submitted.
     next_block_and(&mut btc_regtest_controller, 60, || {
@@ -529,6 +546,21 @@ fn simple_neon_integration() {
     assert!(tip.anchored_header.as_stacks_nakamoto().is_some());
     assert!(tip.stacks_block_height >= block_height_pre_3_0 + 30);
 
+    // make sure prometheus returns an updated height
+    #[cfg(feature = "monitoring_prom")]
+    {
+        let prom_http_origin = format!("http://{}", prom_bind);
+        let client = reqwest::blocking::Client::new();
+        let res = client
+            .get(&prom_http_origin)
+            .send()
+            .unwrap()
+            .text()
+            .unwrap();
+        let expected_result = format!("stacks_node_stacks_tip_height {}", tip.stacks_block_height);
+        assert!(res.contains(&expected_result));
+    }
+
     coord_channel
         .lock()
         .expect("Mutex poisoned")