mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-04-26 04:45:08 +08:00
fix: patch boot latency
This commit is contained in:
@@ -19,6 +19,7 @@ use chainhook_sdk::indexer::bitcoin::{
|
||||
use super::parse_ordinals_and_standardize_block;
|
||||
|
||||
pub enum PostProcessorCommand {
|
||||
Start,
|
||||
ProcessBlocks(Vec<(u64, LazyBlock)>, Vec<BitcoinBlockData>),
|
||||
Terminate,
|
||||
}
|
||||
@@ -150,6 +151,8 @@ pub async fn download_and_pipeline_blocks(
|
||||
let mut inbox = HashMap::new();
|
||||
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
|
||||
let mut blocks_processed = 0;
|
||||
let mut pre_seq_processor_started = false;
|
||||
let mut post_seq_processor_started = false;
|
||||
|
||||
loop {
|
||||
// Dequeue all the blocks available
|
||||
@@ -173,6 +176,11 @@ pub async fn download_and_pipeline_blocks(
|
||||
|
||||
if !ooo_compacted_blocks.is_empty() {
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
|
||||
if !pre_seq_processor_started {
|
||||
pre_seq_processor_started = true;
|
||||
let _ = blocks_tx.send(PostProcessorCommand::Start);
|
||||
}
|
||||
|
||||
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(ooo_compacted_blocks, vec![]));
|
||||
}
|
||||
}
|
||||
@@ -194,6 +202,10 @@ pub async fn download_and_pipeline_blocks(
|
||||
}
|
||||
if !blocks.is_empty() {
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
|
||||
if !post_seq_processor_started {
|
||||
post_seq_processor_started = true;
|
||||
let _ = blocks_tx.send(PostProcessorCommand::Start);
|
||||
}
|
||||
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks));
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -32,12 +32,17 @@ pub fn start_block_ingestion_processor(
|
||||
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start block indexing runloop");
|
||||
}
|
||||
|
||||
loop {
|
||||
let (compacted_blocks, _) = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
|
||||
(compacted_blocks, blocks)
|
||||
}
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Ok(PostProcessorCommand::Start) => continue,
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
|
||||
@@ -49,7 +49,7 @@ pub fn start_inscription_indexing_processor(
|
||||
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Batch receiver")
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
|
||||
.spawn(move || {
|
||||
let cache_l2 = Arc::new(new_traversals_lazy_cache(1024));
|
||||
let garbage_collect_every_n_blocks = 100;
|
||||
@@ -66,23 +66,26 @@ pub fn start_inscription_indexing_processor(
|
||||
let mut inscription_height_hint = InscriptionHeigthHint::new();
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start inscription indexing runloop");
|
||||
}
|
||||
|
||||
loop {
|
||||
let (compacted_blocks, mut blocks) = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
|
||||
empty_cycles = 0;
|
||||
(compacted_blocks, blocks)
|
||||
}
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Ok(PostProcessorCommand::Start) => continue,
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
|
||||
if empty_cycles == 30 {
|
||||
if empty_cycles == 10 {
|
||||
empty_cycles = 0;
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
if empty_cycles > 120 {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
|
||||
@@ -139,7 +139,7 @@ impl Service {
|
||||
start_block,
|
||||
end_block,
|
||||
hord_config.first_inscription_height,
|
||||
Some(&blocks_post_processor),
|
||||
None,
|
||||
Some(&blocks_post_processor),
|
||||
speed,
|
||||
&self.ctx,
|
||||
|
||||
Reference in New Issue
Block a user