diff --git a/README.md b/README.md index ab9bb82..eb86f1d 100644 --- a/README.md +++ b/README.md @@ -372,6 +372,16 @@ The current `stacks` predicates supports the following `if_this` constructs: } } +// Get any stacks block matching constraints +// `block_height` mandatory argument admits: +// - `equals`, `higher_than`, `lower_than`, `between`: integer type. +{ + "if_this": { + "scope": "block_height", + "higher_than": 10000 + } +} + // Get any transaction related to a given fungible token asset identifier // `asset-identifier` mandatory argument admits: // - string type, fully qualifying the asset identifier to observe. example: `ST1PQHQKV0RJXZFY1DGX8MNSNYVE3VGZJSRTPGZGM.cbtc-sip10::cbtc` diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 0f0f304..801567b 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -7,7 +7,7 @@ use crate::scan::stacks::scan_stacks_chain_with_predicate; use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification; use chainhook_event_observer::indexer::ordinals::db::{ build_bitcoin_traversal_local_storage, open_readonly_ordinals_db_conn, - retrieve_satoshi_point_using_local_storage, + retrieve_satoshi_point_using_local_storage, open_readwrite_ordinals_db_conn, initialize_ordinal_state_storage, }; use chainhook_event_observer::indexer::ordinals::ord::height::Height; use chainhook_event_observer::observer::BitcoinConfig; @@ -294,8 +294,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { index: cmd.block_height, hash: "".into(), }; - let storage_conn = - open_readonly_ordinals_db_conn(&config.expected_cache_path()).unwrap(); + + let storage_conn = open_readonly_ordinals_db_conn(&config.expected_cache_path(), &ctx).unwrap(); + let (block_height, offset) = retrieve_satoshi_point_using_local_storage( &storage_conn, &block_identifier, @@ -318,9 +319,11 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { rpc_url: config.network.bitcoin_node_rpc_url.clone(), }; + let storage_conn = initialize_ordinal_state_storage(&config.expected_cache_path(), &ctx); + let _ = build_bitcoin_traversal_local_storage( &bitcoin_config, - &config.expected_cache_path(), + &storage_conn, cmd.start_block, cmd.end_block, &ctx, diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index 6551ec0..01cec96 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -13,9 +13,8 @@ use chainhook_event_observer::indexer::bitcoin::{ retrieve_block_hash, retrieve_full_block_breakdown_with_retry, }; use chainhook_event_observer::indexer::ordinals::db::{ - get_default_ordinals_db_file_path, initialize_ordinal_state_storage, - open_readonly_ordinals_db_conn, retrieve_satoshi_point_using_local_storage, - write_compacted_block_to_index, CompactedBlock, + initialize_ordinal_state_storage, open_readonly_ordinals_db_conn, + retrieve_satoshi_point_using_local_storage, write_compacted_block_to_index, CompactedBlock, }; use chainhook_event_observer::indexer::ordinals::ord::indexing::entry::Entry; use chainhook_event_observer::indexer::ordinals::ord::indexing::{ @@ -193,7 +192,8 @@ pub async fn scan_bitcoin_chain_with_predicate( ctx_.expect_logger(), "Retrieving satoshi point for {}", transaction.transaction_identifier.hash ); - let storage_conn = open_readonly_ordinals_db_conn(&cache_path).unwrap(); + + let storage_conn = open_readonly_ordinals_db_conn(&cache_path, &ctx_).unwrap(); let res = retrieve_satoshi_point_using_local_storage( &storage_conn, &block_identifier, @@ -237,10 +237,9 @@ pub async fn scan_bitcoin_chain_with_predicate( .expect("unable to detach thread"); let ctx_ = ctx.clone(); - let db_file = get_default_ordinals_db_file_path(&config.expected_cache_path()); + let conn = initialize_ordinal_state_storage(&config.expected_cache_path(), &ctx_); let handle_3 = hiro_system_kit::thread_named("Ordinal ingestion") .spawn(move || { - let conn = initialize_ordinal_state_storage(&db_file, &ctx_); while let Ok(Some((height, compacted_block))) = cache_block_rx.recv() { info!(ctx_.expect_logger(), "Caching block #{height}"); write_compacted_block_to_index(height, &compacted_block, &conn, &ctx_); diff --git a/components/chainhook-event-observer/src/chainhooks/bitcoin/mod.rs b/components/chainhook-event-observer/src/chainhooks/bitcoin/mod.rs index 00026a1..2280f77 100644 --- a/components/chainhook-event-observer/src/chainhooks/bitcoin/mod.rs +++ b/components/chainhook-event-observer/src/chainhooks/bitcoin/mod.rs @@ -406,6 +406,16 @@ impl BitcoinPredicateType { } false } + BitcoinPredicateType::Protocol(Protocols::Ordinal( + OrdinalOperations::InscriptionTransfered, + )) => { + for op in tx.metadata.ordinal_operations.iter() { + if let OrdinalOperation::InscriptionTransfered(_) = op { + return true; + } + } + false + } } } } diff --git a/components/chainhook-event-observer/src/chainhooks/types.rs b/components/chainhook-event-observer/src/chainhooks/types.rs index adc79e9..bcd85a1 100644 --- a/components/chainhook-event-observer/src/chainhooks/types.rs +++ b/components/chainhook-event-observer/src/chainhooks/types.rs @@ -516,6 +516,7 @@ pub enum StacksOperations { #[serde(rename_all = "snake_case")] pub enum OrdinalOperations { InscriptionRevealed, + InscriptionTransfered, } pub fn get_stacks_canonical_magic_bytes(network: &BitcoinNetwork) -> [u8; 2] { diff --git a/components/chainhook-event-observer/src/indexer/ordinals/db/mod.rs b/components/chainhook-event-observer/src/indexer/ordinals/db/mod.rs index 99523e0..e65ba69 100644 --- a/components/chainhook-event-observer/src/indexer/ordinals/db/mod.rs +++ b/components/chainhook-event-observer/src/indexer/ordinals/db/mod.rs @@ -1,6 +1,8 @@ use std::{path::PathBuf, time::Duration}; -use chainhook_types::{BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier}; +use chainhook_types::{ + BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier, +}; use hiro_system_kit::slog; use rand::RngCore; use rusqlite::{Connection, OpenFlags, ToSql}; @@ -14,15 +16,26 @@ use crate::{ utils::Context, }; -pub fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf { +fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf { let mut destination_path = base_dir.clone(); - destination_path.push("bitcoin_block_traversal-readonly.sqlite"); + destination_path.push("bitcoin_block_traversal.sqlite"); destination_path } -pub fn open_readonly_ordinals_db_conn(base_dir: &PathBuf) -> Result { +pub fn open_readonly_ordinals_db_conn( + base_dir: &PathBuf, + ctx: &Context, +) -> Result { let path = get_default_ordinals_db_file_path(&base_dir); - let conn = open_existing_readonly_db(&path); + let conn = open_existing_readonly_db(&path, ctx); + Ok(conn) +} + +pub fn open_readwrite_ordinals_db_conn( + base_dir: &PathBuf, + ctx: &Context, +) -> Result { + let conn = create_or_open_readwrite_db(&base_dir, ctx); Ok(conn) } @@ -65,12 +78,13 @@ pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connec conn } -fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection { - let open_flags = match std::fs::metadata(path) { +fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Connection { + let path = get_default_ordinals_db_file_path(&cache_path); + let open_flags = match std::fs::metadata(&path) { Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { // need to create - if let Some(dirp) = PathBuf::from(path).parent() { + if let Some(dirp) = PathBuf::from(&path).parent() { std::fs::create_dir_all(dirp).unwrap_or_else(|e| { ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string())); }); @@ -86,7 +100,15 @@ fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection { } }; - let conn = Connection::open_with_flags(path, open_flags).unwrap(); + let conn = loop { + match Connection::open_with_flags(&path, open_flags) { + Ok(conn) => break conn, + Err(e) => { + ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string())); + } + }; + std::thread::sleep(std::time::Duration::from_secs(1)); + }; // db.profile(Some(trace_profile)); // db.busy_handler(Some(tx_busy_handler))?; conn.pragma_update(None, "journal_mode", &"WAL").unwrap(); @@ -94,7 +116,7 @@ fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection { conn } -fn open_existing_readonly_db(path: &PathBuf) -> Connection { +fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection { let open_flags = match std::fs::metadata(path) { Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -109,8 +131,16 @@ fn open_existing_readonly_db(path: &PathBuf) -> Connection { } }; - let conn = Connection::open_with_flags(path, open_flags).unwrap(); - conn + let conn = loop { + match Connection::open_with_flags(path, open_flags) { + Ok(conn) => break conn, + Err(e) => { + ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string())); + } + }; + std::thread::sleep(std::time::Duration::from_secs(1)); + }; + return conn; } #[derive(Debug, Serialize, Deserialize)] @@ -155,6 +185,39 @@ impl CompactedBlock { CompactedBlock(((coinbase_txid, coinbase_value), txs)) } + pub fn from_standardized_block(block: &BitcoinBlockData) -> CompactedBlock { + let mut txs = vec![]; + let mut coinbase_value = 0; + let coinbase_txid = { + let txid = + hex::decode(&block.transactions[0].transaction_identifier.hash[2..]).unwrap(); + [txid[0], txid[1], txid[2], txid[3]] + }; + for coinbase_output in block.transactions[0].metadata.outputs.iter() { + coinbase_value += coinbase_output.value; + } + for tx in block.transactions.iter().skip(1) { + let mut inputs = vec![]; + for input in tx.metadata.inputs.iter() { + let txin = hex::decode(&input.previous_output.txid[2..]).unwrap(); + + inputs.push(( + [txin[0], txin[1], txin[2], txin[3]], + input.previous_output.block_height as u32, + input.previous_output.vout as u16, + input.previous_output.value, + )); + } + let mut outputs = vec![]; + for output in tx.metadata.outputs.iter() { + outputs.push(output.value); + } + let txid = hex::decode(&tx.transaction_identifier.hash[2..]).unwrap(); + txs.push(([txid[0], txid[1], txid[2], txid[3]], inputs, outputs)); + } + CompactedBlock(((coinbase_txid, coinbase_value), txs)) + } + pub fn from_hex_bytes(bytes: &str) -> CompactedBlock { let bytes = hex::decode(&bytes).unwrap(); let value = ciborium::de::from_reader(&bytes[..]).unwrap(); @@ -327,7 +390,7 @@ pub fn write_compacted_block_to_index( pub async fn build_bitcoin_traversal_local_storage( bitcoin_config: &BitcoinConfig, - cache_path: &PathBuf, + storage_conn: &Connection, start_block: u64, end_block: u64, ctx: &Context, @@ -351,14 +414,12 @@ pub async fn build_bitcoin_traversal_local_storage( let future = retrieve_block_hash(&config, &block_height); match hiro_system_kit::nestable_block_on(future) { Ok(block_hash) => { - err_count = 0; - block_hash_tx.send(Some((block_cursor, block_hash))); + let _ = block_hash_tx.send(Some((block_cursor, block_hash))); break; } Err(e) => { err_count += 1; let delay = (err_count + (rng.next_u64() % 3)) * 1000; - println!("retry hash:fetch in {delay}"); std::thread::sleep(std::time::Duration::from_millis(delay)); } } @@ -366,47 +427,55 @@ pub async fn build_bitcoin_traversal_local_storage( }); } - let db_file = get_default_ordinals_db_file_path(&cache_path); let bitcoin_config = bitcoin_config.clone(); let moved_ctx = ctx.clone(); - let handle = hiro_system_kit::thread_named("Block data retrieval").spawn(move || { + let block_data_tx_moved = block_data_tx.clone(); + let handle_1 = hiro_system_kit::thread_named("Block data retrieval").spawn(move || { while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() { - println!("fetch {block_height}:{block_hash}"); let moved_bitcoin_config = bitcoin_config.clone(); - let block_data_tx = block_data_tx.clone(); + let block_data_tx = block_data_tx_moved.clone(); let moved_ctx = moved_ctx.clone(); retrieve_block_data_pool.execute(move || { + moved_ctx.try_log(|logger| slog::info!(logger, "Fetching block #{block_height}")); let future = retrieve_full_block_breakdown_with_retry( &moved_bitcoin_config, &block_hash, &moved_ctx, ); let block_data = hiro_system_kit::nestable_block_on(future).unwrap(); - block_data_tx.send(Some(block_data)); + let _ = block_data_tx.send(Some(block_data)); }); - retrieve_block_data_pool.join() + let res = retrieve_block_data_pool.join(); + res } - }); + }).expect("unable to spawn thread"); - let handle = hiro_system_kit::thread_named("Block data compression").spawn(move || { + let handle_2 = hiro_system_kit::thread_named("Block data compression").spawn(move || { while let Ok(Some(block_data)) = block_data_rx.recv() { - println!("store {}:{}", block_data.height, block_data.hash); - let block_compressed_tx = block_compressed_tx.clone(); + let block_compressed_tx_moved = block_compressed_tx.clone(); compress_block_data_pool.execute(move || { let compressed_block = CompactedBlock::from_full_block(&block_data); - block_compressed_tx.send(Some((block_data.height as u32, compressed_block))); + let _ = block_compressed_tx_moved + .send(Some((block_data.height as u32, compressed_block))); }); - compress_block_data_pool.join() + let res = compress_block_data_pool.join(); + // let _ = block_compressed_tx.send(None); + res } - }); - - let conn = initialize_ordinal_state_storage(&db_file, &ctx); + }).expect("unable to spawn thread"); + let mut blocks_stored = 0; while let Ok(Some((block_height, compacted_block))) = block_compressed_rx.recv() { - ctx.try_log(|logger| slog::debug!(logger, "Storing block #{block_height}")); - - write_compacted_block_to_index(block_height, &compacted_block, &conn, &ctx); + ctx.try_log(|logger| slog::info!(logger, "Storing block #{block_height}")); + write_compacted_block_to_index(block_height, &compacted_block, &storage_conn, &ctx); + blocks_stored+= 1; + if blocks_stored == end_block - start_block { + let _ = block_data_tx.send(None); + let _ = block_hash_tx.send(None); + ctx.try_log(|logger| slog::info!(logger, "Local ordinals storage successfully seeded with #{blocks_stored} blocks")); + return Ok(()) + } } retrieve_block_hash_pool.join(); @@ -429,7 +498,12 @@ pub fn retrieve_satoshi_point_using_local_storage( let mut tx_cursor = (txid, 0); loop { - let res = retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn).unwrap(); + let res = match retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn) { + Some(res) => res, + None => { + return Err(format!("unable to retrieve block ##{ordinal_block_number}")); + } + }; ctx.try_log(|logger| { slog::debug!( @@ -527,11 +601,3 @@ pub fn retrieve_satoshi_point_using_local_storage( Ok((ordinal_block_number.into(), ordinal_offset)) } -// pub async fn scan_bitcoin_chain_for_ordinal_inscriptions( -// subscribers: Vec, -// first_inscription_height: u64, -// config: &Config, -// ctx: &Context, -// ) -> Result<(), String> { -// Ok(()) -// } diff --git a/components/chainhook-event-observer/src/observer/mod.rs b/components/chainhook-event-observer/src/observer/mod.rs index c7617f6..26dceba 100644 --- a/components/chainhook-event-observer/src/observer/mod.rs +++ b/components/chainhook-event-observer/src/observer/mod.rs @@ -12,9 +12,10 @@ use crate::chainhooks::types::{ use crate::indexer::bitcoin::{retrieve_full_block_breakdown_with_retry, NewBitcoinBlock}; use crate::indexer::ordinals::db::{ find_inscription_with_satoshi_id, find_inscriptions_at_wached_outpoint, - find_last_inscription_number, get_default_ordinals_db_file_path, - open_readonly_ordinals_db_conn, retrieve_satoshi_point_using_local_storage, + find_last_inscription_number, initialize_ordinal_state_storage, open_readonly_ordinals_db_conn, + open_readwrite_ordinals_db_conn, retrieve_satoshi_point_using_local_storage, scan_existing_inscriptions_id, store_new_inscription, update_transfered_inscription, + write_compacted_block_to_index, CompactedBlock, }; use crate::indexer::ordinals::ord::height::Height; use crate::indexer::ordinals::ord::{ @@ -478,7 +479,9 @@ pub async fn start_observer_commands_handler( let event_handlers = config.event_handlers.clone(); let mut chainhooks_lookup: HashMap = HashMap::new(); let networks = (&config.bitcoin_network, &config.stacks_network); - let ordinals_db_conn = open_readonly_ordinals_db_conn(&config.get_cache_path_buf())?; + // { + // let _ = initialize_ordinal_state_storage(&config.get_cache_path_buf(), &ctx); + // } loop { let command = match observer_commands_rx.recv() { Ok(cmd) => cmd, @@ -506,6 +509,9 @@ pub async fn start_observer_commands_handler( } // ObserverCommand::ProcessBitcoinBlock? ObserverCommand::PropagateBitcoinChainEvent(mut chain_event) => { + let ordinals_db_conn = + open_readonly_ordinals_db_conn(&config.get_cache_path_buf(), &ctx)?; + ctx.try_log(|logger| { slog::info!(logger, "Handling PropagateBitcoinChainEvent command") }); @@ -515,7 +521,8 @@ pub async fn start_observer_commands_handler( BitcoinChainEvent::ChainUpdatedWithBlocks(ref mut new_blocks) => { // Look for inscription transfered let storage_conn = - open_readonly_ordinals_db_conn(&config.get_cache_path_buf()).unwrap(); // TODO(lgalabru) + open_readonly_ordinals_db_conn(&config.get_cache_path_buf(), &ctx) + .unwrap(); // TODO(lgalabru) for new_block in new_blocks.new_blocks.iter_mut() { let mut coinbase_offset = 0; @@ -579,8 +586,9 @@ pub async fn start_observer_commands_handler( { let storage_rw_conn = - open_readonly_ordinals_db_conn( + open_readwrite_ordinals_db_conn( &config.get_cache_path_buf(), + &ctx, ) .unwrap(); // TODO(lgalabru) store_new_inscription( @@ -684,8 +692,9 @@ pub async fn start_observer_commands_handler( // Update watched outpoint { - let storage_rw_conn = open_readonly_ordinals_db_conn( + let storage_rw_conn = open_readwrite_ordinals_db_conn( &config.get_cache_path_buf(), + &ctx, ) .unwrap(); // TODO(lgalabru) update_transfered_inscription( @@ -723,8 +732,30 @@ pub async fn start_observer_commands_handler( coinbase_offset += new_tx.metadata.fee; } - // TODO: - // - persist compacted block in table blocks + // Persist compacted block in table blocks + { + ctx.try_log(|logger| { + slog::info!( + logger, + "Caching Bitcoin block #{} for further traversals", + new_block.block_identifier.index + ) + }); + + let compacted_block = + CompactedBlock::from_standardized_block(&new_block); + let storage_rw_conn = open_readwrite_ordinals_db_conn( + &config.get_cache_path_buf(), + &ctx, + ) + .unwrap(); // TODO(lgalabru) + write_compacted_block_to_index( + new_block.block_identifier.index as u32, + &compacted_block, + &storage_rw_conn, + &ctx, + ); + } } } BitcoinChainEvent::ChainUpdatedWithReorg(ref mut reorg) => {