diff --git a/components/chainhook-cli/src/config/generator.rs b/components/chainhook-cli/src/config/generator.rs index 53c4cff..c1065b1 100644 --- a/components/chainhook-cli/src/config/generator.rs +++ b/components/chainhook-cli/src/config/generator.rs @@ -7,7 +7,7 @@ working_dir = "cache" # dynamically predicates. # Disable by default. # -# [http-api] +# [http_api] # http_port = 20456 # database_uri = "redis://localhost:6379/" diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index e24a58c..e323f63 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -263,8 +263,12 @@ impl Config { } pub fn expected_api_database_uri(&self) -> &str { + &self.expected_api_config().database_uri + } + + pub fn expected_api_config(&self) -> &PredicatesApiConfig { match self.http_api { - PredicatesApi::On(ref config) => config.database_uri.as_str(), + PredicatesApi::On(ref config) => config, _ => unreachable!(), } } diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 740dde7..8008c4d 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -3,7 +3,11 @@ use std::collections::{HashMap, VecDeque}; use crate::{ archive::download_stacks_dataset_if_required, block::{Record, RecordKind}, - config::Config, + config::{Config, PredicatesApi, PredicatesApiConfig}, + service::{ + open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus, + ScanningData, + }, storage::{ get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present, @@ -11,7 +15,7 @@ use crate::{ }, }; use chainhook_event_observer::{ - chainhooks::stacks::evaluate_stacks_chainhook_on_blocks, + chainhooks::{stacks::evaluate_stacks_chainhook_on_blocks, types::ChainhookSpecification}, indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer}, rocksdb::DB, utils::Context, @@ -140,9 +144,15 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( }, }; - let proofs = HashMap::new(); + let mut predicates_db_conn = match config.http_api { + PredicatesApi::On(ref api_config) => { + Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx)) + } + PredicatesApi::Off => None, + }; - let mut actions_triggered = 0; + let proofs = HashMap::new(); + let mut occurrences_found = 0; let mut blocks_scanned = 0; info!( ctx.expect_logger(), @@ -182,7 +192,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( error!(ctx.expect_logger(), "unable to handle action {}", e); } Ok(action) => { - actions_triggered += 1; + occurrences_found += 1; let res = match action { StacksChainhookOccurrence::Http(request) => { send_request(request, 3, 1, &ctx).await @@ -202,6 +212,18 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( return Err(format!("Scan aborted (consecutive action errors >= 3)")); } + if let Some(ref mut predicates_db_conn) = predicates_db_conn { + if blocks_scanned % 5000 == 0 { + let status = PredicateStatus::Scanning(ScanningData { + start_block, + end_block, + cursor, + occurrences_found, + }); + update_predicate_status(&predicate_spec.key(), status, predicates_db_conn) + } + } + cursor += 1; // Update end_block, in case a new block was discovered during the scan if cursor == end_block { @@ -224,9 +246,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( } info!( ctx.expect_logger(), - "{blocks_scanned} blocks scanned, {actions_triggered} actions triggered" + "{blocks_scanned} blocks scanned, {occurrences_found} occurrences found" ); - + let status = PredicateStatus::Scanning(ScanningData { + start_block, + end_block, + cursor, + occurrences_found, + }); + if let Some(ref mut predicates_db_conn) = predicates_db_conn { + update_predicate_status(&predicate_spec.key(), status, predicates_db_conn) + } Ok(last_block_scanned) } @@ -253,7 +283,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( let proofs = HashMap::new(); - let mut actions_triggered = 0; + let mut occurrences_found = 0; let mut blocks_scanned = 0; info!( ctx.expect_logger(), @@ -294,7 +324,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( error!(ctx.expect_logger(), "unable to handle action {}", e); } Ok(action) => { - actions_triggered += 1; + occurrences_found += 1; let res = match action { StacksChainhookOccurrence::Http(request) => { send_request(request, 3, 1, &ctx).await @@ -316,7 +346,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( } info!( ctx.expect_logger(), - "{blocks_scanned} blocks scanned, {actions_triggered} actions triggered" + "{blocks_scanned} blocks scanned, {occurrences_found} occurrences found" ); Ok(last_block_scanned) diff --git a/components/chainhook-cli/src/service/http_api.rs b/components/chainhook-cli/src/service/http_api.rs index 08d01b7..1441c19 100644 --- a/components/chainhook-cli/src/service/http_api.rs +++ b/components/chainhook-cli/src/service/http_api.rs @@ -1,15 +1,16 @@ use std::{ + collections::HashMap, net::{IpAddr, Ipv4Addr}, sync::{mpsc::Sender, Arc, Mutex, RwLock}, }; use chainhook_event_observer::{ chainhooks::types::{ChainhookFullSpecification, ChainhookSpecification}, - observer::{ChainhookStore, ObserverCommand}, + observer::ObserverCommand, utils::Context, }; use hiro_system_kit::slog; -use redis::Commands; +use redis::{Commands, Connection}; use rocket::config::{self, Config, LogLevel}; use rocket::serde::json::{json, Json, Value as JsonValue}; use rocket::State; @@ -19,16 +20,14 @@ use std::error::Error; use crate::config::PredicatesApiConfig; +use super::{open_readwrite_predicates_db_conn_or_panic, PredicateStatus}; + pub async fn start_predicate_api_server( api_config: &PredicatesApiConfig, observer_commands_tx: Sender, ctx: Context, ) -> Result<(), Box> { - let log_level = if api_config.display_logs { - LogLevel::Critical - } else { - LogLevel::Off - }; + let log_level = LogLevel::Off; let mut shutdown_config = config::Shutdown::default(); shutdown_config.ctrlc = false; @@ -57,11 +56,16 @@ pub async fn start_predicate_api_server( ]; let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone())); + let redis_con_rw_lock = Arc::new(RwLock::new(open_readwrite_predicates_db_conn_or_panic( + api_config, &ctx, + ))); + let ctx_cloned = ctx.clone(); let ignite = rocket::custom(control_config) .manage(background_job_tx_mutex) .manage(ctx_cloned) + .manage(redis_con_rw_lock) .mount("/", routes) .ignite() .await?; @@ -85,7 +89,7 @@ fn handle_ping(ctx: &State) -> Json { #[openapi(tag = "Chainhooks")] #[get("/v1/chainhooks", format = "application/json")] fn handle_get_predicates( - chainhook_store: &State>>, + predicate_db: &State>>, ctx: &State, ) -> Json { ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks")); @@ -137,7 +141,7 @@ fn handle_get_predicates( #[post("/v1/chainhooks", format = "application/json", data = "")] fn handle_create_predicate( predicate: Json, - chainhook_store: &State>>, + predicate_db: &State>>, background_job_tx: &State>>>, ctx: &State, ) -> Json { @@ -150,15 +154,15 @@ fn handle_create_predicate( })); } - if let Ok(chainhook_store_reader) = chainhook_store.inner().read() { - if let Some(_) = chainhook_store_reader - .predicates - .get_spec_with_uuid(predicate.get_uuid()) - { - return Json(json!({ - "status": 409, - "error": "uuid already in use", - })); + if let Ok(mut predicates_db_conn) = predicate_db.inner().write() { + match get_entry_from_predicates_db(&predicate.get_uuid(), &mut predicates_db_conn, &ctx) { + Ok(Some(_)) => { + return Json(json!({ + "status": 409, + "error": "Predicate uuid already in use", + })) + } + _ => {} } } @@ -180,41 +184,31 @@ fn handle_create_predicate( #[get("/v1/chainhooks/", format = "application/json")] fn handle_get_predicate( predicate_uuid: String, - chainhook_store: &State>>, + predicate_db: &State>>, ctx: &State, ) -> Json { - ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/")); - if let Ok(chainhook_store_reader) = chainhook_store.inner().read() { - let predicate = match chainhook_store_reader - .predicates - .get_spec_with_uuid(&predicate_uuid) - { - Some(ChainhookSpecification::Stacks(spec)) => { - json!({ - "chain": "stacks", - "uuid": spec.uuid, - "network": spec.network, - "predicate": spec.predicate, - }) - } - Some(ChainhookSpecification::Bitcoin(spec)) => { - json!({ - "chain": "bitcoin", - "uuid": spec.uuid, - "network": spec.network, - "predicate": spec.predicate, - }) - } - None => { - return Json(json!({ - "status": 404, - })) - } - }; - return Json(json!({ - "status": 200, - "result": predicate - })); + ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/{}", predicate_uuid)); + + if let Ok(mut predicates_db_conn) = predicate_db.inner().write() { + match get_entry_from_predicates_db(&predicate_uuid, &mut predicates_db_conn, &ctx) { + Ok(Some((ChainhookSpecification::Stacks(spec), status))) => Json(json!({ + "chain": "stacks", + "uuid": spec.uuid, + "network": spec.network, + "predicate": spec.predicate, + "status": status + })), + Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => Json(json!({ + "chain": "bitcoin", + "uuid": spec.uuid, + "network": spec.network, + "predicate": spec.predicate, + "status": status + })), + _ => Json(json!({ + "status": 404, + })), + } } else { Json(json!({ "status": 500, @@ -230,7 +224,7 @@ fn handle_delete_stacks_predicate( background_job_tx: &State>>>, ctx: &State, ) -> Json { - ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/")); + ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/{}", predicate_uuid)); let background_job_tx = background_job_tx.inner(); match background_job_tx.lock() { @@ -253,7 +247,7 @@ fn handle_delete_bitcoin_predicate( background_job_tx: &State>>>, ctx: &State, ) -> Json { - ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/")); + ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/bitcoin/{}", predicate_uuid)); let background_job_tx = background_job_tx.inner(); match background_job_tx.lock() { @@ -269,26 +263,58 @@ fn handle_delete_bitcoin_predicate( })) } -pub fn load_predicates_from_redis( - config: &crate::config::Config, +pub fn get_entry_from_predicates_db( + uuid: &str, + predicate_db_conn: &mut Connection, + _ctx: &Context, +) -> Result, String> { + let entry: HashMap = predicate_db_conn + .hgetall(ChainhookSpecification::either_stx_or_btc_key(uuid)) + .map_err(|e| { + format!( + "unable to load chainhook associated with key {}: {}", + uuid, + e.to_string() + ) + })?; + + let encoded_spec = match entry.get("specification") { + None => return Ok(None), + Some(payload) => payload, + }; + + let spec = match ChainhookSpecification::deserialize_specification(&encoded_spec) { + Err(e) => unimplemented!(), + Ok(spec) => spec, + }; + + let encoded_status = match entry.get("status") { + None => unimplemented!(), + Some(payload) => payload, + }; + + let status = match serde_json::from_str(&encoded_status) { + Err(e) => unimplemented!(), + Ok(status) => status, + }; + + Ok(Some((spec, status))) +} + +pub fn get_entries_from_predicates_db( + predicate_db_conn: &mut Connection, ctx: &Context, ) -> Result, String> { - let redis_uri = config.expected_api_database_uri(); - let client = redis::Client::open(redis_uri.clone()) - .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?; - let mut redis_con = client - .get_connection() - .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?; - let chainhooks_to_load: Vec = redis_con - .scan_match("chainhook:*:*:*") + let chainhooks_to_load: Vec = predicate_db_conn + .scan_match(ChainhookSpecification::either_stx_or_btc_key("*")) .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))? .into_iter() .collect(); let mut predicates = vec![]; for key in chainhooks_to_load.iter() { - let chainhook = match redis_con.hget::<_, _, String>(key, "specification") { - Ok(spec) => match ChainhookSpecification::deserialize_specification(&spec, key) { + let chainhook = match predicate_db_conn.hget::<_, _, String>(key, "specification") { + Ok(spec) => match ChainhookSpecification::deserialize_specification(&spec) { Ok(spec) => spec, Err(e) => { error!( @@ -314,3 +340,16 @@ pub fn load_predicates_from_redis( } Ok(predicates) } + +pub fn load_predicates_from_redis( + config: &crate::config::Config, + ctx: &Context, +) -> Result, String> { + let redis_uri: &str = config.expected_api_database_uri(); + let client = redis::Client::open(redis_uri.clone()) + .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?; + let mut predicate_db_conn = client + .get_connection() + .map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?; + get_entries_from_predicates_db(&mut predicate_db_conn, ctx) +} diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 5b4784f..d99f763 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -1,28 +1,21 @@ mod http_api; mod runloops; -use crate::config::{Config, PredicatesApi}; -use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate; -use crate::scan::stacks::{ - consolidate_local_stacks_chainstate_using_csv, - scan_stacks_chainstate_via_rocksdb_using_predicate, -}; +use crate::config::{Config, PredicatesApi, PredicatesApiConfig}; +use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::{ - confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, - insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn, - open_readwrite_stacks_db_conn, + confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn, }; use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification}; use chainhook_event_observer::chainhooks::types::ChainhookSpecification; -use chainhook_event_observer::observer::{start_event_observer, ObserverCommand, ObserverEvent}; +use chainhook_event_observer::observer::{start_event_observer, ObserverEvent}; use chainhook_event_observer::utils::Context; use chainhook_types::{BitcoinBlockSignaling, StacksChainEvent}; use redis::{Commands, Connection}; -use threadpool::ThreadPool; use std::sync::mpsc::channel; @@ -198,6 +191,8 @@ impl Service { let ctx = self.ctx.clone(); let api_config = api_config.clone(); let moved_observer_command_tx = observer_command_tx.clone(); + // Test and initialize a database connection + let redis_con = open_readwrite_predicates_db_conn_or_panic(&api_config, &self.ctx); let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || { let future = @@ -205,16 +200,6 @@ impl Service { let _ = hiro_system_kit::nestable_block_on(future); }); - // Test and initialize a database connection - let redis_uri = self.config.expected_api_database_uri(); - let client = redis::Client::open(redis_uri.clone()).unwrap(); - let redis_con = match client.get_connection() { - Ok(con) => con, - Err(message) => { - error!(self.ctx.expect_logger(), "Redis: {}", message.to_string()); - panic!(); - } - }; Some(redis_con) } PredicatesApi::Off => None, @@ -245,8 +230,7 @@ impl Service { &chainhook_key, &[ ("specification", json!(chainhook).to_string()), - ("last_evaluation", json!(0).to_string()), - ("last_trigger", json!(0).to_string()), + ("status", json!(PredicateStatus::Disabled).to_string()), ], ); if let Err(e) = res { @@ -342,19 +326,62 @@ impl Service { } } +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum PredicateStatus { - InitialScan(u64, u64, u64), - Active(u64), + Scanning(ScanningData), + Streaming(StreamingData), Disabled, } -pub fn update_predicate(uuid: String, status: PredicateStatus, redis_con: &Connection) { - // let res: Result<(), redis::RedisError> = redis_con.hset_multiple( - // &chainhook_key, - // &[ - // ("specification", json!(chainhook).to_string()), - // ("last_evaluation", json!(0).to_string()), - // ("last_trigger", json!(0).to_string()), - // ], - // ); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScanningData { + pub start_block: u64, + pub cursor: u64, + pub end_block: u64, + pub occurrences_found: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StreamingData { + pub last_occurence: u64, + pub last_evaluation: u64, +} + +pub fn update_predicate_status( + predicate_key: &str, + status: PredicateStatus, + predicates_db_conn: &mut Connection, +) { + let res: Result<(), redis::RedisError> = + predicates_db_conn.hset_multiple(&predicate_key, &[("status", json!(status).to_string())]); +} + +pub fn retrieve_predicate_status( + predicate_key: &str, + predicates_db_conn: &mut Connection, +) -> Option { + match predicates_db_conn.hget::<_, _, String>(predicate_key.to_string(), "status") { + Ok(ref payload) => match serde_json::from_str(payload) { + Ok(data) => Some(data), + Err(_) => None, + }, + Err(_) => None, + } +} + +pub fn open_readwrite_predicates_db_conn_or_panic( + config: &PredicatesApiConfig, + ctx: &Context, +) -> Connection { + // Test and initialize a database connection + let redis_uri = &config.database_uri; + let client = redis::Client::open(redis_uri.clone()).unwrap(); + let redis_con = match client.get_connection() { + Ok(con) => con, + Err(message) => { + error!(ctx.expect_logger(), "Redis: {}", message.to_string()); + panic!(); + } + }; + redis_con } diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index 5ff855a..384aee3 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -10,7 +10,7 @@ use chainhook_event_observer::{ use threadpool::ThreadPool; use crate::{ - config::{Config, PredicatesApiConfig}, + config::Config, scan::{ bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, stacks::scan_stacks_chainstate_via_rocksdb_using_predicate, diff --git a/components/chainhook-event-observer/src/chainhooks/types.rs b/components/chainhook-event-observer/src/chainhooks/types.rs index 5bd08b8..a25a879 100644 --- a/components/chainhook-event-observer/src/chainhooks/types.rs +++ b/components/chainhook-event-observer/src/chainhooks/types.rs @@ -191,25 +191,26 @@ impl ChainhookSpecification { } } - pub fn stacks_key_prefix() -> &'static str { - "predicate:stx:" + pub fn either_stx_or_btc_key(uuid: &str) -> String { + format!("predicate:{}", uuid) } - pub fn bitcoin_key_prefix() -> &'static str { - "predicate:btc:" + pub fn stacks_key(uuid: &str) -> String { + format!("predicate:{}", uuid) + } + + pub fn bitcoin_key(uuid: &str) -> String { + format!("predicate:{}", uuid) } pub fn key(&self) -> String { match &self { - Self::Bitcoin(data) => format!("{}{}", Self::bitcoin_key_prefix(), data.uuid), - Self::Stacks(data) => format!("{}{}", Self::stacks_key_prefix(), data.uuid), + Self::Bitcoin(data) => Self::bitcoin_key(&data.uuid), + Self::Stacks(data) => Self::stacks_key(&data.uuid), } } - pub fn deserialize_specification( - spec: &str, - _key: &str, - ) -> Result { + pub fn deserialize_specification(spec: &str) -> Result { let spec: ChainhookSpecification = serde_json::from_str(spec) .map_err(|e| format!("unable to deserialize Stacks chainhook {}", e.to_string()))?; Ok(spec) @@ -714,6 +715,10 @@ pub struct StacksChainhookSpecification { } impl StacksChainhookSpecification { + pub fn key(&self) -> String { + ChainhookSpecification::stacks_key(&self.uuid) + } + pub fn is_predicate_targeting_block_header(&self) -> bool { match &self.predicate { StacksPredicate::BlockHeight(_)