feat: add metrics to /ping response of event observer server (#297)

Fixes #285
This commit is contained in:
Micaiah Reid
2023-06-30 09:24:13 -04:00
committed by GitHub
parent a2d3b1493a
commit 0e1ee7c1ee
5 changed files with 332 additions and 8 deletions

View File

@@ -216,7 +216,7 @@ impl Service {
match event {
ObserverEvent::PredicateRegistered(spec) => {
// If start block specified, use it.
// I no start block specified, depending on the nature the hook, we'd like to retrieve:
// If no start block specified, depending on the nature the hook, we'd like to retrieve:
// - contract-id
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)

View File

@@ -8,7 +8,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = "1"
serde = {version = "1", features = ["rc"]}
serde_json = { version = "1", features = ["arbitrary_precision"] }
serde-hex = "0.1.0"
serde_derive = "1"

View File

@@ -8,16 +8,19 @@ use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex, RwLock};
use super::{
BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand,
BitcoinConfig, BitcoinRPCRequest, MempoolAdmissionData, ObserverCommand, ObserverMetrics,
StacksChainMempoolEvent,
};
#[rocket::get("/ping", format = "application/json")]
pub fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
pub fn handle_ping(
ctx: &State<Context>,
metrics_rw_lock: &State<Arc<RwLock<ObserverMetrics>>>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "GET /ping"));
Json(json!({
"status": 200,
"result": "Ok",
"result": metrics_rw_lock.inner(),
}))
}

View File

