feat: prototype warmup

This commit is contained in:
Ludo Galabru
2023-07-24 22:51:08 -04:00
parent 46832118d3
commit fa6c86fb1f
4 changed files with 743 additions and 40 deletions

View File

@@ -1,6 +1,7 @@
use crate::archive::download_ordinals_dataset_if_required;
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::hord::ordinals::start_ordinals_number_processor;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::service::Service;
@@ -155,15 +156,18 @@ struct ScanInscriptionCommand {
#[derive(Subcommand, PartialEq, Clone, Debug)]
enum RepairCommand {
/// Rewrite blocks hord db
#[clap(name = "blocks", bin_name = "blocks")]
Blocks(RepairBlocksCommand),
#[clap(name = "rocksdb", bin_name = "rocksdb")]
Rocksdb(RepairStorageCommand),
/// Rewrite blocks hord db
#[clap(name = "sqlite", bin_name = "sqlite")]
Sqlite(RepairStorageCommand),
/// Rewrite blocks hord db
#[clap(name = "transfers", bin_name = "transfers")]
Transfers(RepairTransfersCommand),
}
#[derive(Parser, PartialEq, Clone, Debug)]
struct RepairBlocksCommand {
struct RepairStorageCommand {
/// Starting block
pub start_block: u64,
/// Starting block
@@ -655,7 +659,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
&transaction_identifier,
0,
0,
Arc::new(traversals_cache),
&Arc::new(traversals_cache),
&ctx,
)?;
info!(
@@ -721,12 +725,39 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
}
}
Command::Repair(subcmd) => match subcmd {
RepairCommand::Blocks(cmd) => {
RepairCommand::Rocksdb(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
rebuild_rocks_db(&config, cmd.start_block, cmd.end_block, &ctx).await?
rebuild_rocks_db(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
None,
&ctx,
)
.await?
}
RepairCommand::Sqlite(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let (tx, handle) = start_ordinals_number_processor(&config, ctx);
rebuild_rocks_db(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
Some(tx),
&ctx,
)
.await?;
let _ = handle.join();
}
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;

View File

@@ -6,12 +6,13 @@ use std::{
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::blockdata::block,
indexer::bitcoin::{
build_http_client, download_block, download_block_with_retry, parse_downloaded_block,
parse_fetched_block, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry,
},
types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData,
BitcoinBlockData, BitcoinNetwork, BlockIdentifier, OrdinalInscriptionRevealData,
OrdinalInscriptionTransferData, TransactionIdentifier,
},
};
@@ -31,7 +32,10 @@ use chainhook_sdk::{
indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context,
};
use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block};
use crate::hord::{
new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block,
update_hord_db_and_augment_bitcoin_block_v3,
};
use crate::ord::{height::Height, sat::Sat};
use crate::{
config::Config,
@@ -217,7 +221,7 @@ fn rocks_db_default_options() -> rocksdb::Options {
// we recommend setting max_open_files to -1, which means infinity.
// This option will preload all filter and index blocks and will not need to maintain LRU of files.
// Setting max_open_files to -1 will get you the best possible performance.
opts.set_max_open_files(2048);
opts.set_max_open_files(4096);
opts
}
@@ -1223,7 +1227,299 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
transaction_identifier: &TransactionIdentifier,
inscription_input_index: usize,
inscription_number: i64,
traversals_cache: Arc<
traversals_cache: &Arc<
DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>,
>,
ctx: &Context,
) -> Result<TraversalResult, String> {
let mut inscription_offset_intra_output = 0;
let mut inscription_output_index: usize = 0;
let mut ordinal_offset = 0;
let mut ordinal_block_number = block_identifier.index as u32;
let txid = transaction_identifier.get_8_hash_bytes();
let mut blocks_db = open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache
.get(&(block_identifier.index as u32, txid.clone()))
{
Some(entry) => {
let tx = entry.value();
(
tx.get_sat_ranges(),
tx.get_cumulated_sats_in_until_input_index(inscription_input_index),
)
}
None => {
let mut attempt = 0;
loop {
match find_lazy_block_at_block_height(
ordinal_block_number,
3,
false,
&blocks_db,
&ctx,
) {
None => {
if attempt < 3 {
attempt += 1;
blocks_db =
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
} else {
return Err(format!("block #{ordinal_block_number} not in database"));
}
}
Some(block) => match block.find_and_serialize_transaction_with_txid(&txid) {
Some(tx) => {
let sats_ranges = tx.get_sat_ranges();
let inscription_offset_cross_outputs =
tx.get_cumulated_sats_in_until_input_index(inscription_input_index);
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
break (sats_ranges, inscription_offset_cross_outputs);
}
None => return Err(format!("txid not in block #{ordinal_block_number}")),
},
}
}
}
};
for (i, (min, max)) in sats_ranges.into_iter().enumerate() {
if inscription_offset_cross_outputs >= min && inscription_offset_cross_outputs < max {
inscription_output_index = i;
inscription_offset_intra_output = inscription_offset_cross_outputs - min;
}
}
ctx.try_log(|logger| {
slog::info!(
logger,
"Computing ordinal number for Satoshi point {} ({}:0 -> {}:{}/{}) (block #{})",
transaction_identifier.hash,
inscription_input_index,
inscription_output_index,
inscription_offset_intra_output,
inscription_offset_cross_outputs,
block_identifier.index
)
});
let mut tx_cursor: ([u8; 8], usize) = (txid, inscription_input_index);
let mut hops: u32 = 0;
loop {
hops += 1;
if hops as u64 > block_identifier.index {
return Err(format!(
"Unable to process transaction {} detected after {hops} iterations. Manual investigation required",
transaction_identifier.hash
));
}
if let Some(cached_tx) = traversals_cache.get(&(ordinal_block_number, tx_cursor.0)) {
let tx = cached_tx.value();
let mut next_found_in_cache = false;
let mut sats_out = 0;
for (index, output_value) in tx.outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
sats_out += output_value;
}
sats_out += ordinal_offset;
let mut sats_in = 0;
for input in tx.inputs.iter() {
sats_in += input.txin_value;
if sats_out < sats_in {
ordinal_offset = sats_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
next_found_in_cache = true;
break;
}
}
if next_found_in_cache {
continue;
}
if sats_in == 0 {
ctx.try_log(|logger| {
slog::error!(
logger,
"Transaction {} is originating from a non spending transaction",
transaction_identifier.hash
)
});
return Ok(TraversalResult {
inscription_number: 0,
ordinal_number: 0,
transfers: 0,
inscription_input_index,
transaction_identifier_inscription: transaction_identifier.clone(),
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
});
}
}
let lazy_block = {
let mut attempt = 0;
loop {
match find_lazy_block_at_block_height(
ordinal_block_number,
3,
false,
&blocks_db,
&ctx,
) {
Some(block) => break block,
None => {
if attempt < 3 {
attempt += 1;
blocks_db =
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
} else {
return Err(format!("block #{ordinal_block_number} not in database"));
}
}
}
}
};
let coinbase_txid = lazy_block.get_coinbase_txid();
let txid = tx_cursor.0;
// evaluate exit condition: did we reach the **final** coinbase transaction
if coinbase_txid.eq(&txid) {
let subsidy = Height(ordinal_block_number.into()).subsidy();
if ordinal_offset < subsidy {
// Great!
break;
}
// loop over the transaction fees to detect the right range
let mut accumulated_fees = subsidy;
for tx in lazy_block.iter_tx() {
let mut total_in = 0;
for input in tx.inputs.iter() {
total_in += input.txin_value;
}
let mut total_out = 0;
for output_value in tx.outputs.iter() {
total_out += output_value;
}
let fee = total_in - total_out;
if accumulated_fees + fee > ordinal_offset {
// We are looking at the right transaction
// Retraverse the inputs to select the index to be picked
let offset_within_fee = ordinal_offset - accumulated_fees;
total_out += offset_within_fee;
let mut sats_in = 0;
for input in tx.inputs.into_iter() {
sats_in += input.txin_value;
if sats_in > total_out {
ordinal_offset = total_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
break;
}
}
break;
} else {
accumulated_fees += fee;
}
}
} else {
// isolate the target transaction
let lazy_tx = match lazy_block.find_and_serialize_transaction_with_txid(&txid) {
Some(entry) => entry,
None => unreachable!(),
};
let mut sats_out = 0;
for (index, output_value) in lazy_tx.outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
sats_out += output_value;
}
sats_out += ordinal_offset;
let mut sats_in = 0;
for input in lazy_tx.inputs.iter() {
sats_in += input.txin_value;
if sats_out < sats_in {
traversals_cache.insert((ordinal_block_number, tx_cursor.0), lazy_tx.clone());
ordinal_offset = sats_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
break;
}
}
if sats_in == 0 {
ctx.try_log(|logger| {
slog::error!(
logger,
"Transaction {} is originating from a non spending transaction",
transaction_identifier.hash
)
});
return Ok(TraversalResult {
inscription_number: 0,
ordinal_number: 0,
transfers: 0,
inscription_input_index,
transaction_identifier_inscription: transaction_identifier.clone(),
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
});
}
}
}
let height = Height(ordinal_block_number.into());
let ordinal_number = height.starting_sat().0 + ordinal_offset + inscription_offset_intra_output;
Ok(TraversalResult {
inscription_number,
ordinal_number,
transfers: hops,
inscription_input_index,
transaction_identifier_inscription: transaction_identifier.clone(),
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
})
}
pub fn retrieve_satoshi_point_using_lazy_storage_v3(
blocks_db_dir: &PathBuf,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_input_index: usize,
inscription_number: i64,
traversals_cache: &Arc<
DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>,
>,
ctx: &Context,
@@ -1858,10 +2154,72 @@ impl<'a> Iterator for LazyBlockTransactionIterator<'a> {
}
}
pub fn process_blocks(
mut raw_blocks: Vec<BitcoinBlockFullBreakdown>,
bitcoin_network: BitcoinNetwork,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscriptions_db_conn_rw: &mut Connection,
hord_config: &HordConfig,
ctx: &Context,
) {
let mut next_blocks = vec![];
let mut cache_l1 = HashMap::new();
for raw_block in raw_blocks.drain(..) {
let block =
match hord::parse_ordinals_and_standardize_block(raw_block, &bitcoin_network, &ctx) {
Ok(block) => block,
Err((e, _)) => {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to standardize bitcoin block: {e}",)
});
// TODO(lgalabru): panic?
continue;
}
};
next_blocks.push(block);
}
for _cursor in 0..next_blocks.len() {
let mut block = next_blocks.remove(0);
let _ = process_block(
&mut block,
&next_blocks,
&mut cache_l1,
cache_l2,
inscriptions_db_conn_rw,
hord_config,
ctx,
);
}
}
pub fn process_block(
block: &mut BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscriptions_db_conn_rw: &mut Connection,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<(), String> {
update_hord_db_and_augment_bitcoin_block_v3(
block,
next_blocks,
cache_l1,
cache_l2,
inscriptions_db_conn_rw,
hord_config,
ctx,
)
}
pub async fn rebuild_rocks_db(
config: &Config,
start_block: u64,
end_block: u64,
start_sequencing_blocks_at_height: u64,
blocks_post_processor: Option<Sender<Vec<BitcoinBlockFullBreakdown>>>,
ctx: &Context,
) -> Result<(), String> {
// let guard = pprof::ProfilerGuardBuilder::default()
@@ -1880,10 +2238,6 @@ pub async fn rebuild_rocks_db(
let hord_config = config.get_hord_config();
ctx.try_log(|logger| {
slog::info!(logger, "Generating report");
});
let number_of_blocks_to_process = end_block - start_block + 1;
let (block_req_lim, block_process_lim) = (128, 128);
@@ -1938,12 +2292,14 @@ pub async fn rebuild_rocks_db(
let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let cloned_ctx = ctx.clone();
let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion")
let _storage_thread = hiro_system_kit::thread_named("Block data ingestion")
.spawn(move || {
let mut blocks_stored = 0;
let mut num_writes = 0;
while let Ok(Some((block_height, compacted_block, _raw_block))) =
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
while let Ok(Some((block_height, compacted_block, raw_block))) =
block_compressed_rx.recv()
{
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx);
@@ -1953,7 +2309,25 @@ pub async fn rebuild_rocks_db(
// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.
// processing.
if block_height as u64 >= start_sequencing_blocks_at_height {
inbox.insert(block_height as u64, raw_block);
let mut chunk = Vec::new();
while let Some(next_block) = inbox.remove(&inbox_cursor) {
cloned_ctx.try_log(|logger| {
slog::info!(
logger,
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
inbox.len()
)
});
chunk.push(next_block);
inbox_cursor += 1;
}
if let Some(ref tx) = blocks_post_processor {
let _ = tx.send(chunk);
}
}
// Should we start look for inscriptions data in blocks?
cloned_ctx.try_log(|logger| {

View File

@@ -35,6 +35,7 @@ use crate::db::{
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
get_any_entry_in_ordinal_activities, insert_entry_in_blocks, insert_entry_in_inscriptions,
insert_transfer_in_locations_tx, retrieve_satoshi_point_using_lazy_storage,
retrieve_satoshi_point_using_lazy_storage_v3,
};
use crate::ord::height::Height;
@@ -321,7 +322,7 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
&transaction_id,
input_index,
0,
local_cache,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send(traversal);
@@ -1095,6 +1096,279 @@ fn test_ordinal_inscription_parsing() {
println!("{:?}", inscription);
}
pub fn get_transactions_to_process(
block: &BitcoinBlockData,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_db_conn: &mut Connection,
ctx: &Context,
) -> Vec<(TransactionIdentifier, usize)> {
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
for tx in block.transactions.iter().skip(1) {
// Have a new inscription been revealed, if so, are looking at a re-inscription
for ordinal_event in tx.metadata.ordinal_operations.iter() {
let (inscription_data, _is_cursed) = match ordinal_event {
OrdinalOperation::InscriptionRevealed(inscription_data) => {
(inscription_data, false)
}
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => {
(inscription_data, false)
}
OrdinalOperation::InscriptionTransferred(_) => {
continue;
}
};
if cache_l1.contains_key(&(
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
)) {
continue;
}
if let Ok(Some((traversal, _))) = find_inscription_with_id(
&inscription_data.inscription_id,
&inscriptions_db_conn,
ctx,
) {
cache_l1.insert(
(
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
),
traversal,
);
} else {
// Enqueue for traversals
transactions_ids.push((
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
));
}
}
}
transactions_ids
}
pub fn retrieve_inscribed_satoshi_points_from_block_v3(
block: &BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscriptions_db_conn: &mut Connection,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<(), String> {
let mut transactions_ids =
get_transactions_to_process(block, cache_l1, inscriptions_db_conn, ctx);
if !transactions_ids.is_empty() {
let expected_traversals = transactions_ids.len();
let (traversal_tx, traversal_rx) = channel();
let traversal_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
let mut tx_thread_pool = vec![];
for thread_index in 0..hord_config.ingestion_thread_max {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = ctx.clone();
let block_identifier = block.block_identifier.clone();
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();
traversal_data_pool.execute(move || {
while let Ok(Some((transaction_id, input_index, prioritary))) = rx.recv() {
let traversal: Result<TraversalResult, String> =
retrieve_satoshi_point_using_lazy_storage_v3(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
});
}
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let mut priority_queue = VecDeque::new();
let mut warmup_queue = VecDeque::new();
for (transaction_id, input_index) in transactions_ids.into_iter() {
priority_queue.push_back((transaction_id, input_index, true));
}
// Feed each workers with 2 workitems each
for thread_index in 0..hord_config.ingestion_thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..hord_config.ingestion_thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
let mut next_block_iter = next_blocks.iter();
let mut traversals_received = 0;
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
if prioritary {
traversals_received += 1;
}
match traversal_result {
Ok(traversal) => {
ctx.try_log(|logger| {
slog::info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
Err(e) => {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to compute inscription's Satoshi: {e}",)
});
}
}
if traversals_received == expected_traversals {
break;
}
if let Some(w) = priority_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(block) = next_block_iter.next() {
let mut transactions_ids =
get_transactions_to_process(block, cache_l1, inscriptions_db_conn, ctx);
transactions_ids.shuffle(&mut rng);
for (transaction_id, input_index) in transactions_ids.into_iter() {
warmup_queue.push_back((transaction_id, input_index, false));
}
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
}
}
}
for thread_index in 0..hord_config.ingestion_thread_max {
let _ = tx_thread_pool[thread_index].send(None);
}
let _ = hiro_system_kit::thread_named("Block data compression").spawn(move || {
let _ = traversal_data_pool.join();
});
}
Ok(())
}
pub fn update_hord_db_and_augment_bitcoin_block_v3(
new_block: &mut BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscriptions_db_conn_rw: &mut Connection,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<(), String> {
let _ = retrieve_inscribed_satoshi_points_from_block_v3(
&new_block,
&next_blocks,
cache_l1,
cache_l2,
inscriptions_db_conn_rw,
&hord_config,
ctx,
);
let discard_changes: bool = get_any_entry_in_ordinal_activities(
&new_block.block_identifier.index,
inscriptions_db_conn_rw,
ctx,
);
let inner_ctx = if discard_changes {
Context::empty()
} else {
ctx.clone()
};
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
let any_inscription_revealed =
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
new_block,
&transaction,
&cache_l1,
&inner_ctx,
)?;
// Have inscriptions been transfered?
let any_inscription_transferred =
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
new_block,
&transaction,
&inner_ctx,
)?;
if !any_inscription_revealed && !any_inscription_transferred {
return Ok(());
}
if discard_changes {
ctx.try_log(|logger| {
slog::info!(
logger,
"Ignoring updates for block #{}, activities present in database",
new_block.block_identifier.index,
)
});
} else {
ctx.try_log(|logger| {
slog::info!(
logger,
"Saving updates for block {}",
new_block.block_identifier.index,
)
});
transaction.commit().unwrap();
ctx.try_log(|logger| {
slog::info!(
logger,
"Updates saved for block {}",
new_block.block_identifier.index,
)
});
}
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
ctx.try_log(|logger| {
slog::info!(
logger,
"Block #{} processed through hord, revealing {} inscriptions [{}]",
new_block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")
)
});
Ok(())
}
pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
block: &mut BitcoinBlockData,
hord_db_tx: &Transaction,
@@ -1260,7 +1534,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_t
pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
block: &mut BitcoinBlockData,
transaction: &Transaction,
traversals: &HashMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l1: &HashMap<(TransactionIdentifier, usize), TraversalResult>,
ctx: &Context,
) -> Result<bool, String> {
let mut storage_updated = false;
@@ -1309,7 +1583,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
};
let transaction_identifier = new_tx.transaction_identifier.clone();
let traversal = match traversals
let traversal = match cache_l1
.get(&(transaction_identifier, inscription.inscription_input_index))
{
Some(traversal) => traversal,

View File

@@ -1,30 +1,54 @@
use std::sync::mpsc::{channel, Sender};
use std::{
sync::{
mpsc::{channel, Sender},
Arc,
},
thread::JoinHandle,
};
use super::HordConfig;
use chainhook_sdk::{indexer::bitcoin::BitcoinBlockFullBreakdown, utils::Context};
pub struct Batch {
pub jobs: Vec<Job>,
pub jobs_count: u32,
}
use crate::{
config::Config,
db::{open_readwrite_hord_db_conn, process_blocks},
};
pub struct Job {
pub inputs: JobInputs,
pub result: Result<JobResult, String>,
}
use super::new_traversals_lazy_cache;
pub struct JobInputs {}
pub struct JobResult {}
pub fn start_ordinals_number_processor(_config: &HordConfig) -> Sender<Batch> {
pub fn start_ordinals_number_processor(
config: &Config,
ctx: &Context,
) -> (Sender<Vec<BitcoinBlockFullBreakdown>>, JoinHandle<()>) {
// let mut batches = HashMap::new();
let (tx, _rx) = channel();
let (tx, rx) = channel::<Vec<BitcoinBlockFullBreakdown>>();
// let (inner_tx, inner_rx) = channel();
hiro_system_kit::thread_named("Batch receiver")
.spawn(move || {})
let config = config.clone();
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Batch receiver")
.spawn(move || {
let cache_l2 = Arc::new(new_traversals_lazy_cache(1024));
let mut inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let hord_config = config.get_hord_config();
while let Ok(raw_blocks) = rx.recv() {
info!(
ctx.expect_logger(),
"Processing {} blocks",
raw_blocks.len()
);
process_blocks(
raw_blocks,
config.network.bitcoin_network.clone(),
&cache_l2,
&mut inscriptions_db_conn_rw,
&hord_config,
&ctx,
)
}
})
.expect("unable to spawn thread");
tx
(tx, handle)
}