mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 08:34:17 +08:00
fix: clean up rocksdb connections during rollbacks (#420)
* fix: only one rocksdb conn * unify context * improve logs * clean up connections * revert sequencing * improve logs * revert logs
This commit is contained in:
@@ -8,7 +8,7 @@ use std::{
|
||||
use chainhook_postgres::{pg_begin, pg_pool_client};
|
||||
use chainhook_sdk::utils::Context;
|
||||
use chainhook_types::{BitcoinBlockData, TransactionIdentifier};
|
||||
use crossbeam_channel::{Sender, TryRecvError};
|
||||
use crossbeam_channel::TryRecvError;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
@@ -33,11 +33,7 @@ use crate::{
|
||||
sequence_cursor::SequenceCursor,
|
||||
},
|
||||
},
|
||||
db::{
|
||||
blocks::{self, open_blocks_db_with_retry},
|
||||
cursor::TransactionBytesCursor,
|
||||
ordinals_pg,
|
||||
},
|
||||
db::{blocks::open_blocks_db_with_retry, cursor::TransactionBytesCursor, ordinals_pg},
|
||||
service::PgConnectionPools,
|
||||
try_crit, try_debug, try_info,
|
||||
utils::monitoring::PrometheusMonitoring,
|
||||
@@ -55,7 +51,6 @@ pub fn start_inscription_indexing_processor(
|
||||
config: &Config,
|
||||
pg_pools: &PgConnectionPools,
|
||||
ctx: &Context,
|
||||
post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
) -> PostProcessorController {
|
||||
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
|
||||
@@ -122,7 +117,6 @@ pub fn start_inscription_indexing_processor(
|
||||
&mut sequence_cursor,
|
||||
&cache_l2,
|
||||
&mut brc20_cache,
|
||||
&post_processor,
|
||||
&prometheus,
|
||||
&config,
|
||||
&pg_pools,
|
||||
@@ -155,12 +149,11 @@ pub fn start_inscription_indexing_processor(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_blocks(
|
||||
async fn process_blocks(
|
||||
next_blocks: &mut Vec<BitcoinBlockData>,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
|
||||
brc20_cache: &mut Option<Brc20MemoryCache>,
|
||||
post_processor: &Option<Sender<BitcoinBlockData>>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
config: &Config,
|
||||
pg_pools: &PgConnectionPools,
|
||||
@@ -172,14 +165,7 @@ pub async fn process_blocks(
|
||||
for _cursor in 0..next_blocks.len() {
|
||||
let mut block = next_blocks.remove(0);
|
||||
|
||||
// Invalidate and recompute cursor when crossing the jubilee height
|
||||
let jubilee_height =
|
||||
get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network));
|
||||
if block.block_identifier.index == jubilee_height {
|
||||
sequence_cursor.reset();
|
||||
}
|
||||
|
||||
process_block(
|
||||
index_block(
|
||||
&mut block,
|
||||
&next_blocks,
|
||||
sequence_cursor,
|
||||
@@ -193,15 +179,12 @@ pub async fn process_blocks(
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(post_processor_tx) = post_processor {
|
||||
let _ = post_processor_tx.send(block.clone());
|
||||
}
|
||||
updated_blocks.push(block);
|
||||
}
|
||||
Ok(updated_blocks)
|
||||
}
|
||||
|
||||
pub async fn process_block(
|
||||
pub async fn index_block(
|
||||
block: &mut BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
@@ -217,6 +200,13 @@ pub async fn process_block(
|
||||
let block_height = block.block_identifier.index;
|
||||
try_info!(ctx, "Indexing block #{block_height}");
|
||||
|
||||
// Invalidate and recompute cursor when crossing the jubilee height
|
||||
if block.block_identifier.index
|
||||
== get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network))
|
||||
{
|
||||
sequence_cursor.reset();
|
||||
}
|
||||
|
||||
{
|
||||
let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
|
||||
let ord_tx = pg_begin(&mut ord_client).await?;
|
||||
@@ -233,16 +223,11 @@ pub async fn process_block(
|
||||
config,
|
||||
ctx,
|
||||
)?;
|
||||
let inner_ctx = if config.logs.ordinals_internals {
|
||||
ctx.clone()
|
||||
} else {
|
||||
Context::empty()
|
||||
};
|
||||
if has_inscription_reveals {
|
||||
augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, &inner_ctx)
|
||||
augment_block_with_inscriptions(block, sequence_cursor, cache_l1, &ord_tx, ctx)
|
||||
.await?;
|
||||
}
|
||||
augment_block_with_transfers(block, &ord_tx, &inner_ctx).await?;
|
||||
augment_block_with_transfers(block, &ord_tx, ctx).await?;
|
||||
|
||||
// Write data
|
||||
ordinals_pg::insert_block(block, &ord_tx).await?;
|
||||
@@ -294,15 +279,6 @@ pub async fn rollback_block(
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
try_info!(ctx, "Rolling back block #{block_height}");
|
||||
// Drop from blocks DB.
|
||||
let blocks_db = open_blocks_db_with_retry(true, &config, ctx);
|
||||
blocks::delete_blocks_in_block_range(
|
||||
block_height as u32,
|
||||
block_height as u32,
|
||||
&blocks_db,
|
||||
&ctx,
|
||||
);
|
||||
// Drop from postgres.
|
||||
{
|
||||
let mut ord_client = pg_pool_client(&pg_pools.ordinals).await?;
|
||||
let ord_tx = pg_begin(&mut ord_client).await?;
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCach
|
||||
use crate::core::pipeline::bitcoind_download_blocks;
|
||||
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
|
||||
use crate::core::pipeline::processors::inscription_indexing::{
|
||||
process_block, rollback_block, start_inscription_indexing_processor,
|
||||
index_block, rollback_block, start_inscription_indexing_processor,
|
||||
};
|
||||
use crate::core::protocol::sequence_cursor::SequenceCursor;
|
||||
use crate::core::{
|
||||
@@ -11,7 +11,7 @@ use crate::core::{
|
||||
should_sync_rocks_db,
|
||||
};
|
||||
use crate::db::blocks::{
|
||||
find_missing_blocks, insert_entry_in_blocks, open_blocks_db_with_retry, run_compaction,
|
||||
self, find_missing_blocks, open_blocks_db_with_retry, run_compaction,
|
||||
};
|
||||
use crate::db::cursor::{BlockBytesCursor, TransactionBytesCursor};
|
||||
use crate::db::ordinals_pg;
|
||||
@@ -22,8 +22,8 @@ use chainhook_postgres::{pg_begin, pg_pool, pg_pool_client};
|
||||
use chainhook_sdk::observer::{
|
||||
start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar,
|
||||
};
|
||||
use chainhook_types::BlockIdentifier;
|
||||
use chainhook_sdk::utils::{BlockHeights, Context};
|
||||
use chainhook_types::BlockIdentifier;
|
||||
use crossbeam_channel::select;
|
||||
use dashmap::DashMap;
|
||||
use deadpool_postgres::Pool;
|
||||
@@ -303,7 +303,6 @@ impl Service {
|
||||
&self.config,
|
||||
&self.pg_pools,
|
||||
&self.ctx,
|
||||
None,
|
||||
&self.prometheus,
|
||||
);
|
||||
try_info!(
|
||||
@@ -332,7 +331,7 @@ impl Service {
|
||||
|
||||
pub async fn chainhook_sidecar_mutate_blocks(
|
||||
blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
|
||||
blocks_ids_to_rollback: &Vec<BlockIdentifier>,
|
||||
block_ids_to_rollback: &Vec<BlockIdentifier>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
|
||||
brc20_cache: &mut Option<Brc20MemoryCache>,
|
||||
prometheus: &PrometheusMonitoring,
|
||||
@@ -340,55 +339,64 @@ pub async fn chainhook_sidecar_mutate_blocks(
|
||||
pg_pools: &PgConnectionPools,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
|
||||
|
||||
for block_id_to_rollback in blocks_ids_to_rollback.iter() {
|
||||
rollback_block(block_id_to_rollback.index, config, pg_pools, ctx).await?;
|
||||
}
|
||||
|
||||
for cache in blocks_to_mutate.iter_mut() {
|
||||
let block_bytes = match BlockBytesCursor::from_standardized_block(&cache.block) {
|
||||
Ok(block_bytes) => block_bytes,
|
||||
Err(e) => {
|
||||
try_error!(
|
||||
ctx,
|
||||
"Unable to compress block #{}: #{}",
|
||||
cache.block.block_identifier.index,
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
insert_entry_in_blocks(
|
||||
cache.block.block_identifier.index as u32,
|
||||
&block_bytes,
|
||||
true,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
if block_ids_to_rollback.len() > 0 {
|
||||
let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
|
||||
for block_id in block_ids_to_rollback.iter() {
|
||||
blocks::delete_blocks_in_block_range(
|
||||
block_id.index as u32,
|
||||
block_id.index as u32,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
rollback_block(block_id.index, config, pg_pools, ctx).await?;
|
||||
}
|
||||
blocks_db_rw
|
||||
.flush()
|
||||
.map_err(|e| format!("error inserting block to rocksdb: {e}"))?;
|
||||
.map_err(|e| format!("error dropping rollback blocks from rocksdb: {e}"))?;
|
||||
}
|
||||
|
||||
if !cache.processed_by_sidecar {
|
||||
let mut cache_l1 = BTreeMap::new();
|
||||
let mut sequence_cursor = SequenceCursor::new();
|
||||
process_block(
|
||||
&mut cache.block,
|
||||
&vec![],
|
||||
&mut sequence_cursor,
|
||||
&mut cache_l1,
|
||||
&cache_l2,
|
||||
brc20_cache.as_mut(),
|
||||
prometheus,
|
||||
&config,
|
||||
pg_pools,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
cache.processed_by_sidecar = true;
|
||||
for cached_block in blocks_to_mutate.iter_mut() {
|
||||
if cached_block.processed_by_sidecar {
|
||||
continue;
|
||||
}
|
||||
let block_bytes = match BlockBytesCursor::from_standardized_block(&cached_block.block) {
|
||||
Ok(block_bytes) => block_bytes,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"Unable to compress block #{}: #{e}",
|
||||
cached_block.block.block_identifier.index
|
||||
));
|
||||
}
|
||||
};
|
||||
{
|
||||
let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
|
||||
blocks::insert_entry_in_blocks(
|
||||
cached_block.block.block_identifier.index as u32,
|
||||
&block_bytes,
|
||||
true,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
blocks_db_rw
|
||||
.flush()
|
||||
.map_err(|e| format!("error inserting block to rocksdb: {e}"))?;
|
||||
}
|
||||
let mut cache_l1 = BTreeMap::new();
|
||||
let mut sequence_cursor = SequenceCursor::new();
|
||||
index_block(
|
||||
&mut cached_block.block,
|
||||
&vec![],
|
||||
&mut sequence_cursor,
|
||||
&mut cache_l1,
|
||||
&cache_l2,
|
||||
brc20_cache.as_mut(),
|
||||
prometheus,
|
||||
&config,
|
||||
pg_pools,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
cached_block.processed_by_sidecar = true;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user