mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-13 16:19:01 +08:00
feat: share traversals_cache over 10 blocks spans
This commit is contained in:
@@ -23,7 +23,7 @@ use chainhook_event_observer::hord::db::{
|
||||
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_local_storage,
|
||||
};
|
||||
use chainhook_event_observer::hord::{
|
||||
retrieve_inscribed_satoshi_points_from_block,
|
||||
new_traversals_cache, retrieve_inscribed_satoshi_points_from_block,
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, Storage,
|
||||
};
|
||||
use chainhook_event_observer::indexer;
|
||||
@@ -633,14 +633,13 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
};
|
||||
|
||||
let transaction_identifier = TransactionIdentifier { hash: txid.clone() };
|
||||
let hasher = FxBuildHasher::default();
|
||||
let cache = Arc::new(DashMap::with_hasher(hasher));
|
||||
let traversals_cache = new_traversals_cache();
|
||||
let traversal = retrieve_satoshi_point_using_local_storage(
|
||||
&hord_db_conn,
|
||||
&block_identifier,
|
||||
&transaction_identifier,
|
||||
0,
|
||||
cache,
|
||||
Arc::new(traversals_cache),
|
||||
&ctx,
|
||||
)?;
|
||||
info!(
|
||||
@@ -655,10 +654,13 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
let block =
|
||||
fetch_and_standardize_block(cmd.block_height, &bitcoin_config, &ctx)
|
||||
.await?;
|
||||
let traversals = retrieve_inscribed_satoshi_points_from_block(
|
||||
let traversals_cache = Arc::new(new_traversals_cache());
|
||||
|
||||
let _traversals = retrieve_inscribed_satoshi_points_from_block(
|
||||
&block,
|
||||
None,
|
||||
&config.expected_cache_path(),
|
||||
&traversals_cache,
|
||||
&ctx,
|
||||
);
|
||||
// info!(
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
new_traversals_cache,
|
||||
ord::{height::Height, sat::Sat},
|
||||
update_hord_db_and_augment_bitcoin_block,
|
||||
};
|
||||
@@ -955,6 +956,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
let mut cursor = start_block as usize;
|
||||
let mut inbox = HashMap::new();
|
||||
let mut num_writes = 0;
|
||||
let traversals_cache = Arc::new(new_traversals_cache());
|
||||
|
||||
while let Ok(Some((block_height, compacted_block, raw_block))) = block_compressed_rx.recv() {
|
||||
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx);
|
||||
@@ -1002,6 +1004,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
&inscriptions_db_conn_rw,
|
||||
false,
|
||||
&hord_db_path,
|
||||
&traversals_cache,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -1031,6 +1034,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if num_writes % 10 == 0 {
|
||||
traversals_cache.clear();
|
||||
}
|
||||
|
||||
if num_writes % 5000 == 0 {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
|
||||
@@ -1079,7 +1086,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
inscription_number: u64,
|
||||
cache: Arc<
|
||||
traversals_cache: Arc<
|
||||
DashMap<
|
||||
(u32, [u8; 8]),
|
||||
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
|
||||
@@ -1116,7 +1123,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(cached_tx) = cache.get(&(ordinal_block_number, tx_cursor.0)) {
|
||||
if let Some(cached_tx) = traversals_cache.get(&(ordinal_block_number, tx_cursor.0)) {
|
||||
let (inputs, outputs) = cached_tx.value();
|
||||
let mut next_found_in_cache = false;
|
||||
|
||||
@@ -1282,7 +1289,8 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
// });
|
||||
|
||||
if sats_out < sats_in {
|
||||
cache.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
|
||||
traversals_cache
|
||||
.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
|
||||
ordinal_offset = sats_out - (sats_in - txin_value);
|
||||
ordinal_block_number = *block_height;
|
||||
|
||||
|
||||
@@ -9,12 +9,14 @@ use chainhook_types::{
|
||||
OrdinalOperation, TransactionIdentifier,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use fxhash::{FxBuildHasher, FxHasher};
|
||||
use hiro_system_kit::slog;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use rocksdb::DB;
|
||||
use rusqlite::Connection;
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::hash::BuildHasherDefault;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::Arc;
|
||||
@@ -162,10 +164,24 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn new_traversals_cache(
|
||||
) -> DashMap<(u32, [u8; 8]), (Vec<([u8; 8], u32, u16, u64)>, Vec<u64>), BuildHasherDefault<FxHasher>>
|
||||
{
|
||||
let hasher = FxBuildHasher::default();
|
||||
DashMap::with_hasher(hasher)
|
||||
}
|
||||
|
||||
pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
block: &BitcoinBlockData,
|
||||
inscriptions_db_conn: Option<&Connection>,
|
||||
hord_db_path: &PathBuf,
|
||||
traversals_cache: &Arc<
|
||||
DashMap<
|
||||
(u32, [u8; 8]),
|
||||
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
|
||||
BuildHasherDefault<FxHasher>,
|
||||
>,
|
||||
>,
|
||||
ctx: &Context,
|
||||
) -> HashMap<TransactionIdentifier, TraversalResult> {
|
||||
let mut transactions_ids = vec![];
|
||||
@@ -202,14 +218,12 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
let hasher = fxhash::FxBuildHasher::default();
|
||||
let shared_cache = Arc::new(DashMap::with_hasher(hasher));
|
||||
for transaction_id in transactions_ids.into_iter() {
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let block_identifier = block.block_identifier.clone();
|
||||
let moved_hord_db_path = hord_db_path.clone();
|
||||
let cache = shared_cache.clone();
|
||||
let local_cache = traversals_cache.clone();
|
||||
traversal_data_pool.execute(move || loop {
|
||||
match open_readonly_hord_db_conn_rocks_db(&moved_hord_db_path, &moved_ctx) {
|
||||
Ok(blocks_db) => {
|
||||
@@ -218,7 +232,7 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
0,
|
||||
cache,
|
||||
local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((transaction_id, traversal));
|
||||
@@ -262,7 +276,6 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
}
|
||||
}
|
||||
let _ = traversal_data_pool.join();
|
||||
std::thread::spawn(move || drop(shared_cache));
|
||||
}
|
||||
|
||||
traversals
|
||||
@@ -274,6 +287,13 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
write_block: bool,
|
||||
hord_db_path: &PathBuf,
|
||||
traversals_cache: &Arc<
|
||||
DashMap<
|
||||
(u32, [u8; 8]),
|
||||
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
|
||||
BuildHasherDefault<FxHasher>,
|
||||
>,
|
||||
>,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
if write_block {
|
||||
@@ -299,6 +319,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
&new_block,
|
||||
Some(inscriptions_db_conn_rw),
|
||||
hord_db_path,
|
||||
traversals_cache,
|
||||
ctx,
|
||||
);
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::chainhooks::types::{
|
||||
ChainhookConfig, ChainhookFullSpecification, ChainhookSpecification,
|
||||
};
|
||||
|
||||
use crate::hord::new_traversals_cache;
|
||||
#[cfg(feature = "ordinals")]
|
||||
use crate::hord::{
|
||||
db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db},
|
||||
@@ -695,21 +696,25 @@ pub async fn start_observer_commands_handler(
|
||||
match bitcoin_block_store.get_mut(&header.block_identifier) {
|
||||
Some(block) => {
|
||||
#[cfg(feature = "ordinals")]
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
{
|
||||
let traversals_cache = Arc::new(new_traversals_cache());
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&traversals_cache,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Unable to insert bitcoin block {} in hord_db: {e}",
|
||||
block.block_identifier.index
|
||||
)
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
new_blocks.push(block.clone());
|
||||
}
|
||||
@@ -833,20 +838,24 @@ pub async fn start_observer_commands_handler(
|
||||
match bitcoin_block_store.get_mut(&header.block_identifier) {
|
||||
Some(block) => {
|
||||
#[cfg(feature = "ordinals")]
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Unable to apply bitcoin block {} with hord_db: {e}", block.block_identifier.index
|
||||
)
|
||||
});
|
||||
{
|
||||
let traversals_cache = Arc::new(new_traversals_cache());
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&traversals_cache,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Unable to apply bitcoin block {} with hord_db: {e}", block.block_identifier.index
|
||||
)
|
||||
});
|
||||
}
|
||||
}
|
||||
blocks_to_apply.push(block.clone());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user