diff --git a/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs b/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs index ebb8e27..773cfcb 100644 --- a/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs +++ b/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs @@ -8,7 +8,7 @@ use std::{ use chainhook_postgres::{pg_begin, pg_pool_client}; use chainhook_sdk::utils::Context; use chainhook_types::{BitcoinBlockData, TransactionIdentifier}; -use crossbeam_channel::{Sender, TryRecvError}; +use crossbeam_channel::TryRecvError; use dashmap::DashMap; use fxhash::FxHasher; @@ -33,11 +33,7 @@ use crate::{ sequence_cursor::SequenceCursor, }, }, - db::{ - blocks::{self, open_blocks_db_with_retry}, - cursor::TransactionBytesCursor, - ordinals_pg, - }, + db::{blocks::open_blocks_db_with_retry, cursor::TransactionBytesCursor, ordinals_pg}, service::PgConnectionPools, try_crit, try_debug, try_info, utils::monitoring::PrometheusMonitoring, @@ -55,7 +51,6 @@ pub fn start_inscription_indexing_processor( config: &Config, pg_pools: &PgConnectionPools, ctx: &Context, - post_processor: Option>, prometheus: &PrometheusMonitoring, ) -> PostProcessorController { let (commands_tx, commands_rx) = crossbeam_channel::bounded::(2); @@ -122,7 +117,6 @@ pub fn start_inscription_indexing_processor( &mut sequence_cursor, &cache_l2, &mut brc20_cache, - &post_processor, &prometheus, &config, &pg_pools, @@ -155,12 +149,11 @@ pub fn start_inscription_indexing_processor( } } -pub async fn process_blocks( +async fn process_blocks( next_blocks: &mut Vec, sequence_cursor: &mut SequenceCursor, cache_l2: &Arc>>, brc20_cache: &mut Option, - post_processor: &Option>, prometheus: &PrometheusMonitoring, config: &Config, pg_pools: &PgConnectionPools, @@ -172,14 +165,7 @@ pub async fn process_blocks( for _cursor in 0..next_blocks.len() { let mut block = next_blocks.remove(0); - // Invalidate and recompute cursor when crossing the jubilee height - let jubilee_height = - get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network)); - if block.block_identifier.index == jubilee_height { - sequence_cursor.reset(); - } - - process_block( + index_block( &mut block, &next_blocks, sequence_cursor, @@ -193,15 +179,12 @@ pub async fn process_blocks( ) .await?; - if let Some(post_processor_tx) = post_processor { - let _ = post_processor_tx.send(block.clone()); - } updated_blocks.push(block); } Ok(updated_blocks) } -pub async fn process_block( +pub async fn index_block( block: &mut BitcoinBlockData, next_blocks: &Vec, sequence_cursor: &mut SequenceCursor, @@ -217,6 +200,13 @@ pub async fn process_block( let block_height = block.block_identifier.index; try_info!(ctx, "Indexing block #{block_height}"); + // Invalidate and recompute cursor when crossing the jubilee height + if block.block_identifier.index + == get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network)) + { + sequence_cursor.reset(); + } + { let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?; let ord_tx = pg_begin(&mut ord_client).await?; @@ -233,16 +223,11 @@ pub async fn process_block( config, ctx, )?; - let inner_ctx = if config.logs.ordinals_internals { - ctx.clone() - } else { - Context::empty() - }; if has_inscription_reveals { - augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, &inner_ctx) + augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, ctx) .await?; } - augment_block_with_transfers(block, &ord_tx, &inner_ctx).await?; + augment_block_with_transfers(block, &ord_tx, ctx).await?; // Write data ordinals_pg::insert_block(block, &ord_tx).await?; @@ -294,15 +279,6 @@ pub async fn rollback_block( ctx: &Context, ) -> Result<(), String> { try_info!(ctx, "Rolling back block #{block_height}"); - // Drop from blocks DB. - let blocks_db = open_blocks_db_with_retry(true, &config, ctx); - blocks::delete_blocks_in_block_range( - block_height as u32, - block_height as u32, - &blocks_db, - &ctx, - ); - // Drop from postgres. { let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?; let ord_tx = pg_begin(&mut ord_client).await?; diff --git a/components/ordhook-core/src/service/mod.rs b/components/ordhook-core/src/service/mod.rs index 3edc98a..94aef9f 100644 --- a/components/ordhook-core/src/service/mod.rs +++ b/components/ordhook-core/src/service/mod.rs @@ -3,7 +3,7 @@ use crate::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCach use crate::core::pipeline::bitcoind_download_blocks; use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor; use crate::core::pipeline::processors::inscription_indexing::{ - process_block, rollback_block, start_inscription_indexing_processor, + index_block, rollback_block, start_inscription_indexing_processor, }; use crate::core::protocol::sequence_cursor::SequenceCursor; use crate::core::{ @@ -11,7 +11,7 @@ use crate::core::{ should_sync_rocks_db, }; use crate::db::blocks::{ - find_missing_blocks, insert_entry_in_blocks, open_blocks_db_with_retry, run_compaction, + self, find_missing_blocks, open_blocks_db_with_retry, run_compaction, }; use crate::db::cursor::{BlockBytesCursor, TransactionBytesCursor}; use crate::db::ordinals_pg; @@ -22,8 +22,8 @@ use chainhook_postgres::{pg_begin, pg_pool, pg_pool_client}; use chainhook_sdk::observer::{ start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar, }; -use chainhook_types::BlockIdentifier; use chainhook_sdk::utils::{BlockHeights, Context}; +use chainhook_types::BlockIdentifier; use crossbeam_channel::select; use dashmap::DashMap; use deadpool_postgres::Pool; @@ -303,7 +303,6 @@ impl Service { &self.config, &self.pg_pools, &self.ctx, - None, &self.prometheus, ); try_info!( @@ -332,7 +331,7 @@ impl Service { pub async fn chainhook_sidecar_mutate_blocks( blocks_to_mutate: &mut Vec, - blocks_ids_to_rollback: &Vec, + block_ids_to_rollback: &Vec, cache_l2: &Arc>>, brc20_cache: &mut Option, prometheus: &PrometheusMonitoring, @@ -340,55 +339,64 @@ pub async fn chainhook_sidecar_mutate_blocks( pg_pools: &PgConnectionPools, ctx: &Context, ) -> Result<(), String> { - let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx); - - for block_id_to_rollback in blocks_ids_to_rollback.iter() { - rollback_block(block_id_to_rollback.index, config, pg_pools, ctx).await?; - } - - for cache in blocks_to_mutate.iter_mut() { - let block_bytes = match BlockBytesCursor::from_standardized_block(&cache.block) { - Ok(block_bytes) => block_bytes, - Err(e) => { - try_error!( - ctx, - "Unable to compress block #{}: #{}", - cache.block.block_identifier.index, - e.to_string() - ); - continue; - } - }; - - insert_entry_in_blocks( - cache.block.block_identifier.index as u32, - &block_bytes, - true, - &blocks_db_rw, - &ctx, - ); + if block_ids_to_rollback.len() > 0 { + let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx); + for block_id in block_ids_to_rollback.iter() { + blocks::delete_blocks_in_block_range( + block_id.index as u32, + block_id.index as u32, + &blocks_db_rw, + &ctx, + ); + rollback_block(block_id.index, config, pg_pools, ctx).await?; + } blocks_db_rw .flush() - .map_err(|e| format!("error inserting block to rocksdb: {e}"))?; + .map_err(|e| format!("error dropping rollback blocks from rocksdb: {e}"))?; + } - if !cache.processed_by_sidecar { - let mut cache_l1 = BTreeMap::new(); - let mut sequence_cursor = SequenceCursor::new(); - process_block( - &mut cache.block, - &vec![], - &mut sequence_cursor, - &mut cache_l1, - &cache_l2, - brc20_cache.as_mut(), - prometheus, - &config, - pg_pools, - &ctx, - ) - .await?; - cache.processed_by_sidecar = true; + for cached_block in blocks_to_mutate.iter_mut() { + if cached_block.processed_by_sidecar { + continue; } + let block_bytes = match BlockBytesCursor::from_standardized_block(&cached_block.block) { + Ok(block_bytes) => block_bytes, + Err(e) => { + return Err(format!( + "Unable to compress block #{}: #{e}", + cached_block.block.block_identifier.index + )); + } + }; + { + let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx); + blocks::insert_entry_in_blocks( + cached_block.block.block_identifier.index as u32, + &block_bytes, + true, + &blocks_db_rw, + &ctx, + ); + blocks_db_rw + .flush() + .map_err(|e| format!("error inserting block to rocksdb: {e}"))?; + } + let mut cache_l1 = BTreeMap::new(); + let mut sequence_cursor = SequenceCursor::new(); + index_block( + &mut cached_block.block, + &vec![], + &mut sequence_cursor, + &mut cache_l1, + &cache_l2, + brc20_cache.as_mut(), + prometheus, + &config, + pg_pools, + &ctx, + ) + .await?; + cached_block.processed_by_sidecar = true; } Ok(()) }