mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 08:58:29 +08:00
feat: multithread traversals
This commit is contained in:
@@ -486,16 +486,16 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
hash: "".into(),
|
||||
};
|
||||
|
||||
let (block_height, offset, ordinal_number, hops) =
|
||||
retrieve_satoshi_point_using_local_storage(
|
||||
&hord_db_conn,
|
||||
&block_identifier,
|
||||
&transaction_identifier,
|
||||
&ctx,
|
||||
)?;
|
||||
let traversal = retrieve_satoshi_point_using_local_storage(
|
||||
&hord_db_conn,
|
||||
&block_identifier,
|
||||
&transaction_identifier,
|
||||
&ctx,
|
||||
)?;
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Satoshi #{ordinal_number} was minted in block #{block_height} at offset {offset} and was transferred {hops} times.",
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times.",
|
||||
traversal.ordinal_number, traversal.ordinal_block_number, traversal.ordinal_offset, traversal.transfers
|
||||
);
|
||||
}
|
||||
FindCommand::Inscription(cmd) => {
|
||||
@@ -588,8 +588,9 @@ pub async fn perform_hord_db_update(
|
||||
&rw_hord_db_conn,
|
||||
start_block,
|
||||
end_block,
|
||||
&ctx,
|
||||
network_threads,
|
||||
&config.expected_cache_path(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -114,8 +114,9 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
&rw_hord_db_conn,
|
||||
start_block,
|
||||
end_block,
|
||||
&ctx,
|
||||
8,
|
||||
&config.expected_cache_path(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -557,8 +557,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
rw_hord_db_conn: &Connection,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
ctx: &Context,
|
||||
network_thread: usize,
|
||||
hord_db_path: &PathBuf,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
let number_of_blocks_to_process = end_block - start_block + 1;
|
||||
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
|
||||
@@ -673,8 +674,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
&mut new_block,
|
||||
&rw_hord_db_conn,
|
||||
&ctx,
|
||||
false,
|
||||
&hord_db_path,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "Unable to augment bitcoin block with hord_db: {e}",)
|
||||
@@ -702,12 +704,19 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct TraversalResult {
|
||||
pub ordinal_block_number: u64,
|
||||
pub ordinal_offset: u64,
|
||||
pub ordinal_number: u64,
|
||||
pub transfers: u32,
|
||||
}
|
||||
|
||||
pub fn retrieve_satoshi_point_using_local_storage(
|
||||
hord_db_conn: &Connection,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
ctx: &Context,
|
||||
) -> Result<(u64, u64, u64, u32), String> {
|
||||
) -> Result<TraversalResult, String> {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
@@ -781,7 +790,6 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
if sats_in >= total_out {
|
||||
ordinal_offset = total_out - (sats_in - txin_value);
|
||||
ordinal_block_number = block_height;
|
||||
// println!("{h}: {blockhash} -> {} [in:{} , out: {}] {}/{vout} (input #{in_index}) {compounded_offset}", transaction.txid, transaction.vin.len(), transaction.vout.len(), txid);
|
||||
tx_cursor = (txin, vout as usize);
|
||||
break;
|
||||
}
|
||||
@@ -848,10 +856,10 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
let height = Height(ordinal_block_number.into());
|
||||
let ordinal_number = height.starting_sat().0 + ordinal_offset;
|
||||
|
||||
Ok((
|
||||
ordinal_block_number.into(),
|
||||
Ok(TraversalResult {
|
||||
ordinal_block_number: ordinal_block_number.into(),
|
||||
ordinal_offset,
|
||||
ordinal_number,
|
||||
hops,
|
||||
))
|
||||
transfers: hops,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,10 +4,15 @@ pub mod ord;
|
||||
|
||||
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
|
||||
use bitcoincore_rpc::bitcoin::{Address, Network, Script};
|
||||
use chainhook_types::{BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation};
|
||||
use chainhook_types::{
|
||||
BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
|
||||
};
|
||||
use hiro_system_kit::slog;
|
||||
use rusqlite::Connection;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc::channel;
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use crate::{
|
||||
hord::{
|
||||
@@ -22,7 +27,10 @@ use crate::{
|
||||
utils::Context,
|
||||
};
|
||||
|
||||
use self::db::{remove_entry_from_blocks, remove_entry_from_inscriptions};
|
||||
use self::db::{
|
||||
open_readonly_hord_db_conn, remove_entry_from_blocks, remove_entry_from_inscriptions,
|
||||
TraversalResult,
|
||||
};
|
||||
|
||||
pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
block: &BitcoinBlockData,
|
||||
@@ -64,8 +72,9 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
rw_hord_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
write_block: bool,
|
||||
hord_db_path: &PathBuf,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
if write_block {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -92,6 +101,49 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
.clone();
|
||||
let first_sat_post_subsidy = Height(new_block.block_identifier.index).starting_sat().0;
|
||||
|
||||
let mut transactions_ids = vec![];
|
||||
for new_tx in new_block.transactions.iter_mut().skip(1) {
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for ordinal_event in
|
||||
new_tx.metadata.ordinal_operations.iter_mut()
|
||||
{
|
||||
if let OrdinalOperation::InscriptionRevealed(_) = ordinal_event {
|
||||
transactions_ids.push(new_tx.transaction_identifier.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
let expected_traversals = transactions_ids.len();
|
||||
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, TraversalResult)>();
|
||||
let traversal_data_pool = ThreadPool::new(10);
|
||||
|
||||
for transaction_id in transactions_ids.into_iter() {
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let block_identifier = new_block.block_identifier.clone();
|
||||
let hord_db_path = hord_db_path.clone();
|
||||
traversal_data_pool.execute(move || {
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &moved_ctx).unwrap();
|
||||
let traversal = retrieve_satoshi_point_using_local_storage(
|
||||
&hord_db_conn,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
&moved_ctx,
|
||||
)
|
||||
.unwrap();
|
||||
let _ = moved_traversal_tx.send((transaction_id, traversal));
|
||||
});
|
||||
}
|
||||
|
||||
let mut traversals = HashMap::new();
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((transaction_identifier, traversal_result)) = traversal_rx.recv() {
|
||||
traversals_received += 1;
|
||||
traversals.insert(transaction_identifier, traversal_result);
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for new_tx in new_block.transactions.iter_mut().skip(1) {
|
||||
let mut ordinals_events_indexes_to_discard = VecDeque::new();
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
@@ -99,53 +151,36 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
|
||||
{
|
||||
if let OrdinalOperation::InscriptionRevealed(inscription) = ordinal_event {
|
||||
let (
|
||||
ordinal_block_height,
|
||||
ordinal_offset,
|
||||
ordinal_number,
|
||||
transfers_pre_inscription,
|
||||
) = {
|
||||
// Are we looking at a re-inscription?
|
||||
let res = retrieve_satoshi_point_using_local_storage(
|
||||
&rw_hord_db_conn,
|
||||
&new_block.block_identifier,
|
||||
&new_tx.transaction_identifier,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
match res {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"unable to retrieve satoshi point: {}",
|
||||
e.to_string()
|
||||
);
|
||||
});
|
||||
continue;
|
||||
}
|
||||
let traversal = match traversals.get(&new_tx.transaction_identifier) {
|
||||
Some(traversal) => traversal,
|
||||
None => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "unable to retrieve satoshi point",);
|
||||
});
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(_entry) =
|
||||
find_inscription_with_ordinal_number(&ordinal_number, &rw_hord_db_conn, &ctx)
|
||||
{
|
||||
if let Some(_entry) = find_inscription_with_ordinal_number(
|
||||
&traversal.ordinal_number,
|
||||
&rw_hord_db_conn,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::warn!(
|
||||
logger,
|
||||
"Transaction {} in block {} is overriding an existing inscription {}",
|
||||
new_tx.transaction_identifier.hash,
|
||||
new_block.block_identifier.index,
|
||||
ordinal_number
|
||||
traversal.ordinal_number
|
||||
);
|
||||
});
|
||||
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
|
||||
} else {
|
||||
inscription.ordinal_offset = ordinal_offset;
|
||||
inscription.ordinal_block_height = ordinal_block_height;
|
||||
inscription.ordinal_number = ordinal_number;
|
||||
inscription.transfers_pre_inscription = transfers_pre_inscription;
|
||||
inscription.ordinal_offset = traversal.ordinal_offset;
|
||||
inscription.ordinal_block_height = traversal.ordinal_block_number;
|
||||
inscription.ordinal_number = traversal.ordinal_number;
|
||||
inscription.transfers_pre_inscription = traversal.transfers;
|
||||
inscription.inscription_number =
|
||||
match find_latest_inscription_number(&rw_hord_db_conn, &ctx) {
|
||||
Ok(inscription_number) => inscription_number,
|
||||
@@ -167,7 +202,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_tx.transaction_identifier.hash,
|
||||
new_block.block_identifier.index,
|
||||
inscription.content_type,
|
||||
ordinal_number
|
||||
traversal.ordinal_number
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -553,8 +553,9 @@ pub async fn start_observer_commands_handler(
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&rw_hord_db_conn,
|
||||
&ctx,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
@@ -663,8 +664,9 @@ pub async fn start_observer_commands_handler(
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&rw_hord_db_conn,
|
||||
&ctx,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
|
||||
Reference in New Issue
Block a user