feat: ability to track updates when scanning bitcoin (+refactor)

This commit is contained in:
Ludo Galabru
2023-06-12 15:49:12 -04:00
parent 7d9e179464
commit 9e54bfff35
9 changed files with 176 additions and 182 deletions

View File

@@ -896,7 +896,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
},
Command::Stacks(subcmd) => match subcmd {
StacksCommand::Db(StacksDbCommand::GetBlock(cmd)) => {
let mut config = Config::default(false, false, false, &cmd.config_path)?;
let config = Config::default(false, false, false, &cmd.config_path)?;
let stacks_db = open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx)
.expect("unable to read stacks_db");
match get_stacks_block_at_block_height(cmd.block_height, true, 10, &stacks_db) {

View File

@@ -1,5 +1,9 @@
use crate::archive::download_ordinals_dataset_if_required;
use crate::config::Config;
use crate::config::{Config, PredicatesApi};
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
ScanningData,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
use chainhook_sdk::chainhooks::bitcoin::{
@@ -8,10 +12,9 @@ use chainhook_sdk::chainhooks::bitcoin::{
};
use chainhook_sdk::chainhooks::types::{BitcoinChainhookSpecification, BitcoinPredicateType};
use chainhook_sdk::hord::db::{
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_last_block_inserted,
find_lazy_block_at_block_height, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db,
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions_in_block, find_last_block_inserted,
find_lazy_block_at_block_height, get_any_entry_in_ordinal_activities,
open_readonly_hord_db_conn, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
};
use chainhook_sdk::hord::{
get_inscriptions_revealed_in_block,
@@ -71,31 +74,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
// Are we dealing with an ordinals-based predicate?
// If so, we could use the ordinal storage to provide a set of hints.
let mut inscriptions_cache = BTreeMap::new();
let mut is_predicate_evaluating_ordinals = false;
let mut hord_blocks_requires_update = false;
let mut inscriptions_db_conn = None;
if let BitcoinPredicateType::OrdinalsProtocol(_) = &predicate_spec.predicate {
is_predicate_evaluating_ordinals = true;
if let Ok(inscriptions_db_conn) =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)
{
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn);
// Will we have to update the blocks table?
if let Ok(blocks_db) =
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
if find_lazy_block_at_block_height(end_block as u32, 3, &blocks_db).is_none() {
hord_blocks_requires_update = true;
}
}
}
}
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
// Do we need a seeded hord db?
if is_predicate_evaluating_ordinals && inscriptions_cache.is_empty() {
// Do we need to update the blocks table first?
if hord_blocks_requires_update {
if find_lazy_block_at_block_height(end_block as u32, 3, &blocks_db_rw).is_none() {
// Count how many entries in the table
// Compute the right interval
// Start the build local storage routine
@@ -103,16 +88,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
// TODO: make sure that we have a contiguous chain
// check_compacted_blocks_chain_integrity(&hord_db_conn);
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
let start_block = find_last_block_inserted(&blocks_db_rw) as u64;
if start_block < end_block {
warn!(
ctx.expect_logger(),
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
(end_block - start_block)
);
ctx.expect_logger(),
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
(end_block - start_block)
);
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), ctx)?;
@@ -126,10 +108,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
&ctx,
)
.await?;
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn_rw);
}
}
inscriptions_db_conn = Some(open_readonly_hord_db_conn(
&config.expected_cache_path(),
ctx,
)?);
}
info!(
@@ -139,21 +124,48 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let mut blocks_scanned = 0;
let mut actions_triggered = 0;
let mut occurrences_found = 0u64;
let mut err_count = 0;
let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let mut traversals = HashMap::new();
if is_predicate_evaluating_ordinals {
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
let mut storage = Storage::Memory(BTreeMap::new());
let mut cursor = start_block.saturating_sub(1);
while cursor <= end_block {
cursor += 1;
let mut storage = Storage::Memory(BTreeMap::new());
let mut cursor = start_block.saturating_sub(1);
while cursor <= end_block {
cursor += 1;
blocks_scanned += 1;
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let mut block = match indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
) {
Ok(data) => data,
Err(e) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
);
continue;
}
};
if let Some(ref inscriptions_db_conn) = inscriptions_db_conn {
if !get_any_entry_in_ordinal_activities(&cursor, &inscriptions_db_conn, &ctx) {}
// Evaluating every single block is required for also keeping track of transfers.
let local_traverals = match inscriptions_cache.remove(&cursor) {
let local_traverals = match find_all_inscriptions_in_block(
&cursor,
&inscriptions_db_conn,
)
.remove(&cursor)
{
Some(entry) => entry,
None => vec![],
};
@@ -161,31 +173,11 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
traversals.insert(transaction_identifier, traversal_result);
}
blocks_scanned += 1;
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let mut block = match indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
) {
Ok(data) => data,
Err(e) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
);
continue;
}
};
let _ = update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
&mut block,
&mut storage,
&traversals,
&hord_db_conn,
&inscriptions_db_conn,
&ctx,
)?;
@@ -200,19 +192,6 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);
info!(
ctx.expect_logger(),
"Processing block #{} through {} predicate (inscriptions revealed: [{}])",
@@ -220,89 +199,53 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
predicate_spec.uuid,
inscriptions_revealed.join(", ")
);
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
Err(_e) => {
continue;
}
};
}
}
} else {
let use_scan_to_seed_hord_db = true;
if use_scan_to_seed_hord_db {
// Start ingestion pipeline
}
let mut cursor = start_block.saturating_sub(1);
while cursor <= end_block {
cursor += 1;
blocks_scanned += 1;
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});
let block = match indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
) {
Ok(data) => data,
Err(e) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
);
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, vec![&predicate_spec], ctx);
occurrences_found += predicates_triggered.len() as u64;
if let PredicatesApi::On(ref api_config) = config.http_api {
if blocks_scanned % 50 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&ctx,
)
}
}
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await {
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
Err(_e) => {
continue;
}
};
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
Err(_e) => {
continue;
}
};
}
}
}
info!(
@@ -310,6 +253,17 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);
if let PredicatesApi::On(ref api_config) = config.http_api {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)
}
Ok(())
}

