fix: clean up rocksdb connections during rollbacks (#420)

* fix: only one rocksdb conn

* unify context

* improve logs

* clean up connections

* revert sequencing

* improve logs

* revert logs

Commit: 216cd52c0e (parent: 48b3ccef48)
Author: Rafael Cárdenas
Date: 2025-02-13 09:01:24 -06:00 (committed by GitHub)
2 changed files with 72 additions and 88 deletions
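The theme across both files: every RocksDB access now opens its handle in a narrow scope, flushes, and lets the handle drop before any other work runs, instead of holding one long-lived connection across rollbacks and writes. A minimal sketch of that pattern using the rust-rocksdb crate directly (the path, key, and value are placeholders, not code from this commit):

    use rocksdb::DB;

    fn write_then_release(path: &str) -> Result<(), String> {
        {
            // Open, write, and flush inside one scope.
            let db = DB::open_default(path).map_err(|e| e.to_string())?;
            db.put(b"block:842000", b"raw block bytes").map_err(|e| e.to_string())?;
            db.flush().map_err(|e| e.to_string())?;
        } // `db` is dropped here, releasing RocksDB's lock before anything else runs.
        Ok(())
    }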

File 1 of 2

@@ -8,7 +8,7 @@ use std::{
 use chainhook_postgres::{pg_begin, pg_pool_client};
 use chainhook_sdk::utils::Context;
 use chainhook_types::{BitcoinBlockData, TransactionIdentifier};
-use crossbeam_channel::{Sender, TryRecvError};
+use crossbeam_channel::TryRecvError;
 use dashmap::DashMap;
 use fxhash::FxHasher;
@@ -33,11 +33,7 @@ use crate::{
             sequence_cursor::SequenceCursor,
         },
     },
-    db::{
-        blocks::{self, open_blocks_db_with_retry},
-        cursor::TransactionBytesCursor,
-        ordinals_pg,
-    },
+    db::{blocks::open_blocks_db_with_retry, cursor::TransactionBytesCursor, ordinals_pg},
     service::PgConnectionPools,
     try_crit, try_debug, try_info,
     utils::monitoring::PrometheusMonitoring,
@@ -55,7 +51,6 @@ pub fn start_inscription_indexing_processor(
     config: &Config,
     pg_pools: &PgConnectionPools,
     ctx: &Context,
-    post_processor: Option<Sender<BitcoinBlockData>>,
     prometheus: &PrometheusMonitoring,
 ) -> PostProcessorController {
     let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
@@ -122,7 +117,6 @@ pub fn start_inscription_indexing_processor(
                         &mut sequence_cursor,
                         &cache_l2,
                         &mut brc20_cache,
-                        &post_processor,
                         &prometheus,
                         &config,
                         &pg_pools,
@@ -155,12 +149,11 @@
     }
 }
 
-pub async fn process_blocks(
+async fn process_blocks(
     next_blocks: &mut Vec<BitcoinBlockData>,
     sequence_cursor: &mut SequenceCursor,
     cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
     brc20_cache: &mut Option<Brc20MemoryCache>,
-    post_processor: &Option<Sender<BitcoinBlockData>>,
     prometheus: &PrometheusMonitoring,
     config: &Config,
     pg_pools: &PgConnectionPools,
@@ -172,14 +165,7 @@ pub async fn process_blocks(
     for _cursor in 0..next_blocks.len() {
         let mut block = next_blocks.remove(0);
 
-        // Invalidate and recompute cursor when crossing the jubilee height
-        let jubilee_height =
-            get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network));
-        if block.block_identifier.index == jubilee_height {
-            sequence_cursor.reset();
-        }
-
-        process_block(
+        index_block(
             &mut block,
             &next_blocks,
             sequence_cursor,
@@ -193,15 +179,12 @@ pub async fn process_blocks(
         )
         .await?;
 
-        if let Some(post_processor_tx) = post_processor {
-            let _ = post_processor_tx.send(block.clone());
-        }
         updated_blocks.push(block);
     }
     Ok(updated_blocks)
 }
 
-pub async fn process_block(
+pub async fn index_block(
     block: &mut BitcoinBlockData,
     next_blocks: &Vec<BitcoinBlockData>,
     sequence_cursor: &mut SequenceCursor,
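With the crossbeam Sender parameter gone, process_blocks no longer fans blocks out to a post-processor channel; it simply returns the updated blocks and leaves the follow-up to the caller. A hypothetical call site under the new signature (argument names follow the hunks above; the logging line is illustrative):

    let updated_blocks = process_blocks(
        &mut next_blocks,
        &mut sequence_cursor,
        &cache_l2,
        &mut brc20_cache,
        &prometheus,
        &config,
        &pg_pools,
        &ctx,
    )
    .await?;
    try_info!(ctx, "Indexed {} blocks", updated_blocks.len());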
@@ -217,6 +200,13 @@ pub async fn process_block(
     let block_height = block.block_identifier.index;
     try_info!(ctx, "Indexing block #{block_height}");
 
+    // Invalidate and recompute cursor when crossing the jubilee height
+    if block.block_identifier.index
+        == get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network))
+    {
+        sequence_cursor.reset();
+    }
+
     {
         let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
         let ord_tx = pg_begin(&mut ord_client).await?;
@@ -233,16 +223,11 @@ pub async fn process_block(
             config,
             ctx,
         )?;
-        let inner_ctx = if config.logs.ordinals_internals {
-            ctx.clone()
-        } else {
-            Context::empty()
-        };
 
         if has_inscription_reveals {
-            augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, &inner_ctx)
+            augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, ctx)
                 .await?;
         }
-        augment_block_with_transfers(block, &ord_tx, &inner_ctx).await?;
+        augment_block_with_transfers(block, &ord_tx, ctx).await?;
 
         // Write data
         ordinals_pg::insert_block(block, &ord_tx).await?;
@@ -294,15 +279,6 @@ pub async fn rollback_block(
     ctx: &Context,
 ) -> Result<(), String> {
     try_info!(ctx, "Rolling back block #{block_height}");
-    // Drop from blocks DB.
-    let blocks_db = open_blocks_db_with_retry(true, &config, ctx);
-    blocks::delete_blocks_in_block_range(
-        block_height as u32,
-        block_height as u32,
-        &blocks_db,
-        &ctx,
-    );
-    // Drop from postgres.
     {
         let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
         let ord_tx = pg_begin(&mut ord_client).await?;
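After this hunk, rollback_block touches only postgres; deleting the block from the RocksDB blocks DB is now the caller's responsibility (see the second file below). A condensed paraphrase of what remains, based only on the lines visible above (the elided body and exact parameter types are assumptions):

    pub async fn rollback_block(
        block_height: u64,
        config: &Config,
        pg_pools: &PgConnectionPools,
        ctx: &Context,
    ) -> Result<(), String> {
        try_info!(ctx, "Rolling back block #{block_height}");
        // Drop from postgres only; the caller owns blocks-DB cleanup.
        let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
        let ord_tx = pg_begin(&mut ord_client).await?;
        // ... delete ordinal rows for `block_height`, then commit ...
        Ok(())
    }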

File 2 of 2

@@ -3,7 +3,7 @@ use crate::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCache};
 use crate::core::pipeline::bitcoind_download_blocks;
 use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
 use crate::core::pipeline::processors::inscription_indexing::{
-    process_block, rollback_block, start_inscription_indexing_processor,
+    index_block, rollback_block, start_inscription_indexing_processor,
 };
 use crate::core::protocol::sequence_cursor::SequenceCursor;
 use crate::core::{
@@ -11,7 +11,7 @@ use crate::core::{
     should_sync_rocks_db,
 };
 use crate::db::blocks::{
-    find_missing_blocks, insert_entry_in_blocks, open_blocks_db_with_retry, run_compaction,
+    self, find_missing_blocks, open_blocks_db_with_retry, run_compaction,
 };
 use crate::db::cursor::{BlockBytesCursor, TransactionBytesCursor};
 use crate::db::ordinals_pg;
@@ -22,8 +22,8 @@ use chainhook_postgres::{pg_begin, pg_pool, pg_pool_client};
 use chainhook_sdk::observer::{
     start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar,
 };
-use chainhook_types::BlockIdentifier;
 use chainhook_sdk::utils::{BlockHeights, Context};
+use chainhook_types::BlockIdentifier;
 use crossbeam_channel::select;
 use dashmap::DashMap;
 use deadpool_postgres::Pool;
@@ -303,7 +303,6 @@ impl Service {
             &self.config,
             &self.pg_pools,
             &self.ctx,
-            None,
             &self.prometheus,
         );
         try_info!(
@@ -332,7 +331,7 @@ impl Service {
 
 pub async fn chainhook_sidecar_mutate_blocks(
     blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
-    blocks_ids_to_rollback: &Vec<BlockIdentifier>,
+    block_ids_to_rollback: &Vec<BlockIdentifier>,
     cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
     brc20_cache: &mut Option<Brc20MemoryCache>,
     prometheus: &PrometheusMonitoring,
@@ -340,55 +339,64 @@ pub async fn chainhook_sidecar_mutate_blocks(
     pg_pools: &PgConnectionPools,
     ctx: &Context,
 ) -> Result<(), String> {
-    let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
-    for block_id_to_rollback in blocks_ids_to_rollback.iter() {
-        rollback_block(block_id_to_rollback.index, config, pg_pools, ctx).await?;
-    }
-
-    for cache in blocks_to_mutate.iter_mut() {
-        let block_bytes = match BlockBytesCursor::from_standardized_block(&cache.block) {
-            Ok(block_bytes) => block_bytes,
-            Err(e) => {
-                try_error!(
-                    ctx,
-                    "Unable to compress block #{}: #{}",
-                    cache.block.block_identifier.index,
-                    e.to_string()
-                );
-                continue;
-            }
-        };
-        insert_entry_in_blocks(
-            cache.block.block_identifier.index as u32,
-            &block_bytes,
-            true,
-            &blocks_db_rw,
-            &ctx,
-        );
+    if block_ids_to_rollback.len() > 0 {
+        let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
+        for block_id in block_ids_to_rollback.iter() {
+            blocks::delete_blocks_in_block_range(
+                block_id.index as u32,
+                block_id.index as u32,
+                &blocks_db_rw,
+                &ctx,
+            );
+            rollback_block(block_id.index, config, pg_pools, ctx).await?;
+        }
         blocks_db_rw
             .flush()
-            .map_err(|e| format!("error inserting block to rocksdb: {e}"))?;
+            .map_err(|e| format!("error dropping rollback blocks from rocksdb: {e}"))?;
+    }
 
-        if !cache.processed_by_sidecar {
-            let mut cache_l1 = BTreeMap::new();
-            let mut sequence_cursor = SequenceCursor::new();
-            process_block(
-                &mut cache.block,
-                &vec![],
-                &mut sequence_cursor,
-                &mut cache_l1,
-                &cache_l2,
-                brc20_cache.as_mut(),
-                prometheus,
-                &config,
-                pg_pools,
-                &ctx,
-            )
-            .await?;
-            cache.processed_by_sidecar = true;
-        }
+    for cached_block in blocks_to_mutate.iter_mut() {
+        if cached_block.processed_by_sidecar {
+            continue;
+        }
+        let block_bytes = match BlockBytesCursor::from_standardized_block(&cached_block.block) {
+            Ok(block_bytes) => block_bytes,
+            Err(e) => {
+                return Err(format!(
+                    "Unable to compress block #{}: #{e}",
+                    cached_block.block.block_identifier.index
+                ));
+            }
+        };
+        {
+            let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
+            blocks::insert_entry_in_blocks(
+                cached_block.block.block_identifier.index as u32,
+                &block_bytes,
+                true,
+                &blocks_db_rw,
+                &ctx,
+            );
+            blocks_db_rw
+                .flush()
+                .map_err(|e| format!("error inserting block to rocksdb: {e}"))?;
+        }
+        let mut cache_l1 = BTreeMap::new();
+        let mut sequence_cursor = SequenceCursor::new();
+        index_block(
+            &mut cached_block.block,
+            &vec![],
+            &mut sequence_cursor,
+            &mut cache_l1,
+            &cache_l2,
+            brc20_cache.as_mut(),
+            prometheus,
+            &config,
+            pg_pools,
+            &ctx,
+        )
+        .await?;
+        cached_block.processed_by_sidecar = true;
    }
     Ok(())
 }
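Two things stand out in the final hunk. First, a block that fails to compress used to be logged and skipped (try_error! plus continue); it now aborts the whole call with an Err, so a bad block can no longer be silently missing from the blocks DB. Second, each rollback or write opens its own scoped handle and flushes before the scope ends. For reference, a range delete like blocks::delete_blocks_in_block_range can be expressed against plain rust-rocksdb with a WriteBatch; this sketch assumes big-endian u32 block keys, which the diff does not show, and is not necessarily how the helper is implemented:

    use rocksdb::{DB, WriteBatch};

    fn delete_block_range(db: &DB, start: u32, end: u32) -> Result<(), String> {
        // delete_range is half-open [from, to), so bump the end key to make it inclusive.
        let to = end
            .checked_add(1)
            .ok_or_else(|| "block range end overflows u32".to_string())?;
        let mut batch = WriteBatch::default();
        batch.delete_range(start.to_be_bytes(), to.to_be_bytes());
        db.write(batch).map_err(|e| e.to_string())?;
        db.flush().map_err(|e| e.to_string())?;
        Ok(())
    }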