diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index e79bdbc..7af8f73 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -147,9 +147,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( let mut last_block_scanned = BlockIdentifier::default(); let mut err_count = 0; for cursor in start_block..=end_block { - let block_data = match get_stacks_block_at_block_height(cursor, 3, stacks_db_conn) { + let block_data = match get_stacks_block_at_block_height(cursor, true, 3, stacks_db_conn) { Ok(Some(block)) => block, - Ok(None) => unimplemented!(), + Ok(None) => match get_stacks_block_at_block_height(cursor, false, 3, stacks_db_conn) { + Ok(Some(block)) => block, + Ok(None) => unimplemented!(), + Err(_) => unimplemented!(), + }, Err(_) => unimplemented!(), }; last_block_scanned = block_data.block_identifier.clone(); diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index ae410dd..5b4784f 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -1,14 +1,18 @@ mod http_api; +mod runloops; use crate::config::{Config, PredicatesApi}; -use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate; +use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate; use crate::scan::stacks::{ consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_rocksdb_using_predicate, }; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; +use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::{ - insert_entries_in_stacks_blocks, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn, + confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, + insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn, + open_readwrite_stacks_db_conn, }; use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification}; @@ -135,102 +139,33 @@ impl Service { // Stacks scan operation threadpool let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded(); - let stacks_scan_pool = - ThreadPool::new(self.config.limits.max_number_of_concurrent_stacks_scans); let ctx = self.ctx.clone(); let config = self.config.clone(); let observer_command_tx_moved = observer_command_tx.clone(); let _ = hiro_system_kit::thread_named("Stacks scan runloop") .spawn(move || { - while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() { - let moved_ctx = ctx.clone(); - let moved_config = config.clone(); - let observer_command_tx = observer_command_tx_moved.clone(); - stacks_scan_pool.execute(move || { - let stacks_db_conn = match open_readonly_stacks_db_conn( - &moved_config.expected_cache_path(), - &moved_ctx, - ) { - Ok(db_conn) => db_conn, - Err(e) => { - error!( - moved_ctx.expect_logger(), - "unable to store stacks block: {}", - e.to_string() - ); - unimplemented!() - } - }; - - let op = scan_stacks_chainstate_via_rocksdb_using_predicate( - &predicate_spec, - &stacks_db_conn, - &moved_config, - &moved_ctx, - ); - let last_block_scanned = match hiro_system_kit::nestable_block_on(op) { - Ok(last_block_scanned) => last_block_scanned, - Err(e) => { - error!( - moved_ctx.expect_logger(), - "Unable to evaluate predicate on Stacks chainstate: {e}", - ); - return; - } - }; - info!( - moved_ctx.expect_logger(), - "Stacks chainstate scan completed up to block: {}", - last_block_scanned.index - ); - predicate_spec.end_block = Some(last_block_scanned.index); - let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( - ChainhookSpecification::Stacks(predicate_spec), - )); - }); - } - let res = stacks_scan_pool.join(); - res + start_stacks_scan_runloop( + &config, + stacks_scan_op_rx, + observer_command_tx_moved, + &ctx, + ); }) .expect("unable to spawn thread"); // Bitcoin scan operation threadpool let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded(); - let bitcoin_scan_pool = - ThreadPool::new(self.config.limits.max_number_of_concurrent_bitcoin_scans); let ctx = self.ctx.clone(); let config = self.config.clone(); - let moved_observer_command_tx = observer_command_tx.clone(); + let observer_command_tx_moved = observer_command_tx.clone(); let _ = hiro_system_kit::thread_named("Bitcoin scan runloop") .spawn(move || { - while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() { - let moved_ctx = ctx.clone(); - let moved_config = config.clone(); - let observer_command_tx = moved_observer_command_tx.clone(); - bitcoin_scan_pool.execute(move || { - let op = scan_bitcoin_chainstate_via_http_using_predicate( - &predicate_spec, - &moved_config, - &moved_ctx, - ); - - match hiro_system_kit::nestable_block_on(op) { - Ok(_) => {} - Err(e) => { - error!( - moved_ctx.expect_logger(), - "Unable to evaluate predicate on Bitcoin chainstate: {e}", - ); - return; - } - }; - let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( - ChainhookSpecification::Bitcoin(predicate_spec), - )); - }); - } - let res = bitcoin_scan_pool.join(); - res + start_bitcoin_scan_runloop( + &config, + bitcoin_scan_op_rx, + observer_command_tx_moved, + &ctx, + ); }) .expect("unable to spawn thread"); @@ -358,18 +293,28 @@ impl Service { match &chain_event { StacksChainEvent::ChainUpdatedWithBlocks(data) => { stacks_event += 1; - insert_entries_in_stacks_blocks( + confirm_entries_in_stacks_blocks( &data.confirmed_blocks, &stacks_db_conn_rw, &self.ctx, ); + draft_entries_in_stacks_blocks( + &data.new_blocks, + &stacks_db_conn_rw, + &self.ctx, + ) } StacksChainEvent::ChainUpdatedWithReorg(data) => { - insert_entries_in_stacks_blocks( + confirm_entries_in_stacks_blocks( &data.confirmed_blocks, &stacks_db_conn_rw, &self.ctx, ); + draft_entries_in_stacks_blocks( + &data.blocks_to_apply, + &stacks_db_conn_rw, + &self.ctx, + ) } StacksChainEvent::ChainUpdatedWithMicroblocks(_) | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs new file mode 100644 index 0000000..5ff855a --- /dev/null +++ b/components/chainhook-cli/src/service/runloops.rs @@ -0,0 +1,112 @@ +use std::sync::mpsc::Sender; + +use chainhook_event_observer::{ + chainhooks::types::{ + BitcoinChainhookSpecification, ChainhookSpecification, StacksChainhookSpecification, + }, + observer::ObserverCommand, + utils::Context, +}; +use threadpool::ThreadPool; + +use crate::{ + config::{Config, PredicatesApiConfig}, + scan::{ + bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, + stacks::scan_stacks_chainstate_via_rocksdb_using_predicate, + }, + storage::open_readonly_stacks_db_conn, +}; + +pub fn start_stacks_scan_runloop( + config: &Config, + stacks_scan_op_rx: crossbeam_channel::Receiver, + observer_command_tx: Sender, + ctx: &Context, +) { + let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans); + while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() { + let moved_ctx = ctx.clone(); + let moved_config = config.clone(); + let observer_command_tx = observer_command_tx.clone(); + stacks_scan_pool.execute(move || { + let stacks_db_conn = + match open_readonly_stacks_db_conn(&moved_config.expected_cache_path(), &moved_ctx) + { + Ok(db_conn) => db_conn, + Err(e) => { + error!( + moved_ctx.expect_logger(), + "unable to store stacks block: {}", + e.to_string() + ); + unimplemented!() + } + }; + + let op = scan_stacks_chainstate_via_rocksdb_using_predicate( + &predicate_spec, + &stacks_db_conn, + &moved_config, + &moved_ctx, + ); + let last_block_scanned = match hiro_system_kit::nestable_block_on(op) { + Ok(last_block_scanned) => last_block_scanned, + Err(e) => { + error!( + moved_ctx.expect_logger(), + "Unable to evaluate predicate on Stacks chainstate: {e}", + ); + return; + } + }; + info!( + moved_ctx.expect_logger(), + "Stacks chainstate scan completed up to block: {}", last_block_scanned.index + ); + predicate_spec.end_block = Some(last_block_scanned.index); + let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( + ChainhookSpecification::Stacks(predicate_spec), + )); + }); + } + let res = stacks_scan_pool.join(); + res +} + +pub fn start_bitcoin_scan_runloop( + config: &Config, + bitcoin_scan_op_rx: crossbeam_channel::Receiver, + observer_command_tx: Sender, + ctx: &Context, +) { + let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans); + + while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() { + let moved_ctx = ctx.clone(); + let moved_config = config.clone(); + let observer_command_tx = observer_command_tx.clone(); + bitcoin_scan_pool.execute(move || { + let op = scan_bitcoin_chainstate_via_rpc_using_predicate( + &predicate_spec, + &moved_config, + &moved_ctx, + ); + + match hiro_system_kit::nestable_block_on(op) { + Ok(_) => {} + Err(e) => { + error!( + moved_ctx.expect_logger(), + "Unable to evaluate predicate on Bitcoin chainstate: {e}", + ); + return; + } + }; + let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( + ChainhookSpecification::Bitcoin(predicate_spec), + )); + }); + } + let res = bitcoin_scan_pool.join(); +} diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index ec56b50..d3a3956 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use chainhook_event_observer::{rocksdb::Options, rocksdb::DB, utils::Context}; -use chainhook_types::{BlockIdentifier, StacksBlockData}; +use chainhook_types::{BlockIdentifier, StacksBlockData, StacksBlockUpdate}; fn get_db_default_options() -> Options { let mut opts = Options::default(); @@ -73,10 +73,22 @@ fn get_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { key } -fn get_last_insert_key() -> [u8; 3] { +fn get_unconfirmed_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { + let mut key = [0u8; 12]; + key[..2].copy_from_slice(b"~:"); + key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes()); + key[10..].copy_from_slice(b":d"); + key +} + +fn get_last_confirmed_insert_key() -> [u8; 3] { *b"m:t" } +fn get_last_unconfirmed_insert_key() -> [u8; 3] { + *b"m:~" +} + pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) { let key = get_block_key(&block.block_identifier); let block_bytes = json!(block); @@ -85,15 +97,42 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, .expect("unable to insert blocks"); stacks_db_rw .put( - get_last_insert_key(), + get_last_confirmed_insert_key(), block.block_identifier.index.to_be_bytes(), ) .expect("unable to insert metadata"); } -pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { +pub fn insert_unconfirmed_entry_in_stacks_blocks( + block: &StacksBlockData, + stacks_db_rw: &DB, + _ctx: &Context, +) { + let key = get_unconfirmed_block_key(&block.block_identifier); + let block_bytes = json!(block); + stacks_db_rw + .put(&key, &block_bytes.to_string().as_bytes()) + .expect("unable to insert blocks"); + stacks_db_rw + .put( + get_last_unconfirmed_insert_key(), + block.block_identifier.index.to_be_bytes(), + ) + .expect("unable to insert metadata"); +} + +pub fn delete_unconfirmed_entry_from_stacks_blocks( + block_identifier: &BlockIdentifier, + stacks_db_rw: &DB, + _ctx: &Context, +) { + let key = get_unconfirmed_block_key(&block_identifier); + stacks_db_rw.delete(&key).expect("unable to delete blocks"); +} + +pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { stacks_db - .get(get_last_insert_key()) + .get(get_last_unconfirmed_insert_key()) .unwrap_or(None) .and_then(|bytes| { Some(u64::from_be_bytes([ @@ -102,27 +141,55 @@ pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option< }) } -pub fn insert_entries_in_stacks_blocks( +pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { + stacks_db + .get(get_last_confirmed_insert_key()) + .unwrap_or(None) + .and_then(|bytes| { + Some(u64::from_be_bytes([ + bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], + ])) + }) +} + +pub fn confirm_entries_in_stacks_blocks( blocks: &Vec, stacks_db_rw: &DB, ctx: &Context, ) { for block in blocks.iter() { insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx); + delete_unconfirmed_entry_from_stacks_blocks(&block.block_identifier, stacks_db_rw, ctx); + } +} + +pub fn draft_entries_in_stacks_blocks( + block_updates: &Vec, + stacks_db_rw: &DB, + ctx: &Context, +) { + for update in block_updates.iter() { + // TODO: Could be imperfect, from a microblock point of view + insert_unconfirmed_entry_in_stacks_blocks(&update.block, stacks_db_rw, ctx); } } pub fn get_stacks_block_at_block_height( block_height: u64, + confirmed: bool, retry: u8, stacks_db: &DB, ) -> Result, String> { let mut attempt = 0; loop { - match stacks_db.get(get_block_key(&BlockIdentifier { + let block_identifier = &BlockIdentifier { hash: "".to_string(), index: block_height, - })) { + }; + match stacks_db.get(match confirmed { + true => get_block_key(block_identifier), + false => get_unconfirmed_block_key(block_identifier), + }) { Ok(Some(entry)) => { return Ok(Some({ let spec: StacksBlockData =