View File

@@ -248,13 +248,14 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {occurrences_found} occurrences found"
);
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
update_predicate_status(&predicate_spec.key(), status, predicates_db_conn, &ctx)
}
Ok(last_block_scanned)

View File

@@ -28,7 +28,7 @@ pub fn start_stacks_scan_runloop(
ctx: &Context,
) {
let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans);
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
@@ -84,7 +84,6 @@ pub fn start_stacks_scan_runloop(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", last_block_scanned.index
);
predicate_spec.end_block = Some(last_block_scanned.index);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
));
@@ -120,6 +119,21 @@ pub fn start_bitcoin_scan_runloop(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
// Update predicate status in redis
if let PredicatesApi::On(ref api_config) = moved_config.http_api {
let status = PredicateStatus::Interrupted(format!(
"Unable to evaluate predicate on Bitcoin chainstate: {e}"
));
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&moved_ctx,
);
}
return;
}
};

View File

@@ -252,6 +252,12 @@ pub struct BitcoinChainhookSpecification {
pub enabled: bool,
}
impl BitcoinChainhookSpecification {
pub fn key(&self) -> String {
ChainhookSpecification::bitcoin_key(&self.uuid)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
#[serde(rename_all = "snake_case", tag = "chain")]
pub enum ChainhookFullSpecification {

View File

@@ -306,7 +306,7 @@ pub fn insert_entry_in_inscriptions(
}
}
pub fn insert_entry_in_transfers(
pub fn insert_entry_in_ordinal_activities(
block_height: u32,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
@@ -319,6 +319,22 @@ pub fn insert_entry_in_transfers(
}
}
pub fn get_any_entry_in_ordinal_activities(
block_height: &u64,
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> bool {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT block_height FROM transfers WHERE block_height = ?")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(_)) = rows.next() {
return true;
}
false
}
pub fn update_transfered_inscription(
inscription_id: &str,
outpoint_post_transfer: &str,
@@ -454,12 +470,13 @@ pub fn find_inscription_with_id(
return None;
}
pub fn find_all_inscriptions(
pub fn find_all_inscriptions_in_block(
block_height: &u64,
inscriptions_db_conn: &Connection,
) -> BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> {
let args: &[&dyn ToSql] = &[];
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_number, ordinal_number, block_height, inscription_id, offset, outpoint_to_watch FROM inscriptions ORDER BY inscription_number ASC")
.prepare("SELECT inscription_number, ordinal_number, block_height, inscription_id, offset, outpoint_to_watch FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
.unwrap();
let mut results: BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> = BTreeMap::new();
let mut rows = stmt.query(args).unwrap();

View File

@@ -30,8 +30,9 @@ use crate::{
hord::{
db::{
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
insert_entry_in_blocks, insert_entry_in_inscriptions, insert_entry_in_transfers,
retrieve_satoshi_point_using_lazy_storage, update_transfered_inscription,
insert_entry_in_blocks, insert_entry_in_inscriptions,
insert_entry_in_ordinal_activities, retrieve_satoshi_point_using_lazy_storage,
update_transfered_inscription,
},
ord::height::Height,
},
@@ -404,7 +405,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
)?;
if any_inscription_revealed || any_inscription_transferred {
insert_entry_in_transfers(
insert_entry_in_ordinal_activities(
new_block.block_identifier.index as u32,
inscriptions_db_conn_rw,
ctx,
@@ -424,14 +425,14 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
block: &mut BitcoinBlockData,
storage: &mut Storage,
traversals: &HashMap<TransactionIdentifier, TraversalResult>,
inscription_db_conn: &Connection,
inscriptions_db_conn: &Connection,
ctx: &Context,
) -> Result<bool, String> {
let mut storage_updated = false;
let mut latest_inscription_number = match find_latest_inscription_number_at_block_height(
&block.block_identifier.index,
&inscription_db_conn,
&inscriptions_db_conn,
&ctx,
)? {
None => 0,
@@ -441,7 +442,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
let mut latest_cursed_inscription_number =
match find_latest_cursed_inscription_number_at_block_height(
&block.block_identifier.index,
&inscription_db_conn,
&inscriptions_db_conn,
&ctx,
)? {
None => -1,
@@ -505,7 +506,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
if let Some(_entry) = find_inscription_with_ordinal_number(
&traversal.ordinal_number,
&inscription_db_conn,
&inscriptions_db_conn,
&ctx,
) {
ctx.try_log(|logger| {

View File

@@ -1317,4 +1317,5 @@ pub async fn start_observer_commands_handler(
}
Ok(())
}
#[cfg(test)]pub mod tests;
#[cfg(test)]
pub mod tests;

View File

@@ -41,7 +41,7 @@ fn generate_test_config() -> (EventObserverConfig, ChainhookStore) {
stacks_network: StacksNetwork::Devnet,
hord_config: None,
};
let predicates = ChainhookConfig::new();
let predicates = ChainhookConfig::new();
let chainhook_store = ChainhookStore { predicates };
(config, chainhook_store)
}
@@ -122,14 +122,14 @@ fn generate_and_register_new_stacks_chainhook(
let contract_identifier = format!("{}.{}", accounts::deployer_stx_address(), contract_name);
let chainhook = stacks_chainhook_contract_call(id, &contract_identifier, None, method);
let _ = observer_commands_tx.send(ObserverCommand::RegisterPredicate(
ChainhookFullSpecification::Stacks(chainhook.clone())
ChainhookFullSpecification::Stacks(chainhook.clone()),
));
let mut chainhook = chainhook
.into_selected_network_specification(&StacksNetwork::Devnet)
.unwrap();
chainhook.enabled = true;
let _ = observer_commands_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(chainhook.clone())
ChainhookSpecification::Stacks(chainhook.clone()),
));
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::PredicateRegistered(registered_chainhook)) => {
@@ -142,7 +142,7 @@ fn generate_and_register_new_stacks_chainhook(
_ => false,
});
let _ = observer_commands_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(chainhook.clone())
ChainhookSpecification::Stacks(chainhook.clone()),
));
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::PredicateEnabled(registered_chainhook)) => {
@@ -166,14 +166,14 @@ fn generate_and_register_new_bitcoin_chainhook(
) -> BitcoinChainhookSpecification {
let chainhook = bitcoin_chainhook_p2pkh(id, &p2pkh_address, expire_after_occurrence);
let _ = observer_commands_tx.send(ObserverCommand::RegisterPredicate(
ChainhookFullSpecification::Bitcoin(chainhook.clone())
ChainhookFullSpecification::Bitcoin(chainhook.clone()),
));
let mut chainhook = chainhook
.into_selected_network_specification(&BitcoinNetwork::Regtest)
.unwrap();
chainhook.enabled = true;
let _ = observer_commands_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(chainhook.clone())
ChainhookSpecification::Bitcoin(chainhook.clone()),
));
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::PredicateRegistered(registered_chainhook)) => {
@@ -244,7 +244,7 @@ fn test_stacks_chainhook_register_deregister() {
Ok(ObserverEvent::PredicateEnabled(_)) => true,
_ => false,
});
// Should signal that no hook were triggered
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::HooksTriggered(len)) => {