From df3661721496654bc2d288debe1df88e0a5da060 Mon Sep 17 00:00:00 2001 From: Ludo Galabru Date: Fri, 24 Mar 2023 16:54:14 -0400 Subject: [PATCH] feat: plug inscription processing in ibd --- components/chainhook-cli/src/cli/mod.rs | 8 +- components/chainhook-cli/src/node/mod.rs | 2 +- components/chainhook-cli/src/scan/bitcoin.rs | 10 +- .../src/hord/db/mod.rs | 105 +++++++++++++++++- .../chainhook-event-observer/src/hord/mod.rs | 16 ++- .../src/indexer/bitcoin/mod.rs | 16 +-- .../src/observer/mod.rs | 13 ++- 7 files changed, 135 insertions(+), 35 deletions(-) diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 2d2a6de..dc5bf98 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -7,8 +7,8 @@ use crate::scan::stacks::scan_stacks_chain_with_predicate; use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification; use chainhook_event_observer::hord::db::{ - build_bitcoin_traversal_local_storage, find_inscriptions_at_wached_outpoint, - initialize_hord_db, open_readonly_hord_db_conn, retrieve_satoshi_point_using_local_storage, + fetch_and_cache_blocks_in_hord_db, find_inscriptions_at_wached_outpoint, initialize_hord_db, + open_readonly_hord_db_conn, retrieve_satoshi_point_using_local_storage, }; use chainhook_event_observer::observer::BitcoinConfig; use chainhook_event_observer::utils::Context; @@ -381,17 +381,19 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { username: config.network.bitcoin_node_rpc_username.clone(), password: config.network.bitcoin_node_rpc_password.clone(), rpc_url: config.network.bitcoin_node_rpc_url.clone(), + network: config.network.bitcoin_network.clone(), }; let hord_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx); - let _ = build_bitcoin_traversal_local_storage( + let _ = fetch_and_cache_blocks_in_hord_db( &bitcoin_config, &hord_db_conn, cmd.start_block, cmd.end_block, &ctx, cmd.network_threads, + None, ) .await; } diff --git a/components/chainhook-cli/src/node/mod.rs b/components/chainhook-cli/src/node/mod.rs index 8c861e6..b1454b6 100644 --- a/components/chainhook-cli/src/node/mod.rs +++ b/components/chainhook-cli/src/node/mod.rs @@ -465,8 +465,8 @@ impl Node { .map_err(|e| format!("unable to parse response ({})", e))?; let block = indexer::bitcoin::standardize_bitcoin_block( - &event_observer_config, raw_block, + &event_observer_config.bitcoin_network, &self.ctx, )?; diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index 4cbed4a..b1803ca 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -9,9 +9,8 @@ use chainhook_event_observer::chainhooks::types::{ BitcoinChainhookFullSpecification, BitcoinPredicateType, Protocols, }; use chainhook_event_observer::hord::db::{ - build_bitcoin_traversal_local_storage, find_all_inscriptions, - find_compacted_block_at_block_height, find_latest_compacted_block_known, - open_readonly_hord_db_conn, open_readwrite_hord_db_conn, + fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_compacted_block_at_block_height, + find_latest_compacted_block_known, open_readonly_hord_db_conn, open_readwrite_hord_db_conn, }; use chainhook_event_observer::indexer; use chainhook_event_observer::indexer::bitcoin::{ @@ -115,13 +114,14 @@ pub async fn scan_bitcoin_chain_with_predicate( "Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks", (end_block - start_block) ); - build_bitcoin_traversal_local_storage( + fetch_and_cache_blocks_in_hord_db( &config.get_event_observer_config().get_bitcoin_config(), &rw_hord_db_conn, start_block, end_block, &ctx, 8, + None, ) .await?; } @@ -163,8 +163,8 @@ pub async fn scan_bitcoin_chain_with_predicate( let block_breakdown = retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?; let block = indexer::bitcoin::standardize_bitcoin_block( - &event_observer_config, block_breakdown, + &event_observer_config.bitcoin_network, ctx, )?; diff --git a/components/chainhook-event-observer/src/hord/db/mod.rs b/components/chainhook-event-observer/src/hord/db/mod.rs index 2acd7d2..928d9fb 100644 --- a/components/chainhook-event-observer/src/hord/db/mod.rs +++ b/components/chainhook-event-observer/src/hord/db/mod.rs @@ -1,4 +1,8 @@ -use std::path::PathBuf; +use std::{ + collections::HashMap, + path::PathBuf, + sync::mpsc::{channel, Sender}, +}; use chainhook_types::{ BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier, @@ -11,13 +15,13 @@ use threadpool::ThreadPool; use crate::{ indexer::bitcoin::{ retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry, - BitcoinBlockFullBreakdown, + standardize_bitcoin_block, BitcoinBlockFullBreakdown, }, observer::BitcoinConfig, utils::Context, }; -use super::ord::height::Height; +use super::{ord::height::Height, update_hord_db_and_augment_bitcoin_block}; fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf { let mut destination_path = base_dir.clone(); @@ -414,13 +418,98 @@ pub fn remove_entry_from_inscriptions( } } -pub async fn build_bitcoin_traversal_local_storage( +pub async fn update_hord_db( + bitcoin_config: &BitcoinConfig, + hord_db_path: &PathBuf, + hord_db_conn: &Connection, + start_block: u64, + end_block: u64, + _ctx: &Context, + network_thread: usize, +) -> Result<(), String> { + let (block_tx, block_rx) = channel::(); + let first_inscription_block_height = 767430; + let ctx = _ctx.clone(); + let network = bitcoin_config.network.clone(); + let hord_db_path = hord_db_path.clone(); + let handle = hiro_system_kit::thread_named("Inscriptions indexing") + .spawn(move || { + let mut cursor = first_inscription_block_height; + let mut inbox = HashMap::new(); + + while let Ok(raw_block) = block_rx.recv() { + // Early return, only considering blocks after 1st inscription + if raw_block.height < first_inscription_block_height { + continue; + } + let block_height = raw_block.height; + inbox.insert(raw_block.height, raw_block); + + // In the context of ordinals, we're constrained to process blocks sequentially + // Blocks are processed by a threadpool and could be coming out of order. + // Inbox block for later if the current block is not the one we should be + // processing. + if block_height != cursor { + continue; + } + + // Is the action of processing a block allows us + // to process more blocks present in the inbox? + while let Some(next_block) = inbox.remove(&cursor) { + let mut new_block = match standardize_bitcoin_block(next_block, &network, &ctx) + { + Ok(block) => block, + Err(e) => { + ctx.try_log(|logger| { + slog::error!(logger, "Unable to standardize bitcoin block: {e}",) + }); + return; + } + }; + + if let Err(e) = update_hord_db_and_augment_bitcoin_block( + &mut new_block, + &hord_db_path, + &ctx, + ) { + ctx.try_log(|logger| { + slog::error!( + logger, + "Unable to augment bitcoin block with hord_db: {e}", + ) + }); + return; + } + cursor += 1; + } + } + }) + .expect("unable to detach thread"); + + fetch_and_cache_blocks_in_hord_db( + bitcoin_config, + hord_db_conn, + start_block, + end_block, + &_ctx, + network_thread, + Some(block_tx), + ) + .await?; + + let _ = handle.join(); + + Ok(()) +} + +pub async fn fetch_and_cache_blocks_in_hord_db( bitcoin_config: &BitcoinConfig, hord_db_conn: &Connection, start_block: u64, end_block: u64, ctx: &Context, network_thread: usize, + block_tx: Option>, ) -> Result<(), String> { let retrieve_block_hash_pool = ThreadPool::new(network_thread); let (block_hash_tx, block_hash_rx) = crossbeam_channel::unbounded(); @@ -471,10 +560,14 @@ pub async fn build_bitcoin_traversal_local_storage( .spawn(move || { while let Ok(Some(block_data)) = block_data_rx.recv() { let block_compressed_tx_moved = block_compressed_tx.clone(); + let block_tx = block_tx.clone(); compress_block_data_pool.execute(move || { let compressed_block = CompactedBlock::from_full_block(&block_data); - let _ = block_compressed_tx_moved - .send(Some((block_data.height as u32, compressed_block))); + let block_index = block_data.height as u32; + if let Some(block_tx) = block_tx { + let _ = block_tx.send(block_data); + } + let _ = block_compressed_tx_moved.send(Some((block_index, compressed_block))); }); let res = compress_block_data_pool.join(); diff --git a/components/chainhook-event-observer/src/hord/mod.rs b/components/chainhook-event-observer/src/hord/mod.rs index 10fdbac..7d29ce8 100644 --- a/components/chainhook-event-observer/src/hord/mod.rs +++ b/components/chainhook-event-observer/src/hord/mod.rs @@ -7,6 +7,7 @@ use bitcoincore_rpc::bitcoin::{Address, Network, Script}; use chainhook_types::{BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation}; use hiro_system_kit::slog; use std::collections::VecDeque; +use std::path::PathBuf; use crate::{ hord::{ @@ -64,7 +65,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block( pub fn update_hord_db_and_augment_bitcoin_block( new_block: &mut BitcoinBlockData, - config: &EventObserverConfig, + hord_db_path: &PathBuf, ctx: &Context, ) -> Result<(), String> { { @@ -77,7 +78,7 @@ pub fn update_hord_db_and_augment_bitcoin_block( }); let compacted_block = CompactedBlock::from_standardized_block(&new_block); - let rw_hord_db_conn = open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx)?; + let rw_hord_db_conn = open_readwrite_hord_db_conn(&hord_db_path, &ctx)?; insert_entry_in_blocks( new_block.block_identifier.index as u32, &compacted_block, @@ -100,8 +101,7 @@ pub fn update_hord_db_and_augment_bitcoin_block( new_tx.metadata.ordinal_operations.iter_mut().enumerate() { if let OrdinalOperation::InscriptionRevealed(inscription) = ordinal_event { - let hord_db_conn = - open_readonly_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru) + let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &ctx).unwrap(); // TODO(lgalabru) let (ordinal_block_height, ordinal_offset, ordinal_number) = { // Are we looking at a re-inscription? @@ -170,8 +170,7 @@ pub fn update_hord_db_and_augment_bitcoin_block( { let rw_hord_db_conn = - open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx) - .unwrap(); // TODO(lgalabru) + open_readwrite_hord_db_conn(&hord_db_path, &ctx).unwrap(); // TODO(lgalabru) store_new_inscription( &inscription, &new_block.block_identifier, @@ -186,7 +185,7 @@ pub fn update_hord_db_and_augment_bitcoin_block( // Have inscriptions been transfered? let mut sats_in_offset = 0; let mut sats_out_offset = 0; - let hord_db_conn = open_readonly_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru) + let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &ctx)?; for input in new_tx.metadata.inputs.iter() { // input.previous_output.txid @@ -291,8 +290,7 @@ pub fn update_hord_db_and_augment_bitcoin_block( // Update watched outpoint { - let rw_hord_db_conn = - open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru) + let rw_hord_db_conn = open_readwrite_hord_db_conn(&hord_db_path, &ctx)?; update_transfered_inscription( &inscription_id, &outpoint_post_transfer, diff --git a/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs b/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs index ae20979..a8afee2 100644 --- a/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs +++ b/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs @@ -6,7 +6,7 @@ use crate::chainhooks::types::{ get_canonical_pox_config, get_stacks_canonical_magic_bytes, PoxConfig, StacksOpcodes, }; -use crate::observer::{BitcoinConfig, EventObserverConfig}; +use crate::observer::BitcoinConfig; use crate::utils::Context; use bitcoincore_rpc::bitcoin::hashes::hex::FromHex; use bitcoincore_rpc::bitcoin::hashes::Hash; @@ -17,10 +17,10 @@ use bitcoincore_rpc_json::{ pub use blocks_pool::BitcoinBlockPool; use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut}; use chainhook_types::{ - BitcoinBlockData, BitcoinBlockMetadata, BitcoinTransactionData, BitcoinTransactionMetadata, - BlockCommitmentData, BlockHeader, BlockIdentifier, KeyRegistrationData, LockSTXData, - OrdinalInscriptionRevealData, OrdinalOperation, PoxReward, StacksBaseChainOperation, - StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData, + BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork, BitcoinTransactionData, + BitcoinTransactionMetadata, BlockCommitmentData, BlockHeader, BlockIdentifier, + KeyRegistrationData, LockSTXData, OrdinalInscriptionRevealData, OrdinalOperation, PoxReward, + StacksBaseChainOperation, StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData, }; use hiro_system_kit::slog; @@ -261,14 +261,14 @@ pub async fn retrieve_block_hash( } pub fn standardize_bitcoin_block( - config: &EventObserverConfig, block: BitcoinBlockFullBreakdown, + network: &BitcoinNetwork, ctx: &Context, ) -> Result { let mut transactions = vec![]; let block_height = block.height as u64; - let expected_magic_bytes = get_stacks_canonical_magic_bytes(&config.bitcoin_network); - let pox_config = get_canonical_pox_config(&config.bitcoin_network); + let expected_magic_bytes = get_stacks_canonical_magic_bytes(&network); + let pox_config = get_canonical_pox_config(&network); ctx.try_log(|logger| slog::debug!(logger, "Standardizing Bitcoin block {}", block.hash,)); diff --git a/components/chainhook-event-observer/src/observer/mod.rs b/components/chainhook-event-observer/src/observer/mod.rs index c1c271b..0bf5d99 100644 --- a/components/chainhook-event-observer/src/observer/mod.rs +++ b/components/chainhook-event-observer/src/observer/mod.rs @@ -152,6 +152,7 @@ impl EventObserverConfig { username: self.bitcoin_node_username.clone(), password: self.bitcoin_node_password.clone(), rpc_url: self.bitcoin_node_rpc_url.clone(), + network: self.bitcoin_network.clone(), }; bitcoin_config } @@ -224,6 +225,7 @@ pub struct BitcoinConfig { pub username: String, pub password: String, pub rpc_url: String, + pub network: BitcoinNetwork, } #[derive(Debug, Clone)] @@ -507,7 +509,8 @@ pub async fn start_observer_commands_handler( break; } ObserverCommand::ProcessBitcoinBlock(block_data) => { - let new_block = standardize_bitcoin_block(&config, block_data, &ctx)?; + let new_block = + standardize_bitcoin_block(block_data, &config.bitcoin_network, &ctx)?; bitcoin_block_store.insert(new_block.block_identifier.clone(), new_block); } ObserverCommand::CacheBitcoinBlock(block) => { @@ -528,7 +531,9 @@ pub async fn start_observer_commands_handler( match bitcoin_block_store.get_mut(&header.block_identifier) { Some(block) => { if let Err(e) = update_hord_db_and_augment_bitcoin_block( - block, &config, &ctx, + block, + &config.get_cache_path_buf(), + &ctx, ) { ctx.try_log(|logger| { slog::error!( @@ -612,7 +617,9 @@ pub async fn start_observer_commands_handler( match bitcoin_block_store.get_mut(&header.block_identifier) { Some(block) => { if let Err(e) = update_hord_db_and_augment_bitcoin_block( - block, &config, &ctx, + block, + &config.get_cache_path_buf(), + &ctx, ) { ctx.try_log(|logger| { slog::error!(