feat: use caching on streamed blocks

This commit is contained in:
Ludo Galabru
2023-05-10 08:25:29 -04:00
parent b567322859
commit 784e9a0830

View File

@@ -596,6 +596,7 @@ pub async fn start_observer_commands_handler(
let mut chainhooks_lookup: HashMap<String, ApiKey> = HashMap::new();
let networks = (&config.bitcoin_network, &config.stacks_network);
let mut bitcoin_block_store: HashMap<BlockIdentifier, BitcoinBlockData> = HashMap::new();
let traversals_cache = Arc::new(new_traversals_cache());
loop {
let command = match observer_commands_rx.recv() {
@@ -642,12 +643,12 @@ pub async fn start_observer_commands_handler(
ctx.try_log(|logger| {
slog::info!(logger, "Handling PropagateBitcoinChainEvent command")
});
let mut confirmed_blocks = vec![];
// Update Chain event before propagation
let chain_event = match blockchain_event {
BlockchainEvent::BlockchainUpdatedWithHeaders(data) => {
let mut new_blocks = vec![];
let mut confirmed_blocks = vec![];
#[cfg(feature = "ordinals")]
let blocks_db = match open_readwrite_hord_db_conn_rocks_db(
@@ -697,7 +698,6 @@ pub async fn start_observer_commands_handler(
Some(block) => {
#[cfg(feature = "ordinals")]
{
let traversals_cache = Arc::new(new_traversals_cache());
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&blocks_db,
@@ -750,14 +750,13 @@ pub async fn start_observer_commands_handler(
BitcoinChainEvent::ChainUpdatedWithBlocks(
BitcoinChainUpdatedWithBlocksData {
new_blocks,
confirmed_blocks,
confirmed_blocks: confirmed_blocks.clone(),
},
)
}
BlockchainEvent::BlockchainUpdatedWithReorg(data) => {
let mut blocks_to_apply = vec![];
let mut blocks_to_rollback = vec![];
let mut confirmed_blocks = vec![];
let blocks_ids_to_rollback = data
.headers_to_rollback
@@ -774,6 +773,15 @@ pub async fn start_observer_commands_handler(
slog::info!(logger, "Bitcoin reorg detected, will rollback blocks {} and apply blocks {}", blocks_ids_to_rollback.join(", "), blocks_ids_to_apply.join(", "))
});
ctx.try_log(|logger| {
slog::info!(
logger,
"Flushing traversals_cache ({} entries)",
traversals_cache.len()
)
});
traversals_cache.clear();
#[cfg(feature = "ordinals")]
let blocks_db = match open_readwrite_hord_db_conn_rocks_db(
&config.get_cache_path_buf(),
@@ -854,7 +862,6 @@ pub async fn start_observer_commands_handler(
Some(block) => {
#[cfg(feature = "ordinals")]
{
let traversals_cache = Arc::new(new_traversals_cache());
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&blocks_db,
@@ -906,7 +913,7 @@ pub async fn start_observer_commands_handler(
BitcoinChainEvent::ChainUpdatedWithReorg(BitcoinChainUpdatedWithReorgData {
blocks_to_apply,
blocks_to_rollback,
confirmed_blocks,
confirmed_blocks: confirmed_blocks.clone(),
})
}
};
@@ -1064,6 +1071,19 @@ pub async fn start_observer_commands_handler(
let _ = send_request(request, 3, 1, &ctx).await;
}
for block in confirmed_blocks.into_iter() {
if block.block_identifier.index % 24 == 0 {
ctx.try_log(|logger| {
slog::info!(
logger,
"Flushing traversals_cache ({} entries)",
traversals_cache.len()
)
});
traversals_cache.clear();
}
}
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::BitcoinChainEvent(chain_event));
}