feat: ability to target blocks

This commit is contained in:
Ludo Galabru
2023-08-03 02:12:14 +02:00
parent 8464b82931
commit f6be49e24d
2 changed files with 80 additions and 108 deletions

View File

@@ -1,13 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::core::protocol::sequencing::{
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
};
use crate::core::pipeline::processors::inscription_indexing::re_augment_block_with_ordinals_operations;
use crate::core::{self, get_inscriptions_revealed_in_block};
use crate::db::{
find_all_inscriptions_in_block, get_any_entry_in_ordinal_activities,
open_readonly_hord_db_conn, InscriptionHeigthHint,
};
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn};
use crate::download::download_ordinals_dataset_if_required;
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
@@ -19,7 +13,7 @@ use chainhook_sdk::chainhooks::bitcoin::{
evaluate_bitcoin_chainhooks_on_chain_event, handle_bitcoin_hook_action,
BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
};
use chainhook_sdk::chainhooks::types::{BitcoinChainhookSpecification, BitcoinPredicateType};
use chainhook_sdk::chainhooks::types::BitcoinChainhookSpecification;
use chainhook_sdk::indexer::bitcoin::{
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
};
@@ -27,7 +21,7 @@ use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
};
use chainhook_sdk::utils::{file_append, send_request, Context};
use chainhook_sdk::utils::{file_append, send_request, BlockHeights, Context};
use std::collections::HashMap;
// TODO(lgalabru): Re-introduce support for blocks[] !!! gracefully handle hints for non consecutive blocks
@@ -49,72 +43,69 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let mut floating_end_block = false;
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
};
let (mut end_block, floating_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks - 1, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks {
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
} else {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
},
};
let (end_block, update_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks - 1, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
},
};
floating_end_block = update_end_block;
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
};
// Are we dealing with an ordinals-based predicate?
// If so, we could use the ordinal storage to provide a set of hints.
let mut inscriptions_db_conn = None;
if let BitcoinPredicateType::OrdinalsProtocol(_) = &predicate_spec.predicate {
inscriptions_db_conn = Some(open_readonly_hord_db_conn(
&config.expected_cache_path(),
ctx,
)?);
}
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
);
let mut blocks_scanned = 0;
let mut actions_triggered = 0;
let occurrences_found = 0u64;
let mut err_count = 0;
let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let mut traversals = HashMap::new();
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
let mut number_of_blocks_scanned = 0;
let mut number_of_blocks_sent = 0u64;
let http_client = build_http_client();
let mut cursor = start_block.saturating_sub(1);
while let Some(current_block_height) = block_heights_to_scan.pop_front() {
number_of_blocks_scanned += 1;
let mut inscription_height_hint = InscriptionHeigthHint::new();
while cursor <= end_block {
cursor += 1;
blocks_scanned += 1;
if let Some(ref inscriptions_db_conn) = inscriptions_db_conn {
if !get_any_entry_in_ordinal_activities(&cursor, &inscriptions_db_conn, &ctx) {
continue;
}
if !get_any_entry_in_ordinal_activities(&current_block_height, &inscriptions_db_conn, &ctx)
{
continue;
}
let block_hash =
retrieve_block_hash_with_retry(&http_client, &cursor, &bitcoin_config, ctx).await?;
let block_hash = retrieve_block_hash_with_retry(
&http_client,
&current_block_height,
&bitcoin_config,
ctx,
)
.await?;
let block_breakdown =
download_and_parse_block_with_retry(&http_client, &block_hash, &bitcoin_config, ctx)
.await?;
@@ -127,49 +118,26 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
Err((e, _)) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
"Unable to standardize block#{} {}: {}", current_block_height, block_hash, e
);
continue;
}
};
if let Some(ref mut inscriptions_db_conn) = inscriptions_db_conn {
// Evaluating every single block is required for also keeping track of transfers.
let local_traverals =
find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
for (key, traversal_result) in local_traverals.into_iter() {
traversals.insert(key, traversal_result);
}
let empty_ctx = Context::empty();
re_augment_block_with_ordinals_operations(&mut block, &inscriptions_db_conn, &empty_ctx);
let transaction = inscriptions_db_conn.transaction().unwrap();
let empty_ctx = Context::empty();
let _ = update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
&mut block,
&transaction,
&traversals,
&mut inscription_height_hint,
&empty_ctx,
)?;
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
let _ = update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
&mut block,
&transaction,
&empty_ctx,
)?;
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
info!(
ctx.expect_logger(),
"Processing block #{} through {} predicate (inscriptions revealed: [{}])",
cursor,
predicate_spec.uuid,
inscriptions_revealed.join(", ")
);
}
info!(
ctx.expect_logger(),
"Processing block #{current_block_height} through {} predicate (inscriptions revealed: [{}])",
predicate_spec.uuid,
inscriptions_revealed.join(", ")
);
match process_block_with_predicates(
block,
@@ -188,12 +156,12 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
if let PredicatesApi::On(ref api_config) = config.http_api {
if blocks_scanned % 50 == 0 {
if number_of_blocks_scanned % 50 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
current_block_height,
});
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
@@ -206,9 +174,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
}
if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
if block_heights_to_scan.is_empty() && floating_end_block {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
for entry in (current_block_height + 1)..result.blocks {
block_heights_to_scan.push_back(entry);
}
}
Err(_e) => {
continue;
}
@@ -217,15 +189,15 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);
if let PredicatesApi::On(ref api_config) = config.http_api {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
current_block_height: 0,
});
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)

View File

@@ -595,10 +595,10 @@ pub enum PredicateStatus {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanningData {
pub start_block: u64,
pub cursor: u64,
pub end_block: u64,
pub occurrences_found: u64,
pub number_of_blocks_to_scan: u64,
pub number_of_blocks_scanned: u64,
pub number_of_blocks_sent: u64,
pub current_block_height: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]