mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-13 08:40:17 +08:00
feat: add prometheus monitoring (#356)
* feat: add prometheus monitoring * chore: progress * fix: create object * chore: port config * chore: set initial metrics on start * fix: monitor predicates * test: add
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2513,6 +2513,7 @@ dependencies = [
|
||||
"num_cpus",
|
||||
"pprof",
|
||||
"progressing",
|
||||
"prometheus",
|
||||
"rand",
|
||||
"regex",
|
||||
"reqwest",
|
||||
|
||||
@@ -37,6 +37,7 @@ use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
|
||||
use ordhook::service::observers::initialize_observers_db;
|
||||
use ordhook::service::{start_observer_forwarding, Service};
|
||||
use ordhook::utils::bitcoind::bitcoind_get_block_height;
|
||||
use ordhook::utils::monitoring::PrometheusMonitoring;
|
||||
use ordhook::{hex, initialize_databases, try_error, try_info, try_warn};
|
||||
use reqwest::Client as HttpClient;
|
||||
use std::collections::HashSet;
|
||||
@@ -883,8 +884,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
_ => None,
|
||||
};
|
||||
let blocks = cmd.get_blocks();
|
||||
let inscription_indexing_processor =
|
||||
start_inscription_indexing_processor(&config, ctx, block_post_processor);
|
||||
let inscription_indexing_processor = start_inscription_indexing_processor(
|
||||
&config,
|
||||
ctx,
|
||||
block_post_processor,
|
||||
&PrometheusMonitoring::new(),
|
||||
);
|
||||
|
||||
download_and_pipeline_blocks(
|
||||
&config,
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
use ordhook::chainhook_sdk::indexer::IndexerConfig;
|
||||
use ordhook::chainhook_sdk::observer::DEFAULT_INGESTION_PORT;
|
||||
use ordhook::chainhook_sdk::types::{
|
||||
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
|
||||
};
|
||||
use ordhook::config::{
|
||||
Config, LogConfig, MetaProtocolsConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig,
|
||||
SnapshotConfig, SnapshotConfigDownloadUrls, StorageConfig, DEFAULT_BITCOIND_RPC_THREADS,
|
||||
DEFAULT_BITCOIND_RPC_TIMEOUT, DEFAULT_BRC20_LRU_CACHE_SIZE, DEFAULT_CONTROL_PORT,
|
||||
DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
|
||||
Config, IndexerConfig, LogConfig, MetaProtocolsConfig, PredicatesApi, PredicatesApiConfig,
|
||||
ResourcesConfig, SnapshotConfig, SnapshotConfigDownloadUrls, StorageConfig,
|
||||
DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT, DEFAULT_BRC20_LRU_CACHE_SIZE,
|
||||
DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
|
||||
};
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Read};
|
||||
@@ -43,7 +42,7 @@ impl ConfigFile {
|
||||
}
|
||||
|
||||
pub fn from_config_file(config_file: ConfigFile) -> Result<Config, String> {
|
||||
let (stacks_network, bitcoin_network) = match config_file.network.mode.as_str() {
|
||||
let (_, bitcoin_network) = match config_file.network.mode.as_str() {
|
||||
"devnet" => (StacksNetwork::Devnet, BitcoinNetwork::Regtest),
|
||||
"testnet" => (StacksNetwork::Testnet, BitcoinNetwork::Testnet),
|
||||
"mainnet" => (StacksNetwork::Mainnet, BitcoinNetwork::Mainnet),
|
||||
@@ -115,14 +114,11 @@ impl ConfigFile {
|
||||
bitcoin_block_signaling: match config_file.network.bitcoind_zmq_url {
|
||||
Some(ref zmq_url) => BitcoinBlockSignaling::ZeroMQ(zmq_url.clone()),
|
||||
None => BitcoinBlockSignaling::Stacks(StacksNodeConfig::default_localhost(
|
||||
config_file
|
||||
.network
|
||||
.stacks_events_ingestion_port
|
||||
.unwrap_or(DEFAULT_INGESTION_PORT),
|
||||
DEFAULT_INGESTION_PORT,
|
||||
)),
|
||||
},
|
||||
stacks_network,
|
||||
bitcoin_network,
|
||||
prometheus_monitoring_port: config_file.network.prometheus_monitoring_port,
|
||||
},
|
||||
logs: LogConfig {
|
||||
ordinals_internals: config_file
|
||||
@@ -220,6 +216,5 @@ pub struct NetworkConfigFile {
|
||||
pub bitcoind_rpc_username: String,
|
||||
pub bitcoind_rpc_password: String,
|
||||
pub bitcoind_zmq_url: Option<String>,
|
||||
pub stacks_node_rpc_url: Option<String>,
|
||||
pub stacks_events_ingestion_port: Option<u16>,
|
||||
pub prometheus_monitoring_port: Option<u16>,
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ hyper = { version = "=0.14.27" }
|
||||
lazy_static = { version = "1.4.0" }
|
||||
ciborium = "0.2.1"
|
||||
regex = "1.10.3"
|
||||
prometheus = "0.13.3"
|
||||
|
||||
[dev-dependencies]
|
||||
test-case = "3.1.0"
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::core::OrdhookConfig;
|
||||
pub use chainhook_sdk::indexer::IndexerConfig;
|
||||
use chainhook_sdk::observer::EventObserverConfig;
|
||||
use chainhook_sdk::types::{
|
||||
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
|
||||
@@ -81,6 +80,16 @@ pub struct UrlConfig {
|
||||
pub file_url: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IndexerConfig {
|
||||
pub bitcoin_network: BitcoinNetwork,
|
||||
pub bitcoind_rpc_url: String,
|
||||
pub bitcoind_rpc_username: String,
|
||||
pub bitcoind_rpc_password: String,
|
||||
pub bitcoin_block_signaling: BitcoinBlockSignaling,
|
||||
pub prometheus_monitoring_port: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct ResourcesConfig {
|
||||
pub ulimit: usize,
|
||||
@@ -136,7 +145,7 @@ impl Config {
|
||||
display_logs: false,
|
||||
cache_path: self.storage.working_dir.clone(),
|
||||
bitcoin_network: self.network.bitcoin_network.clone(),
|
||||
stacks_network: self.network.stacks_network.clone(),
|
||||
stacks_network: StacksNetwork::Devnet,
|
||||
prometheus_monitoring_port: None,
|
||||
data_handler_tx: None,
|
||||
}
|
||||
@@ -192,8 +201,8 @@ impl Config {
|
||||
bitcoin_block_signaling: BitcoinBlockSignaling::Stacks(
|
||||
StacksNodeConfig::default_localhost(DEFAULT_INGESTION_PORT),
|
||||
),
|
||||
stacks_network: StacksNetwork::Devnet,
|
||||
bitcoin_network: BitcoinNetwork::Regtest,
|
||||
prometheus_monitoring_port: None,
|
||||
},
|
||||
logs: LogConfig {
|
||||
ordinals_internals: true,
|
||||
@@ -227,8 +236,8 @@ impl Config {
|
||||
bitcoin_block_signaling: BitcoinBlockSignaling::Stacks(
|
||||
StacksNodeConfig::default_localhost(DEFAULT_INGESTION_PORT),
|
||||
),
|
||||
stacks_network: StacksNetwork::Testnet,
|
||||
bitcoin_network: BitcoinNetwork::Testnet,
|
||||
prometheus_monitoring_port: Some(9153),
|
||||
},
|
||||
logs: LogConfig {
|
||||
ordinals_internals: true,
|
||||
@@ -265,8 +274,8 @@ impl Config {
|
||||
bitcoin_block_signaling: BitcoinBlockSignaling::Stacks(
|
||||
StacksNodeConfig::default_localhost(DEFAULT_INGESTION_PORT),
|
||||
),
|
||||
stacks_network: StacksNetwork::Mainnet,
|
||||
bitcoin_network: BitcoinNetwork::Mainnet,
|
||||
prometheus_monitoring_port: Some(9153),
|
||||
},
|
||||
logs: LogConfig {
|
||||
ordinals_internals: true,
|
||||
|
||||
@@ -39,11 +39,12 @@ use crate::{
|
||||
OrdhookConfig,
|
||||
},
|
||||
db::{
|
||||
get_any_entry_in_ordinal_activities, open_ordhook_db_conn_rocks_db_loop,
|
||||
open_readonly_ordhook_db_conn,
|
||||
get_any_entry_in_ordinal_activities, get_latest_indexed_inscription_number,
|
||||
open_ordhook_db_conn_rocks_db_loop, open_readonly_ordhook_db_conn,
|
||||
},
|
||||
service::write_brc20_block_operations,
|
||||
try_error, try_info,
|
||||
utils::monitoring::PrometheusMonitoring,
|
||||
};
|
||||
|
||||
use crate::db::{TransactionBytesCursor, TraversalResult};
|
||||
@@ -61,12 +62,14 @@ pub fn start_inscription_indexing_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
) -> PostProcessorController {
|
||||
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
|
||||
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
|
||||
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
let prometheus = prometheus.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
|
||||
.spawn(move || {
|
||||
let cache_l2 = Arc::new(new_traversals_lazy_cache(2048));
|
||||
@@ -143,6 +146,7 @@ pub fn start_inscription_indexing_processor(
|
||||
&mut brc20_db_conn_rw,
|
||||
&ordhook_config,
|
||||
&post_processor,
|
||||
&prometheus,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
@@ -181,6 +185,7 @@ pub fn process_blocks(
|
||||
brc20_db_conn_rw: &mut Option<Connection>,
|
||||
ordhook_config: &OrdhookConfig,
|
||||
post_processor: &Option<Sender<BitcoinBlockData>>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
ctx: &Context,
|
||||
) -> Vec<BitcoinBlockData> {
|
||||
let mut cache_l1 = BTreeMap::new();
|
||||
@@ -216,6 +221,7 @@ pub fn process_blocks(
|
||||
&inscriptions_db_tx,
|
||||
brc20_db_tx.as_ref(),
|
||||
brc20_cache.as_mut(),
|
||||
prometheus,
|
||||
ordhook_config,
|
||||
ctx,
|
||||
);
|
||||
@@ -284,6 +290,7 @@ pub fn process_block(
|
||||
inscriptions_db_tx: &Transaction,
|
||||
brc20_db_tx: Option<&Transaction>,
|
||||
brc20_cache: Option<&mut Brc20MemoryCache>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
ordhook_config: &OrdhookConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
@@ -307,7 +314,7 @@ pub fn process_block(
|
||||
Context::empty()
|
||||
};
|
||||
|
||||
// Handle inscriptions
|
||||
// Inscriptions
|
||||
if any_processable_transactions {
|
||||
let _ = augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
|
||||
block,
|
||||
@@ -317,10 +324,9 @@ pub fn process_block(
|
||||
&inner_ctx,
|
||||
);
|
||||
}
|
||||
|
||||
// Handle transfers
|
||||
// Transfers
|
||||
let _ = augment_block_with_ordinals_transfer_data(block, inscriptions_db_tx, true, &inner_ctx);
|
||||
|
||||
// BRC-20
|
||||
match (brc20_db_tx, brc20_cache) {
|
||||
(Some(brc20_db_tx), Some(brc20_cache)) => write_brc20_block_operations(
|
||||
block,
|
||||
@@ -332,5 +338,11 @@ pub fn process_block(
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Monitoring
|
||||
prometheus.metrics_block_indexed(block.block_identifier.index);
|
||||
prometheus.metrics_inscription_indexed(
|
||||
get_latest_indexed_inscription_number(inscriptions_db_tx, &inner_ctx).unwrap_or(0),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -835,6 +835,12 @@ pub fn find_latest_transfers_block_height(db_conn: &Connection, ctx: &Context) -
|
||||
entry
|
||||
}
|
||||
|
||||
pub fn get_latest_indexed_inscription_number(db_conn: &Connection, ctx: &Context) -> Option<u64> {
|
||||
let args: &[&dyn ToSql] = &[];
|
||||
let query = "SELECT MAX(jubilee_inscription_number) FROM inscriptions";
|
||||
perform_query_one(query, args, db_conn, ctx, |row| row.get(0).unwrap())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TransferData {
|
||||
pub inscription_offset_intra_output: u64,
|
||||
|
||||
@@ -22,12 +22,10 @@ use rocket::{
|
||||
use rocket::{response::status::Custom, State};
|
||||
|
||||
use crate::{
|
||||
config::PredicatesApi,
|
||||
service::observers::{
|
||||
config::PredicatesApi, service::observers::{
|
||||
insert_entry_in_observers, open_readwrite_observers_db_conn, remove_entry_from_observers,
|
||||
update_observer_progress, update_observer_streaming_enabled,
|
||||
},
|
||||
try_error, try_info,
|
||||
}, try_error, try_info, utils::monitoring::PrometheusMonitoring
|
||||
};
|
||||
|
||||
use super::observers::{
|
||||
@@ -39,6 +37,7 @@ pub async fn start_observers_http_server(
|
||||
observer_commands_tx: &std::sync::mpsc::Sender<ObserverCommand>,
|
||||
observer_event_rx: crossbeam_channel::Receiver<ObserverEvent>,
|
||||
bitcoin_scan_op_tx: crossbeam_channel::Sender<BitcoinChainhookSpecification>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
ctx: &Context,
|
||||
) -> Result<Shutdown, String> {
|
||||
// Build and start HTTP server.
|
||||
@@ -51,6 +50,7 @@ pub async fn start_observers_http_server(
|
||||
// Spawn predicate observer event tread.
|
||||
let moved_config = config.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let moved_prometheus = prometheus.clone();
|
||||
let _ = hiro_system_kit::thread_named("observers_api-events").spawn(move || loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
Ok(cmd) => cmd,
|
||||
@@ -72,8 +72,10 @@ pub async fn start_observers_http_server(
|
||||
};
|
||||
let report = ObserverReport::default();
|
||||
insert_entry_in_observers(&spec, &report, &observers_db_conn, &moved_ctx);
|
||||
moved_prometheus.metrics_register_predicate();
|
||||
match spec {
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
// TODO: This action blocks this thread until the scan operation is complete. We should not do this.
|
||||
let _ = bitcoin_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
_ => {}
|
||||
@@ -109,6 +111,7 @@ pub async fn start_observers_http_server(
|
||||
}
|
||||
};
|
||||
remove_entry_from_observers(&uuid, &observers_db_conn, &moved_ctx);
|
||||
moved_prometheus.metrics_deregister_predicate();
|
||||
}
|
||||
ObserverEvent::BitcoinPredicateTriggered(data) => {
|
||||
if let Some(ref tip) = data.apply.last() {
|
||||
@@ -426,7 +429,7 @@ mod test {
|
||||
|
||||
use crate::{
|
||||
config::{Config, PredicatesApi, PredicatesApiConfig},
|
||||
service::observers::{delete_observers_db, initialize_observers_db},
|
||||
service::observers::{delete_observers_db, initialize_observers_db}, utils::monitoring::PrometheusMonitoring,
|
||||
};
|
||||
|
||||
use super::start_observers_http_server;
|
||||
@@ -448,6 +451,7 @@ mod test {
|
||||
&observer_command_tx,
|
||||
observer_event_rx,
|
||||
bitcoin_scan_op_tx,
|
||||
&PrometheusMonitoring::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -23,7 +23,8 @@ use crate::core::protocol::inscription_parsing::{
|
||||
use crate::core::protocol::inscription_sequencing::SequenceCursor;
|
||||
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
|
||||
use crate::db::{
|
||||
delete_data_in_ordhook_db, find_latest_inscription_block_height, insert_entry_in_blocks,
|
||||
delete_data_in_ordhook_db, find_latest_inscription_block_height,
|
||||
get_latest_indexed_inscription_number, insert_entry_in_blocks,
|
||||
open_ordhook_db_conn_rocks_db_loop, open_readonly_ordhook_db_conn, open_readwrite_ordhook_dbs,
|
||||
update_ordinals_db_with_block, BlockBytesCursor, TransactionBytesCursor,
|
||||
};
|
||||
@@ -31,6 +32,7 @@ use crate::db::{find_missing_blocks, run_compaction, update_sequence_metadata_wi
|
||||
use crate::scan::bitcoin::process_block_with_predicates;
|
||||
use crate::service::observers::create_and_consolidate_chainhook_config_with_predicates;
|
||||
use crate::service::runloops::start_bitcoin_scan_runloop;
|
||||
use crate::utils::monitoring::{start_serving_prometheus_metrics, PrometheusMonitoring};
|
||||
use crate::{try_debug, try_error, try_info};
|
||||
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload;
|
||||
use chainhook_sdk::chainhooks::types::{
|
||||
@@ -59,13 +61,18 @@ use std::sync::mpsc::channel;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct Service {
|
||||
pub prometheus: PrometheusMonitoring,
|
||||
pub config: Config,
|
||||
pub ctx: Context,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn new(config: Config, ctx: Context) -> Self {
|
||||
Self { config, ctx }
|
||||
Self {
|
||||
prometheus: PrometheusMonitoring::new(),
|
||||
config,
|
||||
ctx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(
|
||||
@@ -77,8 +84,29 @@ impl Service {
|
||||
check_blocks_integrity: bool,
|
||||
stream_indexing_to_observers: bool,
|
||||
) -> Result<(), String> {
|
||||
let mut event_observer_config = self.config.get_event_observer_config();
|
||||
// Start Prometheus monitoring server.
|
||||
if let Some(port) = self.config.network.prometheus_monitoring_port {
|
||||
let registry_moved = self.prometheus.registry.clone();
|
||||
let ctx_cloned = self.ctx.clone();
|
||||
let _ = std::thread::spawn(move || {
|
||||
let _ = hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics(
|
||||
port,
|
||||
registry_moved,
|
||||
ctx_cloned,
|
||||
));
|
||||
});
|
||||
}
|
||||
let ordhook_db =
|
||||
open_readonly_ordhook_db_conn(&self.config.expected_cache_path(), &self.ctx)
|
||||
.expect("unable to retrieve ordhook db");
|
||||
self.prometheus.initialize(
|
||||
0,
|
||||
get_latest_indexed_inscription_number(&ordhook_db, &self.ctx).unwrap_or(0),
|
||||
find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap_or(0),
|
||||
);
|
||||
|
||||
// Catch-up with chain tip.
|
||||
let mut event_observer_config = self.config.get_event_observer_config();
|
||||
let block_post_processor = if stream_indexing_to_observers && !observer_specs.is_empty() {
|
||||
let mut chainhook_config: ChainhookConfig = ChainhookConfig::new();
|
||||
let specs = observer_specs.clone();
|
||||
@@ -93,8 +121,6 @@ impl Service {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Catch-up with chain tip
|
||||
self.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor)
|
||||
.await?;
|
||||
try_info!(
|
||||
@@ -116,18 +142,14 @@ impl Service {
|
||||
};
|
||||
|
||||
// Observers handling
|
||||
let ordhook_db =
|
||||
open_readonly_ordhook_db_conn(&self.config.expected_cache_path(), &self.ctx)
|
||||
.expect("unable to retrieve ordhook db");
|
||||
let chain_tip_height =
|
||||
find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap();
|
||||
// 1) update event_observer_config with observers ready to be used
|
||||
// 2) catch-up outdated observers by dispatching replays
|
||||
let (chainhook_config, outdated_observers) =
|
||||
create_and_consolidate_chainhook_config_with_predicates(
|
||||
observer_specs,
|
||||
chain_tip_height,
|
||||
find_latest_inscription_block_height(&ordhook_db, &self.ctx)?.unwrap(),
|
||||
predicate_activity_relayer.is_some(),
|
||||
&self.prometheus,
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
)?;
|
||||
@@ -160,6 +182,7 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: Deprecated? Only used by ordhook-sdk-js.
|
||||
pub async fn start_event_observer(
|
||||
&mut self,
|
||||
observer_sidecar: ObserverSidecar,
|
||||
@@ -175,6 +198,7 @@ impl Service {
|
||||
vec![],
|
||||
0,
|
||||
true,
|
||||
&self.prometheus,
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
)?;
|
||||
@@ -273,12 +297,14 @@ impl Service {
|
||||
let moved_ctx = self.ctx.clone();
|
||||
let moved_observer_commands_tx = observer_command_tx.clone();
|
||||
let moved_observer_event_rx = observer_event_rx.clone();
|
||||
let moved_prometheus = self.prometheus.clone();
|
||||
let _ = hiro_system_kit::thread_named("HTTP Observers API").spawn(move || {
|
||||
let _ = hiro_system_kit::nestable_block_on(start_observers_http_server(
|
||||
&moved_config,
|
||||
&moved_observer_commands_tx,
|
||||
moved_observer_event_rx,
|
||||
bitcoin_scan_op_tx,
|
||||
&moved_prometheus,
|
||||
&moved_ctx,
|
||||
));
|
||||
});
|
||||
@@ -305,6 +331,7 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: Deprecated? Only used by ordhook-sdk-js.
|
||||
pub fn set_up_observer_config(
|
||||
&self,
|
||||
predicates: Vec<BitcoinChainhookSpecification>,
|
||||
@@ -321,6 +348,7 @@ impl Service {
|
||||
predicates,
|
||||
0,
|
||||
enable_internal_trigger,
|
||||
&self.prometheus,
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
)?;
|
||||
@@ -347,6 +375,7 @@ impl Service {
|
||||
let mut brc20_cache = brc20_new_cache(&self.config);
|
||||
let ctx = self.ctx.clone();
|
||||
let config = self.config.clone();
|
||||
let prometheus = self.prometheus.clone();
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || loop {
|
||||
select! {
|
||||
@@ -357,6 +386,7 @@ impl Service {
|
||||
&blocks_ids_to_rollback,
|
||||
&cache_l2,
|
||||
&mut brc20_cache,
|
||||
&prometheus,
|
||||
&config,
|
||||
&ctx,
|
||||
);
|
||||
@@ -485,6 +515,7 @@ impl Service {
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
block_post_processor.clone(),
|
||||
&self.prometheus,
|
||||
);
|
||||
|
||||
try_info!(
|
||||
@@ -652,6 +683,7 @@ pub fn chainhook_sidecar_mutate_blocks(
|
||||
blocks_ids_to_rollback: &Vec<BlockIdentifier>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
|
||||
brc20_cache: &mut Option<Brc20MemoryCache>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) {
|
||||
@@ -736,6 +768,7 @@ pub fn chainhook_sidecar_mutate_blocks(
|
||||
&inscriptions_db_tx,
|
||||
brc20_db_tx.as_ref(),
|
||||
brc20_cache.as_mut(),
|
||||
prometheus,
|
||||
&ordhook_config,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::{
|
||||
perform_query_set,
|
||||
},
|
||||
scan::bitcoin::process_block_with_predicates,
|
||||
try_warn,
|
||||
try_warn, utils::monitoring::PrometheusMonitoring,
|
||||
};
|
||||
|
||||
pub fn update_observer_progress(
|
||||
@@ -231,6 +231,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
provided_observers: Vec<BitcoinChainhookSpecification>,
|
||||
chain_tip_height: u64,
|
||||
enable_internal_trigger: bool,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<(ChainhookConfig, Vec<BitcoinChainhookFullSpecification>), String> {
|
||||
@@ -333,6 +334,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
|
||||
let mut full_specs = vec![];
|
||||
|
||||
prometheus.metrics_set_registered_predicates(observers_to_catchup.len() as u64);
|
||||
for (observer, report) in observers_to_catchup.into_iter() {
|
||||
let mut networks = BTreeMap::new();
|
||||
networks.insert(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod bitcoind;
|
||||
pub mod logger;
|
||||
pub mod monitoring;
|
||||
|
||||
use std::{
|
||||
fs,
|
||||
|
||||
192
components/ordhook-core/src/utils/monitoring.rs
Normal file
192
components/ordhook-core/src/utils/monitoring.rs
Normal file
@@ -0,0 +1,192 @@
|
||||
use chainhook_sdk::utils::Context;
|
||||
use hyper::{
|
||||
header::CONTENT_TYPE,
|
||||
service::{make_service_fn, service_fn},
|
||||
Body, Method, Request, Response, Server,
|
||||
};
|
||||
use prometheus::{
|
||||
core::{AtomicU64, GenericGauge},
|
||||
Encoder, Registry, TextEncoder,
|
||||
};
|
||||
|
||||
use crate::{try_debug, try_info, try_warn};
|
||||
|
||||
type UInt64Gauge = GenericGauge<AtomicU64>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PrometheusMonitoring {
|
||||
pub last_indexed_block_height: UInt64Gauge,
|
||||
pub last_indexed_inscription_number: UInt64Gauge,
|
||||
pub registered_predicates: UInt64Gauge,
|
||||
pub registry: Registry,
|
||||
}
|
||||
|
||||
impl PrometheusMonitoring {
|
||||
pub fn new() -> PrometheusMonitoring {
|
||||
let registry = Registry::new();
|
||||
let last_indexed_block_height = PrometheusMonitoring::create_and_register_uint64_gauge(
|
||||
®istry,
|
||||
"last_indexed_block_height",
|
||||
"The latest Bitcoin block indexed for ordinals.",
|
||||
);
|
||||
let last_indexed_inscription_number =
|
||||
PrometheusMonitoring::create_and_register_uint64_gauge(
|
||||
®istry,
|
||||
"last_indexed_inscription_number",
|
||||
"The latest indexed inscription number.",
|
||||
);
|
||||
let registered_predicates = PrometheusMonitoring::create_and_register_uint64_gauge(
|
||||
®istry,
|
||||
"registered_predicates",
|
||||
"The current number of predicates registered to receive ordinal events.",
|
||||
);
|
||||
PrometheusMonitoring {
|
||||
last_indexed_block_height,
|
||||
last_indexed_inscription_number,
|
||||
registered_predicates,
|
||||
registry,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_and_register_uint64_gauge(
|
||||
registry: &Registry,
|
||||
name: &str,
|
||||
help: &str,
|
||||
) -> UInt64Gauge {
|
||||
let g = UInt64Gauge::new(name, help).unwrap();
|
||||
registry.register(Box::new(g.clone())).unwrap();
|
||||
g
|
||||
}
|
||||
|
||||
pub fn initialize(
|
||||
&self,
|
||||
total_predicates: u64,
|
||||
max_inscription_number: u64,
|
||||
block_height: u64,
|
||||
) {
|
||||
self.metrics_set_registered_predicates(total_predicates);
|
||||
self.metrics_block_indexed(block_height);
|
||||
self.metrics_inscription_indexed(max_inscription_number);
|
||||
}
|
||||
|
||||
pub fn metrics_deregister_predicate(&self) {
|
||||
self.registered_predicates.dec();
|
||||
}
|
||||
|
||||
pub fn metrics_register_predicate(&self) {
|
||||
self.registered_predicates.inc();
|
||||
}
|
||||
|
||||
pub fn metrics_set_registered_predicates(&self, registered_predicates: u64) {
|
||||
self.registered_predicates.set(registered_predicates);
|
||||
}
|
||||
|
||||
pub fn metrics_inscription_indexed(&self, inscription_number: u64) {
|
||||
let highest_appended = self.last_indexed_inscription_number.get();
|
||||
if inscription_number > highest_appended {
|
||||
self.last_indexed_inscription_number.set(inscription_number);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn metrics_block_indexed(&self, block_height: u64) {
|
||||
let highest_appended = self.last_indexed_block_height.get();
|
||||
if block_height > highest_appended {
|
||||
self.last_indexed_block_height.set(block_height);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve_req(
|
||||
req: Request<Body>,
|
||||
registry: Registry,
|
||||
ctx: Context,
|
||||
) -> Result<Response<Body>, hyper::Error> {
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
try_debug!(ctx, "Prometheus monitoring: responding to metrics request");
|
||||
|
||||
let encoder = TextEncoder::new();
|
||||
let metric_families = registry.gather();
|
||||
let mut buffer = vec![];
|
||||
let response = match encoder.encode(&metric_families, &mut buffer) {
|
||||
Ok(_) => Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
.unwrap(),
|
||||
Err(e) => {
|
||||
try_debug!(
|
||||
ctx,
|
||||
"Prometheus monitoring: failed to encode metrics: {}",
|
||||
e.to_string()
|
||||
);
|
||||
Response::builder().status(500).body(Body::empty()).unwrap()
|
||||
}
|
||||
};
|
||||
Ok(response)
|
||||
}
|
||||
(_, _) => {
|
||||
try_debug!(
|
||||
ctx,
|
||||
"Prometheus monitoring: received request with invalid method/route: {}/{}",
|
||||
req.method(),
|
||||
req.uri().path()
|
||||
);
|
||||
let response = Response::builder().status(404).body(Body::empty()).unwrap();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_serving_prometheus_metrics(port: u16, registry: Registry, ctx: Context) {
|
||||
let addr = ([0, 0, 0, 0], port).into();
|
||||
let ctx_clone = ctx.clone();
|
||||
let make_svc = make_service_fn(|_| {
|
||||
let registry = registry.clone();
|
||||
let ctx_clone = ctx_clone.clone();
|
||||
async move {
|
||||
Ok::<_, hyper::Error>(service_fn(move |r| {
|
||||
serve_req(r, registry.clone(), ctx_clone.clone())
|
||||
}))
|
||||
}
|
||||
});
|
||||
let serve_future = Server::bind(&addr).serve(make_svc);
|
||||
try_info!(ctx, "Prometheus monitoring: listening on port {}", port);
|
||||
if let Err(err) = serve_future.await {
|
||||
try_warn!(ctx, "Prometheus monitoring: server error: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::utils::monitoring::PrometheusMonitoring;
|
||||
|
||||
#[test]
|
||||
fn it_tracks_predicate_registration_deregistration_with_defaults() {
|
||||
let prometheus = PrometheusMonitoring::new();
|
||||
assert_eq!(prometheus.registered_predicates.get(), 0);
|
||||
prometheus.metrics_set_registered_predicates(10);
|
||||
assert_eq!(prometheus.registered_predicates.get(), 10);
|
||||
prometheus.metrics_register_predicate();
|
||||
assert_eq!(prometheus.registered_predicates.get(), 11);
|
||||
prometheus.metrics_deregister_predicate();
|
||||
assert_eq!(prometheus.registered_predicates.get(), 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn it_tracks_block_ingestion() {
|
||||
let prometheus = PrometheusMonitoring::new();
|
||||
assert_eq!(prometheus.last_indexed_block_height.get(), 0);
|
||||
prometheus.metrics_block_indexed(100);
|
||||
assert_eq!(prometheus.last_indexed_block_height.get(), 100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn it_tracks_inscription_indexing() {
|
||||
let prometheus = PrometheusMonitoring::new();
|
||||
assert_eq!(prometheus.last_indexed_inscription_number.get(), 0);
|
||||
prometheus.metrics_inscription_indexed(5000);
|
||||
assert_eq!(prometheus.last_indexed_inscription_number.get(), 5000);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user