mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 00:49:30 +08:00
feat: logic to start ingestion during indexing
This commit is contained in:
@@ -10,22 +10,20 @@ use crate::storage::{
|
||||
open_readonly_stacks_db_conn,
|
||||
};
|
||||
|
||||
use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};
|
||||
|
||||
use chainhook_sdk::chainhooks::types::{
|
||||
BitcoinChainhookFullSpecification, BitcoinChainhookNetworkSpecification, BitcoinPredicateType, ChainhookFullSpecification, FileHook,
|
||||
HookAction, OrdinalOperations, StacksChainhookFullSpecification,
|
||||
StacksChainhookNetworkSpecification, StacksPredicate, StacksPrintEventBasedPredicate,
|
||||
};
|
||||
use chainhook_sdk::hord::db::{
|
||||
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_last_block_inserted,
|
||||
find_lazy_block_at_block_height, find_watched_satpoint_for_inscription, initialize_hord_db,
|
||||
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
|
||||
delete_data_in_hord_db, find_last_block_inserted, find_lazy_block_at_block_height,
|
||||
find_watched_satpoint_for_inscription, initialize_hord_db, open_readonly_hord_db_conn,
|
||||
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
|
||||
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_lazy_storage,
|
||||
};
|
||||
use chainhook_sdk::hord::{
|
||||
new_traversals_lazy_cache, retrieve_inscribed_satoshi_points_from_block,
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, HordConfig, Storage,
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, Storage,
|
||||
};
|
||||
use chainhook_sdk::indexer;
|
||||
use chainhook_sdk::indexer::bitcoin::{
|
||||
@@ -474,35 +472,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
|
||||
info!(ctx.expect_logger(), "Starting service...",);
|
||||
|
||||
if !cmd.hord_disabled {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Ordinal indexing is enabled by default hord, checking index... (use --no-hord to disable ordinals)"
|
||||
);
|
||||
|
||||
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
|
||||
if start_block == 0 {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Initializing hord indexing from block #{}", start_block
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Resuming hord indexing from block #{}", start_block
|
||||
);
|
||||
}
|
||||
perform_hord_db_update(
|
||||
start_block,
|
||||
end_block,
|
||||
&config.get_hord_config(),
|
||||
&config,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let mut service = Service::new(config, ctx);
|
||||
return service.run(predicates, cmd.hord_disabled).await;
|
||||
}
|
||||
@@ -673,7 +642,8 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: if a stacks.rocksdb is present, use it.
|
||||
// TODO: update Stacks archive file if required.
|
||||
scan_stacks_chainstate_via_csv_using_predicate(
|
||||
&predicate_spec,
|
||||
&mut config,
|
||||
@@ -731,11 +701,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
find_last_block_inserted(&hord_db_conn) as u64
|
||||
};
|
||||
if cmd.block_height > tip_height {
|
||||
perform_hord_db_update(
|
||||
crate::hord::perform_hord_db_update(
|
||||
tip_height,
|
||||
cmd.block_height,
|
||||
&config.get_hord_config(),
|
||||
&config,
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -802,11 +773,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
let tip_height = find_last_block_inserted(&blocks_db_conn) as u64;
|
||||
let _end_at = match cmd.block_height {
|
||||
Some(block_height) if block_height > tip_height => {
|
||||
perform_hord_db_update(
|
||||
crate::hord::perform_hord_db_update(
|
||||
tip_height,
|
||||
block_height,
|
||||
&config.get_hord_config(),
|
||||
&config,
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -860,7 +832,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
|
||||
HordDbCommand::Sync(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
|
||||
if let Some((start_block, end_block)) =
|
||||
crate::hord::should_sync_hord_db(&config, &ctx)?
|
||||
{
|
||||
if start_block == 0 {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
@@ -872,11 +846,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
"Resuming hord indexing from block #{}", start_block
|
||||
);
|
||||
}
|
||||
perform_hord_db_update(
|
||||
crate::hord::perform_hord_db_update(
|
||||
start_block,
|
||||
end_block,
|
||||
&config.get_hord_config(),
|
||||
&config,
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -902,11 +877,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
)?;
|
||||
}
|
||||
// Update data
|
||||
perform_hord_db_update(
|
||||
crate::hord::perform_hord_db_update(
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
&config.get_hord_config(),
|
||||
&config,
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1023,87 +999,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
|
||||
let auth = Auth::UserPass(
|
||||
config.network.bitcoind_rpc_username.clone(),
|
||||
config.network.bitcoind_rpc_password.clone(),
|
||||
);
|
||||
|
||||
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
|
||||
}
|
||||
};
|
||||
|
||||
let start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
{
|
||||
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
|
||||
Err(err) => {
|
||||
warn!(ctx.expect_logger(), "{}", err);
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
if start_block == 0 {
|
||||
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
}
|
||||
|
||||
let end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if start_block < end_block {
|
||||
Ok(Some((start_block, end_block)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn perform_hord_db_update(
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
hord_config: &HordConfig,
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Syncing hord_db: {} blocks to download ({start_block}: {end_block})",
|
||||
end_block - start_block + 1
|
||||
);
|
||||
|
||||
let bitcoin_config = BitcoinConfig {
|
||||
username: config.network.bitcoind_rpc_username.clone(),
|
||||
password: config.network.bitcoind_rpc_password.clone(),
|
||||
rpc_url: config.network.bitcoind_rpc_url.clone(),
|
||||
network: config.network.bitcoin_network.clone(),
|
||||
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
|
||||
};
|
||||
|
||||
let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
let _ = fetch_and_cache_blocks_in_hord_db(
|
||||
&bitcoin_config,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
start_block,
|
||||
end_block,
|
||||
hord_config,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn install_ctrlc_handler(terminate_tx: Sender<DigestingCommand>, ctx: Context) {
|
||||
ctrlc::set_handler(move || {
|
||||
|
||||
113
components/chainhook-cli/src/hord/mod.rs
Normal file
113
components/chainhook-cli/src/hord/mod.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc::{Auth, Client, RpcApi},
|
||||
hord::{
|
||||
db::{
|
||||
fetch_and_cache_blocks_in_hord_db, find_last_block_inserted, initialize_hord_db,
|
||||
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
|
||||
open_readwrite_hord_db_conn_rocks_db, find_latest_inscription_block_height, open_readonly_hord_db_conn,
|
||||
},
|
||||
HordConfig,
|
||||
},
|
||||
observer::BitcoinConfig,
|
||||
utils::Context,
|
||||
};
|
||||
use chainhook_types::BitcoinBlockData;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
|
||||
let auth = Auth::UserPass(
|
||||
config.network.bitcoind_rpc_username.clone(),
|
||||
config.network.bitcoind_rpc_password.clone(),
|
||||
);
|
||||
|
||||
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
|
||||
}
|
||||
};
|
||||
|
||||
let mut start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
{
|
||||
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
|
||||
Err(err) => {
|
||||
warn!(ctx.expect_logger(), "{}", err);
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
if start_block == 0 {
|
||||
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
}
|
||||
|
||||
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
|
||||
Some(height) => {
|
||||
start_block = start_block.min(height);
|
||||
}
|
||||
None => {
|
||||
start_block = start_block.min(config.get_hord_config().first_inscription_height);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
let end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if start_block < end_block {
|
||||
Ok(Some((start_block, end_block)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn perform_hord_db_update(
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
hord_config: &HordConfig,
|
||||
config: &Config,
|
||||
block_post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Syncing hord_db: {} blocks to download ({start_block}: {end_block})",
|
||||
end_block - start_block + 1
|
||||
);
|
||||
|
||||
let bitcoin_config = BitcoinConfig {
|
||||
username: config.network.bitcoind_rpc_username.clone(),
|
||||
password: config.network.bitcoind_rpc_password.clone(),
|
||||
rpc_url: config.network.bitcoind_rpc_url.clone(),
|
||||
network: config.network.bitcoin_network.clone(),
|
||||
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
|
||||
};
|
||||
|
||||
let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
let _ = fetch_and_cache_blocks_in_hord_db(
|
||||
&bitcoin_config,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
start_block,
|
||||
end_block,
|
||||
hord_config,
|
||||
block_post_processor,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -16,6 +16,7 @@ pub mod archive;
|
||||
pub mod block;
|
||||
pub mod cli;
|
||||
pub mod config;
|
||||
pub mod hord;
|
||||
pub mod scan;
|
||||
pub mod service;
|
||||
pub mod storage;
|
||||
|
||||
@@ -25,7 +25,7 @@ use chainhook_sdk::indexer::bitcoin::{
|
||||
};
|
||||
use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
|
||||
use chainhook_sdk::utils::{file_append, send_request, Context};
|
||||
use chainhook_types::{BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData};
|
||||
use chainhook_types::{BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
@@ -137,7 +137,10 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
};
|
||||
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
|
||||
traversals.insert(
|
||||
(transaction_identifier, traversal_result.input_index),
|
||||
(
|
||||
transaction_identifier,
|
||||
traversal_result.inscription_input_index,
|
||||
),
|
||||
traversal_result,
|
||||
);
|
||||
}
|
||||
@@ -170,15 +173,21 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
);
|
||||
}
|
||||
|
||||
let chain_event =
|
||||
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
|
||||
new_blocks: vec![block],
|
||||
confirmed_blocks: vec![],
|
||||
});
|
||||
match process_block_with_predicates(
|
||||
block,
|
||||
&vec![&predicate_spec],
|
||||
&event_observer_config,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(actions) => actions_triggered += actions,
|
||||
Err(_) => err_count += 1,
|
||||
}
|
||||
|
||||
let (predicates_triggered, _predicates_evaluated) =
|
||||
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, vec![&predicate_spec], ctx);
|
||||
occurrences_found += predicates_triggered.len() as u64;
|
||||
if err_count >= 3 {
|
||||
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
|
||||
}
|
||||
|
||||
if let PredicatesApi::On(ref api_config) = config.http_api {
|
||||
if blocks_scanned % 50 == 0 {
|
||||
@@ -199,15 +208,6 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
}
|
||||
}
|
||||
|
||||
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await {
|
||||
Ok(actions) => actions_triggered += actions,
|
||||
Err(_) => err_count += 1,
|
||||
}
|
||||
|
||||
if err_count >= 3 {
|
||||
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
|
||||
}
|
||||
|
||||
if cursor == end_block && floating_end_block {
|
||||
end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks - 1,
|
||||
@@ -236,6 +236,24 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn process_block_with_predicates(
|
||||
block: BitcoinBlockData,
|
||||
predicates: &Vec<&BitcoinChainhookSpecification>,
|
||||
event_observer_config: &EventObserverConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<u32, ()> {
|
||||
let chain_event =
|
||||
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
|
||||
new_blocks: vec![block],
|
||||
confirmed_blocks: vec![],
|
||||
});
|
||||
|
||||
let (predicates_triggered, _predicates_evaluated) =
|
||||
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, predicates, ctx);
|
||||
|
||||
execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await
|
||||
}
|
||||
|
||||
pub async fn execute_predicates_action<'a>(
|
||||
hits: Vec<BitcoinTriggerChainhook<'a>>,
|
||||
config: &EventObserverConfig,
|
||||
|
||||
@@ -2,6 +2,8 @@ mod http_api;
|
||||
mod runloops;
|
||||
|
||||
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
|
||||
use crate::hord::should_sync_hord_db;
|
||||
use crate::scan::bitcoin::process_block_with_predicates;
|
||||
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
|
||||
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
|
||||
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
|
||||
@@ -116,22 +118,71 @@ impl Service {
|
||||
if !hord_disabled {
|
||||
// TODO: add flag
|
||||
// let _ = download_ordinals_dataset_if_required(&mut self.config, &self.ctx).await;
|
||||
}
|
||||
|
||||
// Start chainhook event observer
|
||||
let context_cloned = self.ctx.clone();
|
||||
let event_observer_config_moved = event_observer_config.clone();
|
||||
let observer_command_tx_moved = observer_command_tx.clone();
|
||||
let _ = hiro_system_kit::thread_named("Chainhook event observer").spawn(move || {
|
||||
let future = start_event_observer(
|
||||
event_observer_config_moved,
|
||||
observer_command_tx_moved,
|
||||
observer_command_rx,
|
||||
Some(observer_event_tx),
|
||||
context_cloned,
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Ordinal indexing is enabled by default, checking index... (use --no-hord to disable ordinals)"
|
||||
);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
|
||||
if let Some((start_block, end_block)) = should_sync_hord_db(&self.config, &self.ctx)? {
|
||||
if start_block == 0 {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Initializing hord indexing from block #{}", start_block
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Resuming hord indexing from block #{}", start_block
|
||||
);
|
||||
}
|
||||
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let mut moved_event_observer_config = event_observer_config.clone();
|
||||
let moved_ctx = self.ctx.clone();
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Initial predicate processing")
|
||||
.spawn(move || {
|
||||
if let Some(mut chainhook_config) =
|
||||
moved_event_observer_config.chainhook_config.take()
|
||||
{
|
||||
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut()
|
||||
{
|
||||
bitcoin_predicate.enabled = false;
|
||||
}
|
||||
let mut bitcoin_predicates_ref: Vec<
|
||||
&chainhook_sdk::chainhooks::types::BitcoinChainhookSpecification,
|
||||
> = vec![];
|
||||
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter() {
|
||||
bitcoin_predicates_ref.push(bitcoin_predicate);
|
||||
}
|
||||
while let Ok(block) = rx.recv() {
|
||||
let future = process_block_with_predicates(
|
||||
block,
|
||||
&bitcoin_predicates_ref,
|
||||
&moved_event_observer_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let res = hiro_system_kit::nestable_block_on(future);
|
||||
if let Err(_) = res {
|
||||
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
crate::hord::perform_hord_db_update(
|
||||
start_block,
|
||||
end_block,
|
||||
&self.config.get_hord_config(),
|
||||
&self.config,
|
||||
Some(tx),
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Stacks scan operation threadpool
|
||||
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
|
||||
@@ -193,6 +244,28 @@ impl Service {
|
||||
);
|
||||
}
|
||||
BitcoinBlockSignaling::Stacks(ref _url) => {
|
||||
// Start chainhook event observer
|
||||
let context_cloned = self.ctx.clone();
|
||||
let event_observer_config_moved = event_observer_config.clone();
|
||||
let observer_command_tx_moved = observer_command_tx.clone();
|
||||
let _ =
|
||||
hiro_system_kit::thread_named("Chainhook event observer").spawn(move || {
|
||||
let future = start_event_observer(
|
||||
event_observer_config_moved,
|
||||
observer_command_tx_moved,
|
||||
observer_command_rx,
|
||||
Some(observer_event_tx),
|
||||
context_cloned,
|
||||
);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening on port {} for Stacks chain events",
|
||||
event_observer_config
|
||||
.get_stacks_node_config()
|
||||
.ingestion_port
|
||||
);
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via Stacks node"
|
||||
|
||||
Reference in New Issue
Block a user