mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 22:43:06 +08:00
feat: ability to target blocks
This commit is contained in:
@@ -1,13 +1,7 @@
|
||||
use crate::config::{Config, PredicatesApi};
|
||||
use crate::core::protocol::sequencing::{
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx,
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
|
||||
};
|
||||
use crate::core::pipeline::processors::inscription_indexing::re_augment_block_with_ordinals_operations;
|
||||
use crate::core::{self, get_inscriptions_revealed_in_block};
|
||||
use crate::db::{
|
||||
find_all_inscriptions_in_block, get_any_entry_in_ordinal_activities,
|
||||
open_readonly_hord_db_conn, InscriptionHeigthHint,
|
||||
};
|
||||
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn};
|
||||
use crate::download::download_ordinals_dataset_if_required;
|
||||
use crate::service::{
|
||||
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
|
||||
@@ -19,7 +13,7 @@ use chainhook_sdk::chainhooks::bitcoin::{
|
||||
evaluate_bitcoin_chainhooks_on_chain_event, handle_bitcoin_hook_action,
|
||||
BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
|
||||
};
|
||||
use chainhook_sdk::chainhooks::types::{BitcoinChainhookSpecification, BitcoinPredicateType};
|
||||
use chainhook_sdk::chainhooks::types::BitcoinChainhookSpecification;
|
||||
use chainhook_sdk::indexer::bitcoin::{
|
||||
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
|
||||
};
|
||||
@@ -27,7 +21,7 @@ use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
|
||||
use chainhook_sdk::types::{
|
||||
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
|
||||
};
|
||||
use chainhook_sdk::utils::{file_append, send_request, Context};
|
||||
use chainhook_sdk::utils::{file_append, send_request, BlockHeights, Context};
|
||||
use std::collections::HashMap;
|
||||
|
||||
// TODO(lgalabru): Re-introduce support for blocks[] !!! gracefully handle hints for non consecutive blocks
|
||||
@@ -49,72 +43,69 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
|
||||
}
|
||||
};
|
||||
let mut floating_end_block = false;
|
||||
|
||||
let start_block = match predicate_spec.start_block {
|
||||
Some(start_block) => start_block,
|
||||
None => {
|
||||
return Err(
|
||||
"Bitcoin chainhook specification must include a field start_block in replay mode"
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let (mut end_block, floating_end_block) = match predicate_spec.end_block {
|
||||
Some(end_block) => (end_block, false),
|
||||
None => match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => (result.blocks - 1, true),
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks {
|
||||
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
|
||||
} else {
|
||||
let start_block = match predicate_spec.start_block {
|
||||
Some(start_block) => start_block,
|
||||
None => {
|
||||
return Err(
|
||||
"Bitcoin chainhook specification must include a field start_block in replay mode"
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
let (end_block, update_end_block) = match predicate_spec.end_block {
|
||||
Some(end_block) => (end_block, false),
|
||||
None => match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => (result.blocks - 1, true),
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
}
|
||||
},
|
||||
};
|
||||
floating_end_block = update_end_block;
|
||||
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
|
||||
};
|
||||
|
||||
// Are we dealing with an ordinals-based predicate?
|
||||
// If so, we could use the ordinal storage to provide a set of hints.
|
||||
let mut inscriptions_db_conn = None;
|
||||
|
||||
if let BitcoinPredicateType::OrdinalsProtocol(_) = &predicate_spec.predicate {
|
||||
inscriptions_db_conn = Some(open_readonly_hord_db_conn(
|
||||
&config.expected_cache_path(),
|
||||
ctx,
|
||||
)?);
|
||||
}
|
||||
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Starting predicate evaluation on Bitcoin blocks",
|
||||
);
|
||||
|
||||
let mut blocks_scanned = 0;
|
||||
let mut actions_triggered = 0;
|
||||
let occurrences_found = 0u64;
|
||||
let mut err_count = 0;
|
||||
|
||||
let event_observer_config = config.get_event_observer_config();
|
||||
let bitcoin_config = event_observer_config.get_bitcoin_config();
|
||||
let mut traversals = HashMap::new();
|
||||
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
|
||||
let mut number_of_blocks_scanned = 0;
|
||||
let mut number_of_blocks_sent = 0u64;
|
||||
let http_client = build_http_client();
|
||||
|
||||
let mut cursor = start_block.saturating_sub(1);
|
||||
while let Some(current_block_height) = block_heights_to_scan.pop_front() {
|
||||
number_of_blocks_scanned += 1;
|
||||
|
||||
let mut inscription_height_hint = InscriptionHeigthHint::new();
|
||||
|
||||
while cursor <= end_block {
|
||||
cursor += 1;
|
||||
blocks_scanned += 1;
|
||||
|
||||
if let Some(ref inscriptions_db_conn) = inscriptions_db_conn {
|
||||
if !get_any_entry_in_ordinal_activities(&cursor, &inscriptions_db_conn, &ctx) {
|
||||
continue;
|
||||
}
|
||||
if !get_any_entry_in_ordinal_activities(¤t_block_height, &inscriptions_db_conn, &ctx)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let block_hash =
|
||||
retrieve_block_hash_with_retry(&http_client, &cursor, &bitcoin_config, ctx).await?;
|
||||
let block_hash = retrieve_block_hash_with_retry(
|
||||
&http_client,
|
||||
¤t_block_height,
|
||||
&bitcoin_config,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let block_breakdown =
|
||||
download_and_parse_block_with_retry(&http_client, &block_hash, &bitcoin_config, ctx)
|
||||
.await?;
|
||||
@@ -127,49 +118,26 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
Err((e, _)) => {
|
||||
warn!(
|
||||
ctx.expect_logger(),
|
||||
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
|
||||
"Unable to standardize block#{} {}: {}", current_block_height, block_hash, e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ref mut inscriptions_db_conn) = inscriptions_db_conn {
|
||||
// Evaluating every single block is required for also keeping track of transfers.
|
||||
let local_traverals =
|
||||
find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
|
||||
for (key, traversal_result) in local_traverals.into_iter() {
|
||||
traversals.insert(key, traversal_result);
|
||||
}
|
||||
let empty_ctx = Context::empty();
|
||||
re_augment_block_with_ordinals_operations(&mut block, &inscriptions_db_conn, &empty_ctx);
|
||||
|
||||
let transaction = inscriptions_db_conn.transaction().unwrap();
|
||||
let empty_ctx = Context::empty();
|
||||
let _ = update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
|
||||
&mut block,
|
||||
&transaction,
|
||||
&traversals,
|
||||
&mut inscription_height_hint,
|
||||
&empty_ctx,
|
||||
)?;
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let _ = update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
&mut block,
|
||||
&transaction,
|
||||
&empty_ctx,
|
||||
)?;
|
||||
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Processing block #{} through {} predicate (inscriptions revealed: [{}])",
|
||||
cursor,
|
||||
predicate_spec.uuid,
|
||||
inscriptions_revealed.join(", ")
|
||||
);
|
||||
}
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Processing block #{current_block_height} through {} predicate (inscriptions revealed: [{}])",
|
||||
predicate_spec.uuid,
|
||||
inscriptions_revealed.join(", ")
|
||||
);
|
||||
|
||||
match process_block_with_predicates(
|
||||
block,
|
||||
@@ -188,12 +156,12 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
}
|
||||
|
||||
if let PredicatesApi::On(ref api_config) = config.http_api {
|
||||
if blocks_scanned % 50 == 0 {
|
||||
if number_of_blocks_scanned % 50 == 0 {
|
||||
let status = PredicateStatus::Scanning(ScanningData {
|
||||
start_block,
|
||||
end_block,
|
||||
cursor,
|
||||
occurrences_found,
|
||||
number_of_blocks_to_scan,
|
||||
number_of_blocks_scanned,
|
||||
number_of_blocks_sent,
|
||||
current_block_height,
|
||||
});
|
||||
let mut predicates_db_conn =
|
||||
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
|
||||
@@ -206,9 +174,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
}
|
||||
}
|
||||
|
||||
if cursor == end_block && floating_end_block {
|
||||
end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks - 1,
|
||||
if block_heights_to_scan.is_empty() && floating_end_block {
|
||||
match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => {
|
||||
for entry in (current_block_height + 1)..result.blocks {
|
||||
block_heights_to_scan.push_back(entry);
|
||||
}
|
||||
}
|
||||
Err(_e) => {
|
||||
continue;
|
||||
}
|
||||
@@ -217,15 +189,15 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
}
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
|
||||
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
|
||||
);
|
||||
|
||||
if let PredicatesApi::On(ref api_config) = config.http_api {
|
||||
let status = PredicateStatus::Scanning(ScanningData {
|
||||
start_block,
|
||||
end_block,
|
||||
cursor,
|
||||
occurrences_found,
|
||||
number_of_blocks_to_scan,
|
||||
number_of_blocks_scanned,
|
||||
number_of_blocks_sent,
|
||||
current_block_height: 0,
|
||||
});
|
||||
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
|
||||
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)
|
||||
|
||||
@@ -595,10 +595,10 @@ pub enum PredicateStatus {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ScanningData {
|
||||
pub start_block: u64,
|
||||
pub cursor: u64,
|
||||
pub end_block: u64,
|
||||
pub occurrences_found: u64,
|
||||
pub number_of_blocks_to_scan: u64,
|
||||
pub number_of_blocks_scanned: u64,
|
||||
pub number_of_blocks_sent: u64,
|
||||
pub current_block_height: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
Reference in New Issue
Block a user