feat: add prometheus monitoring (#356)

* feat: add prometheus monitoring

* chore: progress

* fix: create object

* chore: port config

* chore: set initial metrics on start

* fix: monitor predicates

* test: add
This commit is contained in:
Rafael Cárdenas
2024-09-05 14:50:57 -06:00
committed by GitHub
parent 5692426e4b
commit f35e1d00e7
12 changed files with 304 additions and 43 deletions

1
Cargo.lock generated
View File

@@ -2513,6 +2513,7 @@ dependencies = [
"num_cpus",
"pprof",
"progressing",
"prometheus",
"rand",
"regex",
"reqwest",

View File

@@ -37,6 +37,7 @@ use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use ordhook::service::observers::initialize_observers_db;
use ordhook::service::{start_observer_forwarding, Service};
use ordhook::utils::bitcoind::bitcoind_get_block_height;
use ordhook::utils::monitoring::PrometheusMonitoring;
use ordhook::{hex, initialize_databases, try_error, try_info, try_warn};
use reqwest::Client as HttpClient;
use std::collections::HashSet;
@@ -883,8 +884,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
_ => None,
};
let blocks = cmd.get_blocks();
let inscription_indexing_processor =
start_inscription_indexing_processor(&config, ctx, block_post_processor);
let inscription_indexing_processor = start_inscription_indexing_processor(
&config,
ctx,
block_post_processor,
&PrometheusMonitoring::new(),
);
download_and_pipeline_blocks(
&config,

View File

@@ -1,13 +1,12 @@
use ordhook::chainhook_sdk::indexer::IndexerConfig;
use ordhook::chainhook_sdk::observer::DEFAULT_INGESTION_PORT;
use ordhook::chainhook_sdk::types::{
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
};
use ordhook::config::{
Config, LogConfig, MetaProtocolsConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig,
SnapshotConfig, SnapshotConfigDownloadUrls, StorageConfig, DEFAULT_BITCOIND_RPC_THREADS,
DEFAULT_BITCOIND_RPC_TIMEOUT, DEFAULT_BRC20_LRU_CACHE_SIZE, DEFAULT_CONTROL_PORT,
DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
Config, IndexerConfig, LogConfig, MetaProtocolsConfig, PredicatesApi, PredicatesApiConfig,
ResourcesConfig, SnapshotConfig, SnapshotConfigDownloadUrls, StorageConfig,
DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT, DEFAULT_BRC20_LRU_CACHE_SIZE,
DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
};
use std::fs::File;
use std::io::{BufReader, Read};
@@ -43,7 +42,7 @@ impl ConfigFile {
}
pub fn from_config_file(config_file: ConfigFile) -> Result<Config, String> {
let (stacks_network, bitcoin_network) = match config_file.network.mode.as_str() {
let (_, bitcoin_network) = match config_file.network.mode.as_str() {
"devnet" => (StacksNetwork::Devnet, BitcoinNetwork::Regtest),
"testnet" => (StacksNetwork::Testnet, BitcoinNetwork::Testnet),
"mainnet" => (StacksNetwork::Mainnet, BitcoinNetwork::Mainnet),
@@ -115,14 +114,11 @@ impl ConfigFile {
bitcoin_block_signaling: match config_file.network.bitcoind_zmq_url {
Some(ref zmq_url) => BitcoinBlockSignaling::ZeroMQ(zmq_url.clone()),
None => BitcoinBlockSignaling::Stacks(StacksNodeConfig::default_localhost(
config_file
.network
.stacks_events_ingestion_port
.unwrap_or(DEFAULT_INGESTION_PORT),
DEFAULT_INGESTION_PORT,
)),
},
stacks_network,
bitcoin_network,
prometheus_monitoring_port: config_file.network.prometheus_monitoring_port,
},
logs: LogConfig {
ordinals_internals: config_file
@@ -220,6 +216,5 @@ pub struct NetworkConfigFile {
pub bitcoind_rpc_username: String,
pub bitcoind_rpc_password: String,
pub bitcoind_zmq_url: Option<String>,
pub stacks_node_rpc_url: Option<String>,
pub stacks_events_ingestion_port: Option<u16>,
pub prometheus_monitoring_port: Option<u16>,
}

View File

@@ -46,6 +46,7 @@ hyper = { version = "=0.14.27" }
lazy_static = { version = "1.4.0" }
ciborium = "0.2.1"
regex = "1.10.3"
prometheus = "0.13.3"
[dev-dependencies]
test-case = "3.1.0"

View File

@@ -1,5 +1,4 @@
use crate::core::OrdhookConfig;
pub use chainhook_sdk::indexer::IndexerConfig;
use chainhook_sdk::observer::EventObserverConfig;
use chainhook_sdk::types::{
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
@@ -81,6 +80,16 @@ pub struct UrlConfig {
pub file_url: String,
}
#[derive(Debug, Clone)]
pub struct IndexerConfig {
pub bitcoin_network: BitcoinNetwork,
pub bitcoind_rpc_url: String,
pub bitcoind_rpc_username: String,
pub bitcoind_rpc_password: String,
pub bitcoin_block_signaling: BitcoinBlockSignaling,
pub prometheus_monitoring_port: Option<u16>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct ResourcesConfig {
pub ulimit: usize,
@@ -136,7 +145,7 @@ impl Config {
display_logs: false,
cache_path: self.storage.working_dir.clone(),
bitcoin_network: self.network.bitcoin_network.clone(),
stacks_network: self.network.stacks_network.clone(),
stacks_network: StacksNetwork::Devnet,
prometheus_monitoring_port: None,
data_handler_tx: None,
}
@@ -192,8 +201,8 @@ impl Config {
bitcoin_block_signaling: BitcoinBlockSignaling::Stacks(
StacksNodeConfig::default_localhost(DEFAULT_INGESTION_PORT),
),
stacks_network: StacksNetwork::Devnet,
bitcoin_network: BitcoinNetwork::Regtest,
prometheus_monitoring_port: None,
},
logs: LogConfig {
ordinals_internals: true,
@@ -227,8 +236,8 @@ impl Config {
bitcoin_block_signaling: BitcoinBlockSignaling::Stacks(
StacksNodeConfig::default_localhost(DEFAULT_INGESTION_PORT),
),
stacks_network: StacksNetwork::Testnet,
bitcoin_network: BitcoinNetwork::Testnet,
prometheus_monitoring_port: Some(9153),
},
logs: LogConfig {
ordinals_internals: true,
@@ -265,8 +274,8 @@ impl Config {
bitcoin_block_signaling: BitcoinBlockSignaling::Stacks(
StacksNodeConfig::default_localhost(DEFAULT_INGESTION_PORT),
),
stacks_network: StacksNetwork::Mainnet,
bitcoin_network: BitcoinNetwork::Mainnet,
prometheus_monitoring_port: Some(9153),
},
logs: LogConfig {
ordinals_internals: true,

View File

@@ -39,11 +39,12 @@ use crate::{
OrdhookConfig,
},
db::{
get_any_entry_in_ordinal_activities, open_ordhook_db_conn_rocks_db_loop,
open_readonly_ordhook_db_conn,
get_any_entry_in_ordinal_activities, get_latest_indexed_inscription_number,
open_ordhook_db_conn_rocks_db_loop, open_readonly_ordhook_db_conn,
},
service::write_brc20_block_operations,
try_error, try_info,
utils::monitoring::PrometheusMonitoring,
};
use crate::db::{TransactionBytesCursor, TraversalResult};
@@ -61,12 +62,14 @@ pub fn start_inscription_indexing_processor(
config: &Config,
ctx: &Context,
post_processor: Option<Sender<BitcoinBlockData>>,
prometheus: &PrometheusMonitoring,
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
let config = config.clone();
let ctx = ctx.clone();
let prometheus = prometheus.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
.spawn(move || {
let cache_l2 = Arc::new(new_traversals_lazy_cache(2048));
@@ -143,6 +146,7 @@ pub fn start_inscription_indexing_processor(
&mut brc20_db_conn_rw,
&ordhook_config,
&post_processor,
&prometheus,
&ctx,
);
@@ -181,6 +185,7 @@ pub fn process_blocks(
brc20_db_conn_rw: &mut Option<Connection>,
ordhook_config: &OrdhookConfig,
post_processor: &Option<Sender<BitcoinBlockData>>,
prometheus: &PrometheusMonitoring,
ctx: &Context,
) -> Vec<BitcoinBlockData> {
let mut cache_l1 = BTreeMap::new();
@@ -216,6 +221,7 @@ pub fn process_blocks(
&inscriptions_db_tx,
brc20_db_tx.as_ref(),
brc20_cache.as_mut(),
prometheus,
ordhook_config,
ctx,
);
@@ -284,6 +290,7 @@ pub fn process_block(
inscriptions_db_tx: &Transaction,
brc20_db_tx: Option<&Transaction>,
brc20_cache: Option<&mut Brc20MemoryCache>,
prometheus: &PrometheusMonitoring,
ordhook_config: &OrdhookConfig,
ctx: &Context,
) -> Result<(), String> {
@@ -307,7 +314,7 @@ pub fn process_block(
Context::empty()
};
// Handle inscriptions
// Inscriptions
if any_processable_transactions {
let _ = augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
block,
@@ -317,10 +324,9 @@ pub fn process_block(
&inner_ctx,
);
}
// Handle transfers
// Transfers
let _ = augment_block_with_ordinals_transfer_data(block, inscriptions_db_tx, true, &inner_ctx);
// BRC-20
match (brc20_db_tx, brc20_cache) {
(Some(brc20_db_tx), Some(brc20_cache)) => write_brc20_block_operations(
block,
@@ -332,5 +338,11 @@ pub fn process_block(
_ => {}
}
// Monitoring
prometheus.metrics_block_indexed(block.block_identifier.index);
prometheus.metrics_inscription_indexed(
get_latest_indexed_inscription_number(inscriptions_db_tx, &inner_ctx).unwrap_or(0),
);
Ok(())
}

View File

@@ -835,6 +835,12 @@ pub fn find_latest_transfers_block_height(db_conn: &Connection, ctx: &Context) -
entry
}
pub fn get_latest_indexed_inscription_number(db_conn: &Connection, ctx: &Context) -> Option<u64> {
let args: &[&dyn ToSql] = &[];
let query = "SELECT MAX(jubilee_inscription_number) FROM inscriptions";
perform_query_one(query, args, db_conn, ctx, |row| row.get(0).unwrap())
}
#[derive(Debug, Clone)]
pub struct TransferData {
pub inscription_offset_intra_output: u64,

View File

@@ -22,12 +22,10 @@ use rocket::{
use rocket::{response::status::Custom, State};
use crate::{
config::PredicatesApi,
service::observers::{
config::PredicatesApi, service::observers::{
insert_entry_in_observers, open_readwrite_observers_db_conn, remove_entry_from_observers,
update_observer_progress, update_observer_streaming_enabled,
},
try_error, try_info,
}, try_error, try_info, utils::monitoring::PrometheusMonitoring
};
use super::observers::{
@@ -39,6 +37,7 @@ pub async fn start_observers_http_server(
observer_commands_tx: &std::sync::mpsc::Sender<ObserverCommand>,
observer_event_rx: crossbeam_channel::Receiver<ObserverEvent>,
bitcoin_scan_op_tx: crossbeam_channel::Sender<BitcoinChainhookSpecification>,
prometheus: &PrometheusMonitoring,
ctx: &Context,
) -> Result<Shutdown, String> {
// Build and start HTTP server.
@@ -51,6 +50,7 @@ pub async fn start_observers_http_server(
// Spawn predicate observer event tread.
let moved_config = config.clone();
let moved_ctx = ctx.clone();
let moved_prometheus = prometheus.clone();
let _ = hiro_system_kit::thread_named("observers_api-events").spawn(move || loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
@@ -72,8 +72,10 @@ pub async fn start_observers_http_server(
};
let report = ObserverReport::default();
insert_entry_in_observers(&spec, &report, &observers_db_conn, &moved_ctx);
moved_prometheus.metrics_register_predicate();
match spec {
ChainhookSpecification::Bitcoin(predicate_spec) => {
// TODO: This action blocks this thread until the scan operation is complete. We should not do this.
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
_ => {}
@@ -109,6 +111,7 @@ pub async fn start_observers_http_server(
}
};
remove_entry_from_observers(&uuid, &observers_db_conn, &moved_ctx);
moved_prometheus.metrics_deregister_predicate();
}
ObserverEvent::BitcoinPredicateTriggered(data) => {
if let Some(ref tip) = data.apply.last() {
@@ -426,7 +429,7 @@ mod test {
use crate::{
config::{Config, PredicatesApi, PredicatesApiConfig},
service::observers::{delete_observers_db, initialize_observers_db},
service::observers::{delete_observers_db, initialize_observers_db}, utils::monitoring::PrometheusMonitoring,
};
use super::start_observers_http_server;
@@ -448,6 +451,7 @@ mod test {
&observer_command_tx,
observer_event_rx,
bitcoin_scan_op_tx,
&PrometheusMonitoring::new(),
&ctx,
)
.await

View File

@@ -23,7 +23,8 @@ use crate::core::protocol::inscription_parsing::{
use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
use crate::db::{
delete_data_in_ordhook_db, find_latest_inscription_block_height, insert_entry_in_blocks,
delete_data_in_ordhook_db, find_latest_inscription_block_height,
get_latest_indexed_inscription_number, insert_entry_in_blocks,
open_ordhook_db_conn_rocks_db_loop, open_readonly_ordhook_db_conn, open_readwrite_ordhook_dbs,
update_ordinals_db_with_block, BlockBytesCursor, TransactionBytesCursor,
};
@@ -31,6 +32,7 @@ use crate::db::{find_missing_blocks, run_compaction, update_sequence_metadata_wi
use crate::scan::bitcoin::process_block_with_predicates;
use crate::service::observers::create_and_consolidate_chainhook_config_with_predicates;
use crate::service::runloops::start_bitcoin_scan_runloop;
use crate::utils::monitoring::{start_serving_prometheus_metrics, PrometheusMonitoring};
use crate::{try_debug, try_error, try_info};
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload;
use chainhook_sdk::chainhooks::types::{
@@ -59,13 +61,18 @@ use std::sync::mpsc::channel;
use std::sync::Arc;
pub struct Service {
pub prometheus: PrometheusMonitoring,
pub config: Config,
pub ctx: Context,
}
impl Service {
pub fn new(config: Config, ctx: Context) -> Self {
Self { config, ctx }
Self {
prometheus: PrometheusMonitoring::new(),
config,
ctx,
}
}
pub async fn run(
@@ -77,8 +84,29 @@ impl Service {
check_blocks_integrity: bool,
stream_indexing_to_observers: bool,
) -> Result<(), String> {
let mut event_observer_config = self.config.get_event_observer_config();
// Start Prometheus monitoring server.
if let Some(port) = self.config.network.prometheus_monitoring_port {
let registry_moved = self.prometheus.registry.clone();
let ctx_cloned = self.ctx.clone();
let _ = std::thread::spawn(move || {
let _ = hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics(
port,
registry_moved,
ctx_cloned,
));
});
}
let ordhook_db =
open_readonly_ordhook_db_conn(&self.config.expected_cache_path(), &self.ctx)
.expect("unable to retrieve ordhook db");
self.prometheus.initialize(
0,
get_latest_indexed_inscription_number(&ordhook_db, &self.ctx).unwrap_or(0),
find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap_or(0),
);
// Catch-up with chain tip.
let mut event_observer_config = self.config.get_event_observer_config();
let block_post_processor = if stream_indexing_to_observers && !observer_specs.is_empty() {
let mut chainhook_config: ChainhookConfig = ChainhookConfig::new();
let specs = observer_specs.clone();
@@ -93,8 +121,6 @@ impl Service {
} else {
None
};
// Catch-up with chain tip
self.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor)
.await?;
try_info!(
@@ -116,18 +142,14 @@ impl Service {
};
// Observers handling
let ordhook_db =
open_readonly_ordhook_db_conn(&self.config.expected_cache_path(), &self.ctx)
.expect("unable to retrieve ordhook db");
let chain_tip_height =
find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap();
// 1) update event_observer_config with observers ready to be used
// 2) catch-up outdated observers by dispatching replays
let (chainhook_config, outdated_observers) =
create_and_consolidate_chainhook_config_with_predicates(
observer_specs,
chain_tip_height,
find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap(),
predicate_activity_relayer.is_some(),
&self.prometheus,
&self.config,
&self.ctx,
)?;
@@ -160,6 +182,7 @@ impl Service {
Ok(())
}
// TODO: Deprecated? Only used by ordhook-sdk-js.
pub async fn start_event_observer(
&mut self,
observer_sidecar: ObserverSidecar,
@@ -175,6 +198,7 @@ impl Service {
vec![],
0,
true,
&self.prometheus,
&self.config,
&self.ctx,
)?;
@@ -273,12 +297,14 @@ impl Service {
let moved_ctx = self.ctx.clone();
let moved_observer_commands_tx = observer_command_tx.clone();
let moved_observer_event_rx = observer_event_rx.clone();
let moved_prometheus = self.prometheus.clone();
let _ = hiro_system_kit::thread_named("HTTP Observers API").spawn(move || {
let _ = hiro_system_kit::nestable_block_on(start_observers_http_server(
&moved_config,
&moved_observer_commands_tx,
moved_observer_event_rx,
bitcoin_scan_op_tx,
&moved_prometheus,
&moved_ctx,
));
});
@@ -305,6 +331,7 @@ impl Service {
Ok(())
}
// TODO: Deprecated? Only used by ordhook-sdk-js.
pub fn set_up_observer_config(
&self,
predicates: Vec<BitcoinChainhookSpecification>,
@@ -321,6 +348,7 @@ impl Service {
predicates,
0,
enable_internal_trigger,
&self.prometheus,
&self.config,
&self.ctx,
)?;
@@ -347,6 +375,7 @@ impl Service {
let mut brc20_cache = brc20_new_cache(&self.config);
let ctx = self.ctx.clone();
let config = self.config.clone();
let prometheus = self.prometheus.clone();
let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || loop {
select! {
@@ -357,6 +386,7 @@ impl Service {
&blocks_ids_to_rollback,
&cache_l2,
&mut brc20_cache,
&prometheus,
&config,
&ctx,
);
@@ -485,6 +515,7 @@ impl Service {
&self.config,
&self.ctx,
block_post_processor.clone(),
&self.prometheus,
);
try_info!(
@@ -652,6 +683,7 @@ pub fn chainhook_sidecar_mutate_blocks(
blocks_ids_to_rollback: &Vec<BlockIdentifier>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
brc20_cache: &mut Option<Brc20MemoryCache>,
prometheus: &PrometheusMonitoring,
config: &Config,
ctx: &Context,
) {
@@ -736,6 +768,7 @@ pub fn chainhook_sidecar_mutate_blocks(
&inscriptions_db_tx,
brc20_db_tx.as_ref(),
brc20_cache.as_mut(),
prometheus,
&ordhook_config,
&ctx,
);

View File

@@ -24,7 +24,7 @@ use crate::{
perform_query_set,
},
scan::bitcoin::process_block_with_predicates,
try_warn,
try_warn, utils::monitoring::PrometheusMonitoring,
};
pub fn update_observer_progress(
@@ -231,6 +231,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
provided_observers: Vec<BitcoinChainhookSpecification>,
chain_tip_height: u64,
enable_internal_trigger: bool,
prometheus: &PrometheusMonitoring,
config: &Config,
ctx: &Context,
) -> Result<(ChainhookConfig, Vec<BitcoinChainhookFullSpecification>), String> {
@@ -333,6 +334,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
let mut full_specs = vec![];
prometheus.metrics_set_registered_predicates(observers_to_catchup.len() as u64);
for (observer, report) in observers_to_catchup.into_iter() {
let mut networks = BTreeMap::new();
networks.insert(

View File

@@ -1,5 +1,6 @@
pub mod bitcoind;
pub mod logger;
pub mod monitoring;
use std::{
fs,

View File

@@ -0,0 +1,192 @@
use chainhook_sdk::utils::Context;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use prometheus::{
core::{AtomicU64, GenericGauge},
Encoder, Registry, TextEncoder,
};
use crate::{try_debug, try_info, try_warn};
type UInt64Gauge = GenericGauge<AtomicU64>;
#[derive(Debug, Clone)]
pub struct PrometheusMonitoring {
pub last_indexed_block_height: UInt64Gauge,
pub last_indexed_inscription_number: UInt64Gauge,
pub registered_predicates: UInt64Gauge,
pub registry: Registry,
}
impl PrometheusMonitoring {
pub fn new() -> PrometheusMonitoring {
let registry = Registry::new();
let last_indexed_block_height = PrometheusMonitoring::create_and_register_uint64_gauge(
&registry,
"last_indexed_block_height",
"The latest Bitcoin block indexed for ordinals.",
);
let last_indexed_inscription_number =
PrometheusMonitoring::create_and_register_uint64_gauge(
&registry,
"last_indexed_inscription_number",
"The latest indexed inscription number.",
);
let registered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge(
&registry,
"registered_predicates",
"The current number of predicates registered to receive ordinal events.",
);
PrometheusMonitoring {
last_indexed_block_height,
last_indexed_inscription_number,
registered_predicates,
registry,
}
}
pub fn create_and_register_uint64_gauge(
registry: &Registry,
name: &str,
help: &str,
) -> UInt64Gauge {
let g = UInt64Gauge::new(name, help).unwrap();
registry.register(Box::new(g.clone())).unwrap();
g
}
pub fn initialize(
&self,
total_predicates: u64,
max_inscription_number: u64,
block_height: u64,
) {
self.metrics_set_registered_predicates(total_predicates);
self.metrics_block_indexed(block_height);
self.metrics_inscription_indexed(max_inscription_number);
}
pub fn metrics_deregister_predicate(&self) {
self.registered_predicates.dec();
}
pub fn metrics_register_predicate(&self) {
self.registered_predicates.inc();
}
pub fn metrics_set_registered_predicates(&self, registered_predicates: u64) {
self.registered_predicates.set(registered_predicates);
}
pub fn metrics_inscription_indexed(&self, inscription_number: u64) {
let highest_appended = self.last_indexed_inscription_number.get();
if inscription_number > highest_appended {
self.last_indexed_inscription_number.set(inscription_number);
}
}
pub fn metrics_block_indexed(&self, block_height: u64) {
let highest_appended = self.last_indexed_block_height.get();
if block_height > highest_appended {
self.last_indexed_block_height.set(block_height);
}
}
}
async fn serve_req(
req: Request<Body>,
registry: Registry,
ctx: Context,
) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
try_debug!(ctx, "Prometheus monitoring: responding to metrics request");
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut buffer = vec![];
let response = match encoder.encode(&metric_families, &mut buffer) {
Ok(_) => Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap(),
Err(e) => {
try_debug!(
ctx,
"Prometheus monitoring: failed to encode metrics: {}",
e.to_string()
);
Response::builder().status(500).body(Body::empty()).unwrap()
}
};
Ok(response)
}
(_, _) => {
try_debug!(
ctx,
"Prometheus monitoring: received request with invalid method/route: {}/{}",
req.method(),
req.uri().path()
);
let response = Response::builder().status(404).body(Body::empty()).unwrap();
Ok(response)
}
}
}
pub async fn start_serving_prometheus_metrics(port: u16, registry: Registry, ctx: Context) {
let addr = ([0, 0, 0, 0], port).into();
let ctx_clone = ctx.clone();
let make_svc = make_service_fn(|_| {
let registry = registry.clone();
let ctx_clone = ctx_clone.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |r| {
serve_req(r, registry.clone(), ctx_clone.clone())
}))
}
});
let serve_future = Server::bind(&addr).serve(make_svc);
try_info!(ctx, "Prometheus monitoring: listening on port {}", port);
if let Err(err) = serve_future.await {
try_warn!(ctx, "Prometheus monitoring: server error: {}", err);
}
}
#[cfg(test)]
mod test {
use crate::utils::monitoring::PrometheusMonitoring;
#[test]
fn it_tracks_predicate_registration_deregistration_with_defaults() {
let prometheus = PrometheusMonitoring::new();
assert_eq!(prometheus.registered_predicates.get(), 0);
prometheus.metrics_set_registered_predicates(10);
assert_eq!(prometheus.registered_predicates.get(), 10);
prometheus.metrics_register_predicate();
assert_eq!(prometheus.registered_predicates.get(), 11);
prometheus.metrics_deregister_predicate();
assert_eq!(prometheus.registered_predicates.get(), 10);
}
#[test]
fn it_tracks_block_ingestion() {
let prometheus = PrometheusMonitoring::new();
assert_eq!(prometheus.last_indexed_block_height.get(), 0);
prometheus.metrics_block_indexed(100);
assert_eq!(prometheus.last_indexed_block_height.get(), 100);
}
#[test]
fn it_tracks_inscription_indexing() {
let prometheus = PrometheusMonitoring::new();
assert_eq!(prometheus.last_indexed_inscription_number.get(), 0);
prometheus.metrics_inscription_indexed(5000);
assert_eq!(prometheus.last_indexed_inscription_number.get(), 5000);
}
}