feat: multithread traversals

This commit is contained in:
Ludo Galabru
2023-03-27 21:32:38 -04:00
parent 65afd77492
commit fba5c89a48
5 changed files with 106 additions and 59 deletions

View File

@@ -486,16 +486,16 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
hash: "".into(),
};
let (block_height, offset, ordinal_number, hops) =
retrieve_satoshi_point_using_local_storage(
&hord_db_conn,
&block_identifier,
&transaction_identifier,
&ctx,
)?;
let traversal = retrieve_satoshi_point_using_local_storage(
&hord_db_conn,
&block_identifier,
&transaction_identifier,
&ctx,
)?;
info!(
ctx.expect_logger(),
"Satoshi #{ordinal_number} was minted in block #{block_height} at offset {offset} and was transferred {hops} times.",
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times.",
traversal.ordinal_number, traversal.ordinal_block_number, traversal.ordinal_offset, traversal.transfers
);
}
FindCommand::Inscription(cmd) => {
@@ -588,8 +588,9 @@ pub async fn perform_hord_db_update(
&rw_hord_db_conn,
start_block,
end_block,
&ctx,
network_threads,
&config.expected_cache_path(),
&ctx,
)
.await?;

View File

@@ -114,8 +114,9 @@ pub async fn scan_bitcoin_chain_with_predicate(
&rw_hord_db_conn,
start_block,
end_block,
&ctx,
8,
&config.expected_cache_path(),
&ctx,
)
.await?;

View File

@@ -557,8 +557,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
rw_hord_db_conn: &Connection,
start_block: u64,
end_block: u64,
ctx: &Context,
network_thread: usize,
hord_db_path: &PathBuf,
ctx: &Context,
) -> Result<(), String> {
let number_of_blocks_to_process = end_block - start_block + 1;
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
@@ -673,8 +674,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
&mut new_block,
&rw_hord_db_conn,
&ctx,
false,
&hord_db_path,
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to augment bitcoin block with hord_db: {e}",)
@@ -702,12 +704,19 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
Ok(())
}
pub struct TraversalResult {
pub ordinal_block_number: u64,
pub ordinal_offset: u64,
pub ordinal_number: u64,
pub transfers: u32,
}
pub fn retrieve_satoshi_point_using_local_storage(
hord_db_conn: &Connection,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
ctx: &Context,
) -> Result<(u64, u64, u64, u32), String> {
) -> Result<TraversalResult, String> {
ctx.try_log(|logger| {
slog::info!(
logger,
@@ -781,7 +790,6 @@ pub fn retrieve_satoshi_point_using_local_storage(
if sats_in >= total_out {
ordinal_offset = total_out - (sats_in - txin_value);
ordinal_block_number = block_height;
// println!("{h}: {blockhash} -> {} [in:{} , out: {}] {}/{vout} (input #{in_index}) {compounded_offset}", transaction.txid, transaction.vin.len(), transaction.vout.len(), txid);
tx_cursor = (txin, vout as usize);
break;
}
@@ -848,10 +856,10 @@ pub fn retrieve_satoshi_point_using_local_storage(
let height = Height(ordinal_block_number.into());
let ordinal_number = height.starting_sat().0 + ordinal_offset;
Ok((
ordinal_block_number.into(),
Ok(TraversalResult {
ordinal_block_number: ordinal_block_number.into(),
ordinal_offset,
ordinal_number,
hops,
))
transfers: hops,
})
}

View File

@@ -4,10 +4,15 @@ pub mod ord;
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
use bitcoincore_rpc::bitcoin::{Address, Network, Script};
use chainhook_types::{BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation};
use chainhook_types::{
BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
};
use hiro_system_kit::slog;
use rusqlite::Connection;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
use crate::{
hord::{
@@ -22,7 +27,10 @@ use crate::{
utils::Context,
};
use self::db::{remove_entry_from_blocks, remove_entry_from_inscriptions};
use self::db::{
open_readonly_hord_db_conn, remove_entry_from_blocks, remove_entry_from_inscriptions,
TraversalResult,
};
pub fn revert_hord_db_with_augmented_bitcoin_block(
block: &BitcoinBlockData,
@@ -64,8 +72,9 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
pub fn update_hord_db_and_augment_bitcoin_block(
new_block: &mut BitcoinBlockData,
rw_hord_db_conn: &Connection,
ctx: &Context,
write_block: bool,
hord_db_path: &PathBuf,
ctx: &Context,
) -> Result<(), String> {
if write_block {
ctx.try_log(|logger| {
@@ -92,6 +101,49 @@ pub fn update_hord_db_and_augment_bitcoin_block(
.clone();
let first_sat_post_subsidy = Height(new_block.block_identifier.index).starting_sat().0;
let mut transactions_ids = vec![];
for new_tx in new_block.transactions.iter_mut().skip(1) {
// Have a new inscription been revealed, if so, are looking at a re-inscription
for ordinal_event in
new_tx.metadata.ordinal_operations.iter_mut()
{
if let OrdinalOperation::InscriptionRevealed(_) = ordinal_event {
transactions_ids.push(new_tx.transaction_identifier.clone());
}
}
}
let expected_traversals = transactions_ids.len();
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, TraversalResult)>();
let traversal_data_pool = ThreadPool::new(10);
for transaction_id in transactions_ids.into_iter() {
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = ctx.clone();
let block_identifier = new_block.block_identifier.clone();
let hord_db_path = hord_db_path.clone();
traversal_data_pool.execute(move || {
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &moved_ctx).unwrap();
let traversal = retrieve_satoshi_point_using_local_storage(
&hord_db_conn,
&block_identifier,
&transaction_id,
&moved_ctx,
)
.unwrap();
let _ = moved_traversal_tx.send((transaction_id, traversal));
});
}
let mut traversals = HashMap::new();
let mut traversals_received = 0;
while let Ok((transaction_identifier, traversal_result)) = traversal_rx.recv() {
traversals_received += 1;
traversals.insert(transaction_identifier, traversal_result);
if traversals_received == expected_traversals {
break;
}
}
for new_tx in new_block.transactions.iter_mut().skip(1) {
let mut ordinals_events_indexes_to_discard = VecDeque::new();
// Have a new inscription been revealed, if so, are looking at a re-inscription
@@ -99,53 +151,36 @@ pub fn update_hord_db_and_augment_bitcoin_block(
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
{
if let OrdinalOperation::InscriptionRevealed(inscription) = ordinal_event {
let (
ordinal_block_height,
ordinal_offset,
ordinal_number,
transfers_pre_inscription,
) = {
// Are we looking at a re-inscription?
let res = retrieve_satoshi_point_using_local_storage(
&rw_hord_db_conn,
&new_block.block_identifier,
&new_tx.transaction_identifier,
&ctx,
);
match res {
Ok(res) => res,
Err(e) => {
ctx.try_log(|logger| {
slog::error!(
logger,
"unable to retrieve satoshi point: {}",
e.to_string()
);
});
continue;
}
let traversal = match traversals.get(&new_tx.transaction_identifier) {
Some(traversal) => traversal,
None => {
ctx.try_log(|logger| {
slog::error!(logger, "unable to retrieve satoshi point",);
});
continue;
}
};
if let Some(_entry) =
find_inscription_with_ordinal_number(&ordinal_number, &rw_hord_db_conn, &ctx)
{
if let Some(_entry) = find_inscription_with_ordinal_number(
&traversal.ordinal_number,
&rw_hord_db_conn,
&ctx,
) {
ctx.try_log(|logger| {
slog::warn!(
logger,
"Transaction {} in block {} is overriding an existing inscription {}",
new_tx.transaction_identifier.hash,
new_block.block_identifier.index,
ordinal_number
traversal.ordinal_number
);
});
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
} else {
inscription.ordinal_offset = ordinal_offset;
inscription.ordinal_block_height = ordinal_block_height;
inscription.ordinal_number = ordinal_number;
inscription.transfers_pre_inscription = transfers_pre_inscription;
inscription.ordinal_offset = traversal.ordinal_offset;
inscription.ordinal_block_height = traversal.ordinal_block_number;
inscription.ordinal_number = traversal.ordinal_number;
inscription.transfers_pre_inscription = traversal.transfers;
inscription.inscription_number =
match find_latest_inscription_number(&rw_hord_db_conn, &ctx) {
Ok(inscription_number) => inscription_number,
@@ -167,7 +202,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
new_tx.transaction_identifier.hash,
new_block.block_identifier.index,
inscription.content_type,
ordinal_number
traversal.ordinal_number
);
});

View File

@@ -553,8 +553,9 @@ pub async fn start_observer_commands_handler(
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&rw_hord_db_conn,
&ctx,
true,
&config.get_cache_path_buf(),
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
@@ -663,8 +664,9 @@ pub async fn start_observer_commands_handler(
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&rw_hord_db_conn,
&ctx,
true,
&config.get_cache_path_buf(),
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(