mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
fix: stateful observers
# Conflicts: # components/ordhook-core/src/core/protocol/inscription_sequencing.rs
This commit is contained in:
@@ -33,7 +33,7 @@ anyhow = { version = "1.0.56", features = ["backtrace"] }
|
||||
schemars = { version = "0.8.10", git = "https://github.com/hirosystems/schemars.git", branch = "feat-chainhook-fixes" }
|
||||
progressing = '3'
|
||||
futures = "0.3.28"
|
||||
rocksdb = { version = "0.21.0", default-features = false }
|
||||
rocksdb = { version = "0.21.0", default-features = false, features = ["snappy"] }
|
||||
pprof = { version = "0.13.0", features = ["flamegraph"], optional = true }
|
||||
|
||||
# [profile.release]
|
||||
|
||||
@@ -23,8 +23,8 @@ use crate::{
|
||||
find_blessed_inscription_with_ordinal_number,
|
||||
find_latest_cursed_inscription_number_at_block_height,
|
||||
find_latest_inscription_number_at_block_height, format_satpoint_to_watch,
|
||||
update_inscriptions_with_block, update_sequence_metadata_with_block, LazyBlockTransaction,
|
||||
TraversalResult,
|
||||
update_inscriptions_with_block, update_sequence_metadata_with_block,
|
||||
LazyBlockTransaction, TraversalResult,
|
||||
},
|
||||
ord::height::Height,
|
||||
};
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_co
|
||||
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_ordhook_db_conn};
|
||||
use crate::download::download_ordinals_dataset_if_required;
|
||||
use crate::service::observers::{
|
||||
open_readwrite_observers_db_conn_or_panic, update_observer_progress, ObserverReport,
|
||||
open_readwrite_observers_db_conn_or_panic, update_observer_progress,
|
||||
};
|
||||
use chainhook_sdk::bitcoincore_rpc::RpcApi;
|
||||
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
|
||||
@@ -77,7 +77,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Starting predicate evaluation on Bitcoin blocks",
|
||||
"Starting predicate evaluation on {} Bitcoin blocks",
|
||||
block_heights_to_scan.len()
|
||||
);
|
||||
let mut actions_triggered = 0;
|
||||
let mut err_count = 0;
|
||||
@@ -88,12 +89,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
};
|
||||
let bitcoin_config = event_observer_config.get_bitcoin_config();
|
||||
let mut number_of_blocks_scanned = 0;
|
||||
let mut last_block_height = 0;
|
||||
let http_client = build_http_client();
|
||||
|
||||
while let Some(current_block_height) = block_heights_to_scan.pop_front() {
|
||||
last_block_height = current_block_height;
|
||||
|
||||
let mut inscriptions_db_conn =
|
||||
open_readonly_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
|
||||
@@ -174,7 +172,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
open_readwrite_observers_db_conn_or_panic(&config.expected_cache_path(), &ctx);
|
||||
update_observer_progress(
|
||||
&predicate_spec.uuid,
|
||||
last_block_height,
|
||||
current_block_height,
|
||||
&observers_db_conn,
|
||||
&ctx,
|
||||
)
|
||||
|
||||
@@ -119,19 +119,11 @@ impl Service {
|
||||
// If HTTP Predicates API is on, we start:
|
||||
// - Thread pool in charge of performing replays
|
||||
// - API server
|
||||
if self.config.is_http_api_enabled() {
|
||||
self.start_main_runloop_with_dynamic_predicates(
|
||||
&observer_command_tx,
|
||||
observer_event_rx,
|
||||
predicate_activity_relayer,
|
||||
)?;
|
||||
} else {
|
||||
self.start_main_runloop(
|
||||
&observer_command_tx,
|
||||
observer_event_rx,
|
||||
predicate_activity_relayer,
|
||||
)?;
|
||||
}
|
||||
self.start_main_runloop_with_dynamic_predicates(
|
||||
&observer_command_tx,
|
||||
observer_event_rx,
|
||||
predicate_activity_relayer,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -224,10 +216,6 @@ impl Service {
|
||||
crossbeam_channel::Sender<BitcoinChainhookOccurrencePayload>,
|
||||
>,
|
||||
) -> Result<(), String> {
|
||||
let PredicatesApi::On(ref api_config) = self.config.http_api else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
|
||||
let ctx = self.ctx.clone();
|
||||
let config = self.config.clone();
|
||||
@@ -243,24 +231,26 @@ impl Service {
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening on port {} for chainhook predicate registrations", api_config.http_port
|
||||
);
|
||||
let ctx = self.ctx.clone();
|
||||
let api_config = api_config.clone();
|
||||
let moved_observer_command_tx = observer_command_tx.clone();
|
||||
let db_dir_path = self.config.expected_cache_path();
|
||||
// Test and initialize a database connection
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future = start_predicate_api_server(
|
||||
api_config.http_port,
|
||||
db_dir_path,
|
||||
moved_observer_command_tx,
|
||||
ctx,
|
||||
if let PredicatesApi::On(ref api_config) = self.config.http_api {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening on port {} for chainhook predicate registrations", api_config.http_port
|
||||
);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
let ctx = self.ctx.clone();
|
||||
let api_config = api_config.clone();
|
||||
let moved_observer_command_tx = observer_command_tx.clone();
|
||||
let db_dir_path = self.config.expected_cache_path();
|
||||
// Test and initialize a database connection
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future = start_predicate_api_server(
|
||||
api_config.http_port,
|
||||
db_dir_path,
|
||||
moved_observer_command_tx,
|
||||
ctx,
|
||||
);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
}
|
||||
|
||||
loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
|
||||
@@ -61,6 +61,7 @@ pub fn insert_entry_in_observers(
|
||||
observers_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
remove_entry_from_observers(&spec.uuid(), observers_db_conn, ctx);
|
||||
while let Err(e) = observers_db_conn.execute(
|
||||
"INSERT INTO observers (uuid, spec, streaming_enabled, last_block_height_update) VALUES (?1, ?2, ?3, ?4)",
|
||||
rusqlite::params![&spec.uuid(), json!(spec).to_string(), report.streaming_enabled, report.last_block_height_update],
|
||||
@@ -238,7 +239,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
BitcoinChainhookSpecification {
|
||||
uuid: format!("ordhook-internal-trigger"),
|
||||
owner_uuid: None,
|
||||
name: format!("ordhook"),
|
||||
name: format!("ordhook-internal-trigger"),
|
||||
network: config.network.bitcoin_network.clone(),
|
||||
version: 1,
|
||||
blocks: None,
|
||||
@@ -259,7 +260,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
));
|
||||
}
|
||||
|
||||
let observers_db_conn = open_readwrite_observers_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
let observers_db_conn = initialize_observers_db(&config.expected_cache_path(), ctx);
|
||||
|
||||
let mut observers_to_catchup = vec![];
|
||||
let mut observers_to_clean_up = vec![];
|
||||
@@ -340,13 +341,15 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
action: observer.action,
|
||||
},
|
||||
);
|
||||
full_specs.push(BitcoinChainhookFullSpecification {
|
||||
let full_spec = BitcoinChainhookFullSpecification {
|
||||
uuid: observer.uuid,
|
||||
owner_uuid: observer.owner_uuid,
|
||||
name: observer.name,
|
||||
version: observer.version,
|
||||
networks,
|
||||
});
|
||||
};
|
||||
info!(ctx.expect_logger(), "Observer '{}' to be caught-up (last block sent: {}, tip: {})", full_spec.name, report.last_block_height_update, chain_tip_height);
|
||||
full_specs.push(full_spec);
|
||||
}
|
||||
|
||||
Ok((chainhook_config, full_specs))
|
||||
|
||||
Reference in New Issue
Block a user