@@ -46,7 +46,7 @@ use std::str;
use std::str::FromStr;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(feature = "zeromq")]
use zeromq::{Socket, SocketRecv};
@@ -359,6 +359,35 @@ impl ChainhookStore {
}
}
#[derive(Debug, Default, Serialize, Clone)]
pub struct ReorgMetrics {
timestamp: i64,
applied_blocks: usize,
rolled_back_blocks: usize,
}
#[derive(Debug, Default, Serialize, Clone)]
pub struct ChainMetrics {
pub tip_height: u64,
pub last_reorg: Option<ReorgMetrics>,
pub last_block_ingestion_at: u128,
pub registered_predicates: usize,
pub deregistered_predicates: usize,
}
impl ChainMetrics {
pub fn deregister_prediate(&mut self) {
self.registered_predicates -= 1;
self.deregistered_predicates += 1;
}
}
#[derive(Debug, Default, Serialize, Clone)]
pub struct ObserverMetrics {
pub bitcoin: ChainMetrics,
pub stacks: ChainMetrics,
}
pub async fn start_event_observer(
mut config: EventObserverConfig,
observer_commands_tx: Sender<ObserverCommand>,
@@ -409,6 +438,18 @@ pub async fn start_event_observer(
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
let observer_metrics = ObserverMetrics {
bitcoin: ChainMetrics {
registered_predicates: chainhook_store.predicates.bitcoin_chainhooks.len(),
..Default::default()
},
stacks: ChainMetrics {
registered_predicates: chainhook_store.predicates.stacks_chainhooks.len(),
..Default::default()
},
};
let observer_metrics_rw_lock = Arc::new(RwLock::new(observer_metrics));
let limits = Limits::default().limit("json", 20.megabytes());
let mut shutdown_config = config::Shutdown::default();
shutdown_config.ctrlc = false;
@@ -451,6 +492,7 @@ pub async fn start_event_observer(
.manage(background_job_tx_mutex)
.manage(bitcoin_config)
.manage(ctx_cloned)
.manage(observer_metrics_rw_lock.clone())
.mount("/", routes)
.ignite()
.await?;
@@ -470,6 +512,7 @@ pub async fn start_event_observer(
observer_commands_rx,
observer_events_tx,
ingestion_shutdown,
observer_metrics_rw_lock.clone(),
ctx,
)
.await
@@ -653,6 +696,7 @@ pub async fn start_observer_commands_handler(
observer_commands_rx: Receiver<ObserverCommand>,
observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
ingestion_shutdown: Option<Shutdown>,
observer_metrics: Arc<RwLock<ObserverMetrics>>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
let mut chainhooks_occurrences_tracker: HashMap<String, u64> = HashMap::new();
@@ -728,6 +772,21 @@ pub async fn start_observer_commands_handler(
}
};
};
match observer_metrics.write() {
Ok(mut metrics) => {
if new_block.block_identifier.index > metrics.bitcoin.tip_height {
metrics.bitcoin.tip_height = new_block.block_identifier.index;
}
metrics.bitcoin.last_block_ingestion_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis()
.into();
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
};
bitcoin_block_store.insert(new_block.block_identifier.clone(), new_block);
}
ObserverCommand::CacheBitcoinBlock(block) => {
@@ -974,6 +1033,29 @@ pub async fn start_observer_commands_handler(
}
}
match blocks_to_apply
.iter()
.max_by_key(|b| b.block_identifier.index)
{
Some(highest_tip_block) => match observer_metrics.write() {
Ok(mut metrics) => {
metrics.bitcoin.last_reorg = Some(ReorgMetrics {
timestamp: highest_tip_block.timestamp.into(),
applied_blocks: blocks_to_apply.len(),
rolled_back_blocks: blocks_to_rollback.len(),
});
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
},
None => {}
}
BitcoinChainEvent::ChainUpdatedWithReorg(BitcoinChainUpdatedWithReorgData {
blocks_to_apply,
blocks_to_rollback,
@@ -1108,6 +1190,17 @@ pub async fn start_observer_commands_handler(
ChainhookSpecification::Bitcoin(chainhook),
));
}
match observer_metrics.write() {
Ok(mut metrics) => metrics.bitcoin.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
}
}
}
@@ -1157,6 +1250,66 @@ pub async fn start_observer_commands_handler(
stacks_chainhooks.len()
)
});
// track stacks chain metrics
match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(update) => {
match update
.new_blocks
.iter()
.max_by_key(|b| b.block.block_identifier.index)
{
Some(highest_tip_update) => match observer_metrics.write() {
Ok(mut metrics) => {
if highest_tip_update.block.block_identifier.index
> metrics.stacks.tip_height
{
metrics.stacks.tip_height =
highest_tip_update.block.block_identifier.index;
}
metrics.stacks.last_block_ingestion_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis()
.into();
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
},
None => {}
}
}
StacksChainEvent::ChainUpdatedWithReorg(update) => {
match update
.blocks_to_apply
.iter()
.max_by_key(|b| b.block.block_identifier.index)
{
Some(highest_tip_update) => match observer_metrics.write() {
Ok(mut metrics) => {
metrics.stacks.last_reorg = Some(ReorgMetrics {
timestamp: highest_tip_update.block.timestamp.into(),
applied_blocks: update.blocks_to_apply.len(),
rolled_back_blocks: update.blocks_to_rollback.len(),
});
}
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
},
None => {}
}
}
_ => {}
}
// process hooks
let (predicates_triggered, predicates_evaluated) =
@@ -1241,6 +1394,17 @@ pub async fn start_observer_commands_handler(
ChainhookSpecification::Stacks(chainhook),
));
}
match observer_metrics.write() {
Ok(mut metrics) => metrics.stacks.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to acquire observer_metrics_rw_lock:{}",
e
)
}),
}
}
}
@@ -1286,7 +1450,7 @@ pub async fn start_observer_commands_handler(
.predicates
.register_full_specification(networks, spec)
{
Ok(uuid) => uuid,
Ok(spec) => spec,
Err(e) => {
ctx.try_log(|logger| {
slog::error!(
@@ -1300,11 +1464,25 @@ pub async fn start_observer_commands_handler(
};
ctx.try_log(|logger| slog::info!(logger, "Registering chainhook {}", spec.uuid(),));
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::PredicateRegistered(spec));
let _ = tx.send(ObserverEvent::PredicateRegistered(spec.clone()));
} else {
ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid()));
chainhook_store.predicates.enable_specification(&mut spec);
}
match observer_metrics.write() {
Ok(mut metrics) => match spec {
ChainhookSpecification::Bitcoin(_) => {
metrics.bitcoin.registered_predicates += 1
}
ChainhookSpecification::Stacks(_) => {
metrics.stacks.registered_predicates += 1
}
},
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
};
}
ObserverCommand::EnablePredicate(mut spec) => {
ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid()));
@@ -1323,6 +1501,13 @@ pub async fn start_observer_commands_handler(
ChainhookSpecification::Stacks(hook),
));
}
match observer_metrics.write() {
Ok(mut metrics) => metrics.stacks.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
}
}
ObserverCommand::DeregisterBitcoinPredicate(hook_uuid) => {
ctx.try_log(|logger| {
@@ -1335,6 +1520,13 @@ pub async fn start_observer_commands_handler(
let _ = tx.send(ObserverEvent::PredicateDeregistered(
ChainhookSpecification::Bitcoin(hook),
));
match observer_metrics.write() {
Ok(mut metrics) => metrics.bitcoin.deregister_prediate(),
Err(e) => ctx.try_log(|logger| {
slog::warn!(logger, "unable to acquire observer_metrics_rw_lock:{}", e)
}),
}
}
}
}

View File

@@ -11,6 +11,7 @@ use crate::indexer::tests::helpers::{
};
use crate::observer::{
start_observer_commands_handler, ChainhookStore, EventObserverConfig, ObserverCommand,
ObserverMetrics,
};
use crate::utils::{AbstractBlock, Context};
use chainhook_types::{
@@ -20,6 +21,7 @@ use chainhook_types::{
use hiro_system_kit;
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, RwLock};
use super::ObserverEvent;
@@ -201,6 +203,8 @@ fn generate_and_register_new_bitcoin_chainhook(
fn test_stacks_chainhook_register_deregister() {
let (observer_commands_tx, observer_commands_rx) = channel();
let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded();
let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default()));
let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone();
let handle = std::thread::spawn(move || {
let (config, chainhook_store) = generate_test_config();
@@ -210,6 +214,7 @@ fn test_stacks_chainhook_register_deregister() {
observer_commands_rx,
Some(observer_events_tx),
None,
observer_metrics_rw_lock_moved,
Context::empty(),
));
});
@@ -223,6 +228,16 @@ fn test_stacks_chainhook_register_deregister() {
"increment",
);
// registering stacks chainhook should increment the observer_metric's registered stacks hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.stacks
.registered_predicates
);
// Simulate a block that does not include a trigger
let transactions = vec![generate_test_tx_stacks_contract_call(
0,
@@ -373,6 +388,25 @@ fn test_stacks_chainhook_register_deregister() {
_ => false,
});
// deregistering stacks chainhook should decrement the observer_metric's registered stacks hooks
assert_eq!(
0,
observer_metrics_rw_lock
.read()
.unwrap()
.stacks
.registered_predicates
);
// and increment the deregistered hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.stacks
.deregistered_predicates
);
// Simulate a block that does not include a trigger
let transactions = vec![generate_test_tx_stacks_contract_call(
2,
@@ -443,6 +477,8 @@ fn test_stacks_chainhook_register_deregister() {
fn test_stacks_chainhook_auto_deregister() {
let (observer_commands_tx, observer_commands_rx) = channel();
let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded();
let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default()));
let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone();
let handle = std::thread::spawn(move || {
let (config, chainhook_store) = generate_test_config();
@@ -452,6 +488,7 @@ fn test_stacks_chainhook_auto_deregister() {
observer_commands_rx,
Some(observer_events_tx),
None,
observer_metrics_rw_lock_moved,
Context::empty(),
));
});
@@ -479,6 +516,15 @@ fn test_stacks_chainhook_auto_deregister() {
}
_ => false,
});
// registering stacks chainhook should increment the observer_metric's registered stacks hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.stacks
.registered_predicates
);
// Simulate a block that does not include a trigger
let transactions = vec![generate_test_tx_stacks_contract_call(
@@ -591,6 +637,25 @@ fn test_stacks_chainhook_auto_deregister() {
_ => false,
});
// deregistering stacks chainhook should decrement the observer_metric's registered stacks hooks
assert_eq!(
0,
observer_metrics_rw_lock
.read()
.unwrap()
.stacks
.registered_predicates
);
// and increment the deregistered hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.stacks
.deregistered_predicates
);
// Should propagate block
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::StacksChainEvent(_)) => {
@@ -614,6 +679,8 @@ fn test_stacks_chainhook_auto_deregister() {
fn test_bitcoin_chainhook_register_deregister() {
let (observer_commands_tx, observer_commands_rx) = channel();
let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded();
let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default()));
let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone();
let handle = std::thread::spawn(move || {
let (config, chainhook_store) = generate_test_config();
@@ -623,6 +690,7 @@ fn test_bitcoin_chainhook_register_deregister() {
observer_commands_rx,
Some(observer_events_tx),
None,
observer_metrics_rw_lock_moved,
Context::empty(),
));
});
@@ -636,6 +704,16 @@ fn test_bitcoin_chainhook_register_deregister() {
None,
);
// registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.bitcoin
.registered_predicates
);
// Simulate a block that does not include a trigger (wallet_1 to wallet_3)
let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer(
0,
@@ -785,6 +863,25 @@ fn test_bitcoin_chainhook_register_deregister() {
_ => false,
});
// deregistering bitcoin chainhook should decrement the observer_metric's registered bitcoin hooks
assert_eq!(
0,
observer_metrics_rw_lock
.read()
.unwrap()
.bitcoin
.registered_predicates
);
// and increment the deregistered hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.bitcoin
.deregistered_predicates
);
// Simulate a block that does not include a trigger
let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer(
2,
@@ -854,6 +951,8 @@ fn test_bitcoin_chainhook_register_deregister() {
fn test_bitcoin_chainhook_auto_deregister() {
let (observer_commands_tx, observer_commands_rx) = channel();
let (observer_events_tx, observer_events_rx) = crossbeam_channel::unbounded();
let observer_metrics_rw_lock = Arc::new(RwLock::new(ObserverMetrics::default()));
let observer_metrics_rw_lock_moved = observer_metrics_rw_lock.clone();
let handle = std::thread::spawn(move || {
let (config, chainhook_store) = generate_test_config();
@@ -863,6 +962,7 @@ fn test_bitcoin_chainhook_auto_deregister() {
observer_commands_rx,
Some(observer_events_tx),
None,
observer_metrics_rw_lock_moved,
Context::empty(),
));
});
@@ -876,6 +976,16 @@ fn test_bitcoin_chainhook_auto_deregister() {
Some(1),
);
// registering bitcoin chainhook should increment the observer_metric's registered bitcoin hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.bitcoin
.registered_predicates
);
// Simulate a block that does not include a trigger (wallet_1 to wallet_3)
let transactions = vec![generate_test_tx_bitcoin_p2pkh_transfer(
0,
@@ -1012,6 +1122,25 @@ fn test_bitcoin_chainhook_auto_deregister() {
_ => false,
});
// deregistering bitcoin chainhook should decrement the observer_metric's registered bitcoin hooks
assert_eq!(
0,
observer_metrics_rw_lock
.read()
.unwrap()
.bitcoin
.registered_predicates
);
// and increment the deregistered hooks
assert_eq!(
1,
observer_metrics_rw_lock
.read()
.unwrap()
.bitcoin
.deregistered_predicates
);
// Should propagate block
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::BitcoinChainEvent(_)) => {