mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
feat: prototype warmup
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use crate::archive::download_ordinals_dataset_if_required;
|
||||
use crate::config::generator::generate_config;
|
||||
use crate::config::Config;
|
||||
use crate::hord::ordinals::start_ordinals_number_processor;
|
||||
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
|
||||
use crate::service::Service;
|
||||
|
||||
@@ -155,15 +156,18 @@ struct ScanInscriptionCommand {
|
||||
#[derive(Subcommand, PartialEq, Clone, Debug)]
|
||||
enum RepairCommand {
|
||||
/// Rewrite blocks hord db
|
||||
#[clap(name = "blocks", bin_name = "blocks")]
|
||||
Blocks(RepairBlocksCommand),
|
||||
#[clap(name = "rocksdb", bin_name = "rocksdb")]
|
||||
Rocksdb(RepairStorageCommand),
|
||||
/// Rewrite blocks hord db
|
||||
#[clap(name = "sqlite", bin_name = "sqlite")]
|
||||
Sqlite(RepairStorageCommand),
|
||||
/// Rewrite blocks hord db
|
||||
#[clap(name = "transfers", bin_name = "transfers")]
|
||||
Transfers(RepairTransfersCommand),
|
||||
}
|
||||
|
||||
#[derive(Parser, PartialEq, Clone, Debug)]
|
||||
struct RepairBlocksCommand {
|
||||
struct RepairStorageCommand {
|
||||
/// Starting block
|
||||
pub start_block: u64,
|
||||
/// Starting block
|
||||
@@ -655,7 +659,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
&transaction_identifier,
|
||||
0,
|
||||
0,
|
||||
Arc::new(traversals_cache),
|
||||
&Arc::new(traversals_cache),
|
||||
&ctx,
|
||||
)?;
|
||||
info!(
|
||||
@@ -721,12 +725,39 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
}
|
||||
}
|
||||
Command::Repair(subcmd) => match subcmd {
|
||||
RepairCommand::Blocks(cmd) => {
|
||||
RepairCommand::Rocksdb(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
rebuild_rocks_db(&config, cmd.start_block, cmd.end_block, &ctx).await?
|
||||
rebuild_rocks_db(
|
||||
&config,
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
hord_config.first_inscription_height,
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
RepairCommand::Sqlite(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
let (tx, handle) = start_ordinals_number_processor(&config, ctx);
|
||||
|
||||
rebuild_rocks_db(
|
||||
&config,
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
hord_config.first_inscription_height,
|
||||
Some(tx),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = handle.join();
|
||||
}
|
||||
RepairCommand::Transfers(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
|
||||
@@ -6,12 +6,13 @@ use std::{
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::blockdata::block,
|
||||
indexer::bitcoin::{
|
||||
build_http_client, download_block, download_block_with_retry, parse_downloaded_block,
|
||||
parse_fetched_block, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry,
|
||||
},
|
||||
types::{
|
||||
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData,
|
||||
BitcoinBlockData, BitcoinNetwork, BlockIdentifier, OrdinalInscriptionRevealData,
|
||||
OrdinalInscriptionTransferData, TransactionIdentifier,
|
||||
},
|
||||
};
|
||||
@@ -31,7 +32,10 @@ use chainhook_sdk::{
|
||||
indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context,
|
||||
};
|
||||
|
||||
use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block};
|
||||
use crate::hord::{
|
||||
new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block,
|
||||
update_hord_db_and_augment_bitcoin_block_v3,
|
||||
};
|
||||
use crate::ord::{height::Height, sat::Sat};
|
||||
use crate::{
|
||||
config::Config,
|
||||
@@ -217,7 +221,7 @@ fn rocks_db_default_options() -> rocksdb::Options {
|
||||
// we recommend setting max_open_files to -1, which means infinity.
|
||||
// This option will preload all filter and index blocks and will not need to maintain LRU of files.
|
||||
// Setting max_open_files to -1 will get you the best possible performance.
|
||||
opts.set_max_open_files(2048);
|
||||
opts.set_max_open_files(4096);
|
||||
opts
|
||||
}
|
||||
|
||||
@@ -1223,7 +1227,299 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
inscription_input_index: usize,
|
||||
inscription_number: i64,
|
||||
traversals_cache: Arc<
|
||||
traversals_cache: &Arc<
|
||||
DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>,
|
||||
>,
|
||||
ctx: &Context,
|
||||
) -> Result<TraversalResult, String> {
|
||||
let mut inscription_offset_intra_output = 0;
|
||||
let mut inscription_output_index: usize = 0;
|
||||
let mut ordinal_offset = 0;
|
||||
let mut ordinal_block_number = block_identifier.index as u32;
|
||||
let txid = transaction_identifier.get_8_hash_bytes();
|
||||
|
||||
let mut blocks_db = open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
|
||||
|
||||
let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache
|
||||
.get(&(block_identifier.index as u32, txid.clone()))
|
||||
{
|
||||
Some(entry) => {
|
||||
let tx = entry.value();
|
||||
(
|
||||
tx.get_sat_ranges(),
|
||||
tx.get_cumulated_sats_in_until_input_index(inscription_input_index),
|
||||
)
|
||||
}
|
||||
None => {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
match find_lazy_block_at_block_height(
|
||||
ordinal_block_number,
|
||||
3,
|
||||
false,
|
||||
&blocks_db,
|
||||
&ctx,
|
||||
) {
|
||||
None => {
|
||||
if attempt < 3 {
|
||||
attempt += 1;
|
||||
blocks_db =
|
||||
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
|
||||
} else {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
}
|
||||
Some(block) => match block.find_and_serialize_transaction_with_txid(&txid) {
|
||||
Some(tx) => {
|
||||
let sats_ranges = tx.get_sat_ranges();
|
||||
let inscription_offset_cross_outputs =
|
||||
tx.get_cumulated_sats_in_until_input_index(inscription_input_index);
|
||||
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
|
||||
break (sats_ranges, inscription_offset_cross_outputs);
|
||||
}
|
||||
None => return Err(format!("txid not in block #{ordinal_block_number}")),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (i, (min, max)) in sats_ranges.into_iter().enumerate() {
|
||||
if inscription_offset_cross_outputs >= min && inscription_offset_cross_outputs < max {
|
||||
inscription_output_index = i;
|
||||
inscription_offset_intra_output = inscription_offset_cross_outputs - min;
|
||||
}
|
||||
}
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Computing ordinal number for Satoshi point {} ({}:0 -> {}:{}/{}) (block #{})",
|
||||
transaction_identifier.hash,
|
||||
inscription_input_index,
|
||||
inscription_output_index,
|
||||
inscription_offset_intra_output,
|
||||
inscription_offset_cross_outputs,
|
||||
block_identifier.index
|
||||
)
|
||||
});
|
||||
|
||||
let mut tx_cursor: ([u8; 8], usize) = (txid, inscription_input_index);
|
||||
let mut hops: u32 = 0;
|
||||
|
||||
loop {
|
||||
hops += 1;
|
||||
if hops as u64 > block_identifier.index {
|
||||
return Err(format!(
|
||||
"Unable to process transaction {} detected after {hops} iterations. Manual investigation required",
|
||||
transaction_identifier.hash
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(cached_tx) = traversals_cache.get(&(ordinal_block_number, tx_cursor.0)) {
|
||||
let tx = cached_tx.value();
|
||||
|
||||
let mut next_found_in_cache = false;
|
||||
let mut sats_out = 0;
|
||||
for (index, output_value) in tx.outputs.iter().enumerate() {
|
||||
if index == tx_cursor.1 {
|
||||
break;
|
||||
}
|
||||
sats_out += output_value;
|
||||
}
|
||||
sats_out += ordinal_offset;
|
||||
|
||||
let mut sats_in = 0;
|
||||
for input in tx.inputs.iter() {
|
||||
sats_in += input.txin_value;
|
||||
|
||||
if sats_out < sats_in {
|
||||
ordinal_offset = sats_out - (sats_in - input.txin_value);
|
||||
ordinal_block_number = input.block_height;
|
||||
tx_cursor = (input.txin.clone(), input.vout as usize);
|
||||
next_found_in_cache = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if next_found_in_cache {
|
||||
continue;
|
||||
}
|
||||
|
||||
if sats_in == 0 {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Transaction {} is originating from a non spending transaction",
|
||||
transaction_identifier.hash
|
||||
)
|
||||
});
|
||||
return Ok(TraversalResult {
|
||||
inscription_number: 0,
|
||||
ordinal_number: 0,
|
||||
transfers: 0,
|
||||
inscription_input_index,
|
||||
transaction_identifier_inscription: transaction_identifier.clone(),
|
||||
transfer_data: TransferData {
|
||||
inscription_offset_intra_output,
|
||||
transaction_identifier_location: transaction_identifier.clone(),
|
||||
output_index: inscription_output_index,
|
||||
tx_index: 0,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let lazy_block = {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
match find_lazy_block_at_block_height(
|
||||
ordinal_block_number,
|
||||
3,
|
||||
false,
|
||||
&blocks_db,
|
||||
&ctx,
|
||||
) {
|
||||
Some(block) => break block,
|
||||
None => {
|
||||
if attempt < 3 {
|
||||
attempt += 1;
|
||||
blocks_db =
|
||||
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
|
||||
} else {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let coinbase_txid = lazy_block.get_coinbase_txid();
|
||||
let txid = tx_cursor.0;
|
||||
|
||||
// evaluate exit condition: did we reach the **final** coinbase transaction
|
||||
if coinbase_txid.eq(&txid) {
|
||||
let subsidy = Height(ordinal_block_number.into()).subsidy();
|
||||
if ordinal_offset < subsidy {
|
||||
// Great!
|
||||
break;
|
||||
}
|
||||
|
||||
// loop over the transaction fees to detect the right range
|
||||
let mut accumulated_fees = subsidy;
|
||||
|
||||
for tx in lazy_block.iter_tx() {
|
||||
let mut total_in = 0;
|
||||
for input in tx.inputs.iter() {
|
||||
total_in += input.txin_value;
|
||||
}
|
||||
|
||||
let mut total_out = 0;
|
||||
for output_value in tx.outputs.iter() {
|
||||
total_out += output_value;
|
||||
}
|
||||
|
||||
let fee = total_in - total_out;
|
||||
if accumulated_fees + fee > ordinal_offset {
|
||||
// We are looking at the right transaction
|
||||
// Retraverse the inputs to select the index to be picked
|
||||
let offset_within_fee = ordinal_offset - accumulated_fees;
|
||||
total_out += offset_within_fee;
|
||||
let mut sats_in = 0;
|
||||
|
||||
for input in tx.inputs.into_iter() {
|
||||
sats_in += input.txin_value;
|
||||
|
||||
if sats_in > total_out {
|
||||
ordinal_offset = total_out - (sats_in - input.txin_value);
|
||||
ordinal_block_number = input.block_height;
|
||||
tx_cursor = (input.txin.clone(), input.vout as usize);
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
accumulated_fees += fee;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// isolate the target transaction
|
||||
let lazy_tx = match lazy_block.find_and_serialize_transaction_with_txid(&txid) {
|
||||
Some(entry) => entry,
|
||||
None => unreachable!(),
|
||||
};
|
||||
|
||||
let mut sats_out = 0;
|
||||
for (index, output_value) in lazy_tx.outputs.iter().enumerate() {
|
||||
if index == tx_cursor.1 {
|
||||
break;
|
||||
}
|
||||
sats_out += output_value;
|
||||
}
|
||||
sats_out += ordinal_offset;
|
||||
|
||||
let mut sats_in = 0;
|
||||
for input in lazy_tx.inputs.iter() {
|
||||
sats_in += input.txin_value;
|
||||
|
||||
if sats_out < sats_in {
|
||||
traversals_cache.insert((ordinal_block_number, tx_cursor.0), lazy_tx.clone());
|
||||
ordinal_offset = sats_out - (sats_in - input.txin_value);
|
||||
ordinal_block_number = input.block_height;
|
||||
tx_cursor = (input.txin.clone(), input.vout as usize);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if sats_in == 0 {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Transaction {} is originating from a non spending transaction",
|
||||
transaction_identifier.hash
|
||||
)
|
||||
});
|
||||
return Ok(TraversalResult {
|
||||
inscription_number: 0,
|
||||
ordinal_number: 0,
|
||||
transfers: 0,
|
||||
inscription_input_index,
|
||||
transaction_identifier_inscription: transaction_identifier.clone(),
|
||||
transfer_data: TransferData {
|
||||
inscription_offset_intra_output,
|
||||
transaction_identifier_location: transaction_identifier.clone(),
|
||||
output_index: inscription_output_index,
|
||||
tx_index: 0,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let height = Height(ordinal_block_number.into());
|
||||
let ordinal_number = height.starting_sat().0 + ordinal_offset + inscription_offset_intra_output;
|
||||
|
||||
Ok(TraversalResult {
|
||||
inscription_number,
|
||||
ordinal_number,
|
||||
transfers: hops,
|
||||
inscription_input_index,
|
||||
transaction_identifier_inscription: transaction_identifier.clone(),
|
||||
transfer_data: TransferData {
|
||||
inscription_offset_intra_output,
|
||||
transaction_identifier_location: transaction_identifier.clone(),
|
||||
output_index: inscription_output_index,
|
||||
tx_index: 0,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn retrieve_satoshi_point_using_lazy_storage_v3(
|
||||
blocks_db_dir: &PathBuf,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
inscription_input_index: usize,
|
||||
inscription_number: i64,
|
||||
traversals_cache: &Arc<
|
||||
DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>,
|
||||
>,
|
||||
ctx: &Context,
|
||||
@@ -1858,10 +2154,72 @@ impl<'a> Iterator for LazyBlockTransactionIterator<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_blocks(
|
||||
mut raw_blocks: Vec<BitcoinBlockFullBreakdown>,
|
||||
bitcoin_network: BitcoinNetwork,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let mut next_blocks = vec![];
|
||||
let mut cache_l1 = HashMap::new();
|
||||
|
||||
for raw_block in raw_blocks.drain(..) {
|
||||
let block =
|
||||
match hord::parse_ordinals_and_standardize_block(raw_block, &bitcoin_network, &ctx) {
|
||||
Ok(block) => block,
|
||||
Err((e, _)) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "Unable to standardize bitcoin block: {e}",)
|
||||
});
|
||||
// TODO(lgalabru): panic?
|
||||
continue;
|
||||
}
|
||||
};
|
||||
next_blocks.push(block);
|
||||
}
|
||||
|
||||
for _cursor in 0..next_blocks.len() {
|
||||
let mut block = next_blocks.remove(0);
|
||||
let _ = process_block(
|
||||
&mut block,
|
||||
&next_blocks,
|
||||
&mut cache_l1,
|
||||
cache_l2,
|
||||
inscriptions_db_conn_rw,
|
||||
hord_config,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_block(
|
||||
block: &mut BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
update_hord_db_and_augment_bitcoin_block_v3(
|
||||
block,
|
||||
next_blocks,
|
||||
cache_l1,
|
||||
cache_l2,
|
||||
inscriptions_db_conn_rw,
|
||||
hord_config,
|
||||
ctx,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn rebuild_rocks_db(
|
||||
config: &Config,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
start_sequencing_blocks_at_height: u64,
|
||||
blocks_post_processor: Option<Sender<Vec<BitcoinBlockFullBreakdown>>>,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
// let guard = pprof::ProfilerGuardBuilder::default()
|
||||
@@ -1880,10 +2238,6 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let hord_config = config.get_hord_config();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(logger, "Generating report");
|
||||
});
|
||||
|
||||
let number_of_blocks_to_process = end_block - start_block + 1;
|
||||
let (block_req_lim, block_process_lim) = (128, 128);
|
||||
|
||||
@@ -1938,12 +2292,14 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let cloned_ctx = ctx.clone();
|
||||
let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion")
|
||||
|
||||
let _storage_thread = hiro_system_kit::thread_named("Block data ingestion")
|
||||
.spawn(move || {
|
||||
let mut blocks_stored = 0;
|
||||
let mut num_writes = 0;
|
||||
|
||||
while let Ok(Some((block_height, compacted_block, _raw_block))) =
|
||||
let mut inbox = HashMap::new();
|
||||
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
|
||||
while let Ok(Some((block_height, compacted_block, raw_block))) =
|
||||
block_compressed_rx.recv()
|
||||
{
|
||||
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx);
|
||||
@@ -1953,7 +2309,25 @@ pub async fn rebuild_rocks_db(
|
||||
// In the context of ordinals, we're constrained to process blocks sequentially
|
||||
// Blocks are processed by a threadpool and could be coming out of order.
|
||||
// Inbox block for later if the current block is not the one we should be
|
||||
// processing.
|
||||
// processing.
|
||||
if block_height as u64 >= start_sequencing_blocks_at_height {
|
||||
inbox.insert(block_height as u64, raw_block);
|
||||
let mut chunk = Vec::new();
|
||||
while let Some(next_block) = inbox.remove(&inbox_cursor) {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
|
||||
inbox.len()
|
||||
)
|
||||
});
|
||||
chunk.push(next_block);
|
||||
inbox_cursor += 1;
|
||||
}
|
||||
if let Some(ref tx) = blocks_post_processor {
|
||||
let _ = tx.send(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
// Should we start look for inscriptions data in blocks?
|
||||
cloned_ctx.try_log(|logger| {
|
||||
|
||||
@@ -35,6 +35,7 @@ use crate::db::{
|
||||
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
|
||||
get_any_entry_in_ordinal_activities, insert_entry_in_blocks, insert_entry_in_inscriptions,
|
||||
insert_transfer_in_locations_tx, retrieve_satoshi_point_using_lazy_storage,
|
||||
retrieve_satoshi_point_using_lazy_storage_v3,
|
||||
};
|
||||
use crate::ord::height::Height;
|
||||
|
||||
@@ -321,7 +322,7 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
&transaction_id,
|
||||
input_index,
|
||||
0,
|
||||
local_cache,
|
||||
&local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send(traversal);
|
||||
@@ -1095,6 +1096,279 @@ fn test_ordinal_inscription_parsing() {
|
||||
println!("{:?}", inscription);
|
||||
}
|
||||
|
||||
pub fn get_transactions_to_process(
|
||||
block: &BitcoinBlockData,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
inscriptions_db_conn: &mut Connection,
|
||||
ctx: &Context,
|
||||
) -> Vec<(TransactionIdentifier, usize)> {
|
||||
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
|
||||
|
||||
for tx in block.transactions.iter().skip(1) {
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for ordinal_event in tx.metadata.ordinal_operations.iter() {
|
||||
let (inscription_data, _is_cursed) = match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(inscription_data) => {
|
||||
(inscription_data, false)
|
||||
}
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => {
|
||||
(inscription_data, false)
|
||||
}
|
||||
OrdinalOperation::InscriptionTransferred(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if cache_l1.contains_key(&(
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(Some((traversal, _))) = find_inscription_with_id(
|
||||
&inscription_data.inscription_id,
|
||||
&inscriptions_db_conn,
|
||||
ctx,
|
||||
) {
|
||||
cache_l1.insert(
|
||||
(
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
} else {
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push((
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
transactions_ids
|
||||
}
|
||||
|
||||
pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
block: &BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscriptions_db_conn: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
let mut transactions_ids =
|
||||
get_transactions_to_process(block, cache_l1, inscriptions_db_conn, ctx);
|
||||
|
||||
if !transactions_ids.is_empty() {
|
||||
let expected_traversals = transactions_ids.len();
|
||||
let (traversal_tx, traversal_rx) = channel();
|
||||
let traversal_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
|
||||
|
||||
let mut tx_thread_pool = vec![];
|
||||
for thread_index in 0..hord_config.ingestion_thread_max {
|
||||
let (tx, rx) = channel();
|
||||
tx_thread_pool.push(tx);
|
||||
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let block_identifier = block.block_identifier.clone();
|
||||
let moved_hord_db_path = hord_config.db_path.clone();
|
||||
let local_cache = cache_l2.clone();
|
||||
|
||||
traversal_data_pool.execute(move || {
|
||||
while let Ok(Some((transaction_id, input_index, prioritary))) = rx.recv() {
|
||||
let traversal: Result<TraversalResult, String> =
|
||||
retrieve_satoshi_point_using_lazy_storage_v3(
|
||||
&moved_hord_db_path,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
input_index,
|
||||
0,
|
||||
&local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
let mut priority_queue = VecDeque::new();
|
||||
let mut warmup_queue = VecDeque::new();
|
||||
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
priority_queue.push_back((transaction_id, input_index, true));
|
||||
}
|
||||
|
||||
// Feed each workers with 2 workitems each
|
||||
for thread_index in 0..hord_config.ingestion_thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
for thread_index in 0..hord_config.ingestion_thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
|
||||
let mut next_block_iter = next_blocks.iter();
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
|
||||
if prioritary {
|
||||
traversals_received += 1;
|
||||
}
|
||||
match traversal_result {
|
||||
Ok(traversal) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "Unable to compute inscription's Satoshi: {e}",)
|
||||
});
|
||||
}
|
||||
}
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(w) = priority_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(w) = warmup_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(block) = next_block_iter.next() {
|
||||
let mut transactions_ids =
|
||||
get_transactions_to_process(block, cache_l1, inscriptions_db_conn, ctx);
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
warmup_queue.push_back((transaction_id, input_index, false));
|
||||
}
|
||||
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for thread_index in 0..hord_config.ingestion_thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(None);
|
||||
}
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Block data compression").spawn(move || {
|
||||
let _ = traversal_data_pool.join();
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_hord_db_and_augment_bitcoin_block_v3(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
let _ = retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
&new_block,
|
||||
&next_blocks,
|
||||
cache_l1,
|
||||
cache_l2,
|
||||
inscriptions_db_conn_rw,
|
||||
&hord_config,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let discard_changes: bool = get_any_entry_in_ordinal_activities(
|
||||
&new_block.block_identifier.index,
|
||||
inscriptions_db_conn_rw,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let inner_ctx = if discard_changes {
|
||||
Context::empty()
|
||||
} else {
|
||||
ctx.clone()
|
||||
};
|
||||
|
||||
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
|
||||
let any_inscription_revealed =
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
|
||||
new_block,
|
||||
&transaction,
|
||||
&cache_l1,
|
||||
&inner_ctx,
|
||||
)?;
|
||||
|
||||
// Have inscriptions been transfered?
|
||||
let any_inscription_transferred =
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
new_block,
|
||||
&transaction,
|
||||
&inner_ctx,
|
||||
)?;
|
||||
|
||||
if !any_inscription_revealed && !any_inscription_transferred {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if discard_changes {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Ignoring updates for block #{}, activities present in database",
|
||||
new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Saving updates for block {}",
|
||||
new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
transaction.commit().unwrap();
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Updates saved for block {}",
|
||||
new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Block #{} processed through hord, revealing {} inscriptions [{}]",
|
||||
new_block.block_identifier.index,
|
||||
inscriptions_revealed.len(),
|
||||
inscriptions_revealed.join(", ")
|
||||
)
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
block: &mut BitcoinBlockData,
|
||||
hord_db_tx: &Transaction,
|
||||
@@ -1260,7 +1534,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_t
|
||||
pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
|
||||
block: &mut BitcoinBlockData,
|
||||
transaction: &Transaction,
|
||||
traversals: &HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l1: &HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
ctx: &Context,
|
||||
) -> Result<bool, String> {
|
||||
let mut storage_updated = false;
|
||||
@@ -1309,7 +1583,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
|
||||
};
|
||||
|
||||
let transaction_identifier = new_tx.transaction_identifier.clone();
|
||||
let traversal = match traversals
|
||||
let traversal = match cache_l1
|
||||
.get(&(transaction_identifier, inscription.inscription_input_index))
|
||||
{
|
||||
Some(traversal) => traversal,
|
||||
|
||||
@@ -1,30 +1,54 @@
|
||||
use std::sync::mpsc::{channel, Sender};
|
||||
use std::{
|
||||
sync::{
|
||||
mpsc::{channel, Sender},
|
||||
Arc,
|
||||
},
|
||||
thread::JoinHandle,
|
||||
};
|
||||
|
||||
use super::HordConfig;
|
||||
use chainhook_sdk::{indexer::bitcoin::BitcoinBlockFullBreakdown, utils::Context};
|
||||
|
||||
pub struct Batch {
|
||||
pub jobs: Vec<Job>,
|
||||
pub jobs_count: u32,
|
||||
}
|
||||
use crate::{
|
||||
config::Config,
|
||||
db::{open_readwrite_hord_db_conn, process_blocks},
|
||||
};
|
||||
|
||||
pub struct Job {
|
||||
pub inputs: JobInputs,
|
||||
pub result: Result<JobResult, String>,
|
||||
}
|
||||
use super::new_traversals_lazy_cache;
|
||||
|
||||
pub struct JobInputs {}
|
||||
|
||||
pub struct JobResult {}
|
||||
|
||||
pub fn start_ordinals_number_processor(_config: &HordConfig) -> Sender<Batch> {
|
||||
pub fn start_ordinals_number_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> (Sender<Vec<BitcoinBlockFullBreakdown>>, JoinHandle<()>) {
|
||||
// let mut batches = HashMap::new();
|
||||
|
||||
let (tx, _rx) = channel();
|
||||
let (tx, rx) = channel::<Vec<BitcoinBlockFullBreakdown>>();
|
||||
// let (inner_tx, inner_rx) = channel();
|
||||
|
||||
hiro_system_kit::thread_named("Batch receiver")
|
||||
.spawn(move || {})
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Batch receiver")
|
||||
.spawn(move || {
|
||||
let cache_l2 = Arc::new(new_traversals_lazy_cache(1024));
|
||||
let mut inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
|
||||
let hord_config = config.get_hord_config();
|
||||
while let Ok(raw_blocks) = rx.recv() {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Processing {} blocks",
|
||||
raw_blocks.len()
|
||||
);
|
||||
process_blocks(
|
||||
raw_blocks,
|
||||
config.network.bitcoin_network.clone(),
|
||||
&cache_l2,
|
||||
&mut inscriptions_db_conn_rw,
|
||||
&hord_config,
|
||||
&ctx,
|
||||
)
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
tx
|
||||
(tx, handle)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user