feat: re-introduce ingestion

This commit is contained in:
Ludo Galabru
2023-07-31 21:21:12 +02:00
parent 278a65524b
commit 71c90d755d
4 changed files with 46 additions and 3 deletions

View File

@@ -749,7 +749,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let (tx, handle) = start_ordinals_number_processor(&config, ctx);
let (tx, handle) = start_ordinals_number_processor(&config, ctx, None);
rebuild_rocks_db(
&config,

View File

@@ -2207,6 +2207,7 @@ pub fn process_blocks(
inscription_height_hint: &mut InscriptionHeigthHint,
inscriptions_db_conn_rw: &mut Connection,
hord_config: &HordConfig,
post_processor: &Option<Sender<BitcoinBlockData>>,
ctx: &Context,
) {
let mut cache_l1 = HashMap::new();
@@ -2224,6 +2225,10 @@ pub fn process_blocks(
hord_config,
ctx,
);
if let Some(post_processor_tx) = post_processor {
let _ = post_processor_tx.send(block);
}
}
}

View File

@@ -1,4 +1,7 @@
use std::{sync::Arc, thread::JoinHandle};
use std::{
sync::{mpsc::Sender, Arc},
thread::JoinHandle,
};
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
@@ -15,6 +18,7 @@ use super::new_traversals_lazy_cache;
pub fn start_ordinals_number_processor(
config: &Config,
ctx: &Context,
post_processor: Option<Sender<BitcoinBlockData>>,
) -> (
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
JoinHandle<()>,
@@ -97,6 +101,7 @@ pub fn start_ordinals_number_processor(
&mut inscription_height_hint,
&mut inscriptions_db_conn_rw,
&hord_config,
&post_processor,
&ctx,
);

View File

@@ -89,7 +89,40 @@ impl Service {
// Catch-up with chain tip
{
// Start predicate processor
let (tx, handle) = start_ordinals_number_processor(&self.config, &self.ctx);
let (tx_replayer, rx_replayer) = channel();
let (tx, handle) =
start_ordinals_number_processor(&self.config, &self.ctx, Some(tx_replayer));
let mut moved_event_observer_config = event_observer_config.clone();
let moved_ctx = self.ctx.clone();
let _ = hiro_system_kit::thread_named("Initial predicate processing")
.spawn(move || {
if let Some(mut chainhook_config) =
moved_event_observer_config.chainhook_config.take()
{
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> =
vec![];
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
bitcoin_predicate.enabled = false;
bitcoin_predicates_ref.push(bitcoin_predicate);
}
while let Ok(block) = rx_replayer.recv() {
let future = process_block_with_predicates(
block,
&bitcoin_predicates_ref,
&moved_event_observer_config,
&moved_ctx,
);
let res = hiro_system_kit::nestable_block_on(future);
if let Err(_) = res {
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
}
}
}
})
.expect("unable to spawn thread");
while let Some((start_block, end_block)) = should_sync_hord_db(&self.config, &self.ctx)?
{