mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-13 16:19:01 +08:00
feat: re-introduce ingestion
This commit is contained in:
@@ -749,7 +749,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
let (tx, handle) = start_ordinals_number_processor(&config, ctx);
|
||||
let (tx, handle) = start_ordinals_number_processor(&config, ctx, None);
|
||||
|
||||
rebuild_rocks_db(
|
||||
&config,
|
||||
|
||||
@@ -2207,6 +2207,7 @@ pub fn process_blocks(
|
||||
inscription_height_hint: &mut InscriptionHeigthHint,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
post_processor: &Option<Sender<BitcoinBlockData>>,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let mut cache_l1 = HashMap::new();
|
||||
@@ -2224,6 +2225,10 @@ pub fn process_blocks(
|
||||
hord_config,
|
||||
ctx,
|
||||
);
|
||||
|
||||
if let Some(post_processor_tx) = post_processor {
|
||||
let _ = post_processor_tx.send(block);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::{sync::Arc, thread::JoinHandle};
|
||||
use std::{
|
||||
sync::{mpsc::Sender, Arc},
|
||||
thread::JoinHandle,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
|
||||
|
||||
@@ -15,6 +18,7 @@ use super::new_traversals_lazy_cache;
|
||||
pub fn start_ordinals_number_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
) -> (
|
||||
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
|
||||
JoinHandle<()>,
|
||||
@@ -97,6 +101,7 @@ pub fn start_ordinals_number_processor(
|
||||
&mut inscription_height_hint,
|
||||
&mut inscriptions_db_conn_rw,
|
||||
&hord_config,
|
||||
&post_processor,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
|
||||
@@ -89,7 +89,40 @@ impl Service {
|
||||
// Catch-up with chain tip
|
||||
{
|
||||
// Start predicate processor
|
||||
let (tx, handle) = start_ordinals_number_processor(&self.config, &self.ctx);
|
||||
let (tx_replayer, rx_replayer) = channel();
|
||||
|
||||
let (tx, handle) =
|
||||
start_ordinals_number_processor(&self.config, &self.ctx, Some(tx_replayer));
|
||||
|
||||
let mut moved_event_observer_config = event_observer_config.clone();
|
||||
let moved_ctx = self.ctx.clone();
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Initial predicate processing")
|
||||
.spawn(move || {
|
||||
if let Some(mut chainhook_config) =
|
||||
moved_event_observer_config.chainhook_config.take()
|
||||
{
|
||||
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> =
|
||||
vec![];
|
||||
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
|
||||
bitcoin_predicate.enabled = false;
|
||||
bitcoin_predicates_ref.push(bitcoin_predicate);
|
||||
}
|
||||
while let Ok(block) = rx_replayer.recv() {
|
||||
let future = process_block_with_predicates(
|
||||
block,
|
||||
&bitcoin_predicates_ref,
|
||||
&moved_event_observer_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let res = hiro_system_kit::nestable_block_on(future);
|
||||
if let Err(_) = res {
|
||||
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
while let Some((start_block, end_block)) = should_sync_hord_db(&self.config, &self.ctx)?
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user