feat: logic to start ingestion during indexing

This commit is contained in:
Ludo Galabru
2023-06-30 14:12:48 -04:00
parent 135297e978
commit 3c1c99df5d
5 changed files with 256 additions and 156 deletions

View File

@@ -10,22 +10,20 @@ use crate::storage::{
open_readonly_stacks_db_conn,
};
use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};
use chainhook_sdk::chainhooks::types::{
BitcoinChainhookFullSpecification, BitcoinChainhookNetworkSpecification, BitcoinPredicateType, ChainhookFullSpecification, FileHook,
HookAction, OrdinalOperations, StacksChainhookFullSpecification,
StacksChainhookNetworkSpecification, StacksPredicate, StacksPrintEventBasedPredicate,
};
use chainhook_sdk::hord::db::{
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_last_block_inserted,
find_lazy_block_at_block_height, find_watched_satpoint_for_inscription, initialize_hord_db,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
delete_data_in_hord_db, find_last_block_inserted, find_lazy_block_at_block_height,
find_watched_satpoint_for_inscription, initialize_hord_db, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_lazy_storage,
};
use chainhook_sdk::hord::{
new_traversals_lazy_cache, retrieve_inscribed_satoshi_points_from_block,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, HordConfig, Storage,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, Storage,
};
use chainhook_sdk::indexer;
use chainhook_sdk::indexer::bitcoin::{
@@ -474,35 +472,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
info!(ctx.expect_logger(), "Starting service...",);
if !cmd.hord_disabled {
info!(
ctx.expect_logger(),
"Ordinal indexing is enabled by default hord, checking index... (use --no-hord to disable ordinals)"
);
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if start_block == 0 {
info!(
ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(
start_block,
end_block,
&config.get_hord_config(),
&config,
&ctx,
)
.await?;
}
}
let mut service = Service::new(config, ctx);
return service.run(predicates, cmd.hord_disabled).await;
}
@@ -673,7 +642,8 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
));
}
};
// TODO: if a stacks.rocksdb is present, use it.
// TODO: update Stacks archive file if required.
scan_stacks_chainstate_via_csv_using_predicate(
&predicate_spec,
&mut config,
@@ -731,11 +701,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
find_last_block_inserted(&hord_db_conn) as u64
};
if cmd.block_height > tip_height {
perform_hord_db_update(
crate::hord::perform_hord_db_update(
tip_height,
cmd.block_height,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
@@ -802,11 +773,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
let tip_height = find_last_block_inserted(&blocks_db_conn) as u64;
let _end_at = match cmd.block_height {
Some(block_height) if block_height > tip_height => {
perform_hord_db_update(
crate::hord::perform_hord_db_update(
tip_height,
block_height,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
@@ -860,7 +832,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
HordDbCommand::Sync(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if let Some((start_block, end_block)) =
crate::hord::should_sync_hord_db(&config, &ctx)?
{
if start_block == 0 {
info!(
ctx.expect_logger(),
@@ -872,11 +846,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(
crate::hord::perform_hord_db_update(
start_block,
end_block,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
@@ -902,11 +877,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
)?;
}
// Update data
perform_hord_db_update(
crate::hord::perform_hord_db_update(
cmd.start_block,
cmd.end_block,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
@@ -1023,87 +999,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
Ok(())
}
pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
Err(err) => {
warn!(ctx.expect_logger(), "{}", err);
0
}
};
if start_block == 0 {
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
}
let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
if start_block < end_block {
Ok(Some((start_block, end_block)))
} else {
Ok(None)
}
}
pub async fn perform_hord_db_update(
start_block: u64,
end_block: u64,
hord_config: &HordConfig,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
info!(
ctx.expect_logger(),
"Syncing hord_db: {} blocks to download ({start_block}: {end_block})",
end_block - start_block + 1
);
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};
let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let _ = fetch_and_cache_blocks_in_hord_db(
&bitcoin_config,
&blocks_db,
&inscriptions_db_conn_rw,
start_block,
end_block,
hord_config,
&ctx,
)
.await?;
Ok(())
}
#[allow(dead_code)]
pub fn install_ctrlc_handler(terminate_tx: Sender<DigestingCommand>, ctx: Context) {
ctrlc::set_handler(move || {

View File

@@ -0,0 +1,113 @@
use std::sync::mpsc::Sender;
use chainhook_sdk::{
bitcoincore_rpc::{Auth, Client, RpcApi},
hord::{
db::{
fetch_and_cache_blocks_in_hord_db, find_last_block_inserted, initialize_hord_db,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, find_latest_inscription_block_height, open_readonly_hord_db_conn,
},
HordConfig,
},
observer::BitcoinConfig,
utils::Context,
};
use chainhook_types::BitcoinBlockData;
use crate::config::Config;
pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let mut start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
Err(err) => {
warn!(ctx.expect_logger(), "{}", err);
0
}
};
if start_block == 0 {
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
}
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
Some(height) => {
start_block = start_block.min(height);
}
None => {
start_block = start_block.min(config.get_hord_config().first_inscription_height);
}
};
let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
if start_block < end_block {
Ok(Some((start_block, end_block)))
} else {
Ok(None)
}
}
pub async fn perform_hord_db_update(
start_block: u64,
end_block: u64,
hord_config: &HordConfig,
config: &Config,
block_post_processor: Option<Sender<BitcoinBlockData>>,
ctx: &Context,
) -> Result<(), String> {
info!(
ctx.expect_logger(),
"Syncing hord_db: {} blocks to download ({start_block}: {end_block})",
end_block - start_block + 1
);
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};
let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let _ = fetch_and_cache_blocks_in_hord_db(
&bitcoin_config,
&blocks_db,
&inscriptions_db_conn_rw,
start_block,
end_block,
hord_config,
block_post_processor,
&ctx,
)
.await?;
Ok(())
}

View File

@@ -16,6 +16,7 @@ pub mod archive;
pub mod block;
pub mod cli;
pub mod config;
pub mod hord;
pub mod scan;
pub mod service;
pub mod storage;

View File

@@ -25,7 +25,7 @@ use chainhook_sdk::indexer::bitcoin::{
};
use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
use chainhook_sdk::utils::{file_append, send_request, Context};
use chainhook_types::{BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData};
use chainhook_types::{BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData};
use std::collections::{BTreeMap, HashMap};
pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
@@ -137,7 +137,10 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
};
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
traversals.insert(
(transaction_identifier, traversal_result.input_index),
(
transaction_identifier,
traversal_result.inscription_input_index,
),
traversal_result,
);
}
@@ -170,15 +173,21 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
);
}
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});
match process_block_with_predicates(
block,
&vec![&predicate_spec],
&event_observer_config,
ctx,
)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, vec![&predicate_spec], ctx);
occurrences_found += predicates_triggered.len() as u64;
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
if let PredicatesApi::On(ref api_config) = config.http_api {
if blocks_scanned % 50 == 0 {
@@ -199,15 +208,6 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
}
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await {
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
@@ -236,6 +236,24 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
Ok(())
}
pub async fn process_block_with_predicates(
block: BitcoinBlockData,
predicates: &Vec<&BitcoinChainhookSpecification>,
event_observer_config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, predicates, ctx);
execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await
}
pub async fn execute_predicates_action<'a>(
hits: Vec<BitcoinTriggerChainhook<'a>>,
config: &EventObserverConfig,

View File

@@ -2,6 +2,8 @@ mod http_api;
mod runloops;
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::hord::should_sync_hord_db;
use crate::scan::bitcoin::process_block_with_predicates;
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
@@ -116,22 +118,71 @@ impl Service {
if !hord_disabled {
// TODO: add flag
// let _ = download_ordinals_dataset_if_required(&mut self.config, &self.ctx).await;
}
// Start chainhook event observer
let context_cloned = self.ctx.clone();
let event_observer_config_moved = event_observer_config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || {
let future = start_event_observer(
event_observer_config_moved,
observer_command_tx_moved,
observer_command_rx,
Some(observer_event_tx),
context_cloned,
info!(
self.ctx.expect_logger(),
"Ordinal indexing is enabled by default, checking index... (use --no-hord to disable ordinals)"
);
let _ = hiro_system_kit::nestable_block_on(future);
});
if let Some((start_block, end_block)) = should_sync_hord_db(&self.config, &self.ctx)? {
if start_block == 0 {
info!(
self.ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
self.ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
let (tx, rx) = channel();
let mut moved_event_observer_config = event_observer_config.clone();
let moved_ctx = self.ctx.clone();
let _ = hiro_system_kit::thread_named("Initial predicate processing")
.spawn(move || {
if let Some(mut chainhook_config) =
moved_event_observer_config.chainhook_config.take()
{
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut()
{
bitcoin_predicate.enabled = false;
}
let mut bitcoin_predicates_ref: Vec<
&chainhook_sdk::chainhooks::types::BitcoinChainhookSpecification,
> = vec![];
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter() {
bitcoin_predicates_ref.push(bitcoin_predicate);
}
while let Ok(block) = rx.recv() {
let future = process_block_with_predicates(
block,
&bitcoin_predicates_ref,
&moved_event_observer_config,
&moved_ctx,
);
let res = hiro_system_kit::nestable_block_on(future);
if let Err(_) = res {
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
}
}
}
})
.expect("unable to spawn thread");
crate::hord::perform_hord_db_update(
start_block,
end_block,
&self.config.get_hord_config(),
&self.config,
Some(tx),
&self.ctx,
)
.await?;
}
}
// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
@@ -193,6 +244,28 @@ impl Service {
);
}
BitcoinBlockSignaling::Stacks(ref _url) => {
// Start chainhook event observer
let context_cloned = self.ctx.clone();
let event_observer_config_moved = event_observer_config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ =
hiro_system_kit::thread_named("Chainhook event observer").spawn(move || {
let future = start_event_observer(
event_observer_config_moved,
observer_command_tx_moved,
observer_command_rx,
Some(observer_event_tx),
context_cloned,
);
let _ = hiro_system_kit::nestable_block_on(future);
});
info!(
self.ctx.expect_logger(),
"Listening on port {} for Stacks chain events",
event_observer_config
.get_stacks_node_config()
.ingestion_port
);
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via Stacks node"