fix: patch boot latency

This commit is contained in:
Ludo Galabru
2023-08-03 03:15:58 +02:00
parent 108117b82e
commit 0e3faf9a61
4 changed files with 27 additions and 7 deletions

View File

@@ -19,6 +19,7 @@ use chainhook_sdk::indexer::bitcoin::{
use super::parse_ordinals_and_standardize_block;
pub enum PostProcessorCommand {
Start,
ProcessBlocks(Vec<(u64, LazyBlock)>, Vec<BitcoinBlockData>),
Terminate,
}
@@ -150,6 +151,8 @@ pub async fn download_and_pipeline_blocks(
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
let mut blocks_processed = 0;
let mut pre_seq_processor_started = false;
let mut post_seq_processor_started = false;
loop {
// Dequeue all the blocks available
@@ -173,6 +176,11 @@ pub async fn download_and_pipeline_blocks(
if !ooo_compacted_blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
if !pre_seq_processor_started {
pre_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(ooo_compacted_blocks, vec![]));
}
}
@@ -194,6 +202,10 @@ pub async fn download_and_pipeline_blocks(
}
if !blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
if !post_seq_processor_started {
post_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks));
}
} else {

View File

@@ -32,12 +32,17 @@ pub fn start_block_ingestion_processor(
let mut empty_cycles = 0;
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start block indexing runloop");
}
loop {
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;

View File

@@ -49,7 +49,7 @@ pub fn start_inscription_indexing_processor(
let config = config.clone();
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Batch receiver")
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
.spawn(move || {
let cache_l2 = Arc::new(new_traversals_lazy_cache(1024));
let garbage_collect_every_n_blocks = 100;
@@ -66,23 +66,26 @@ pub fn start_inscription_indexing_processor(
let mut inscription_height_hint = InscriptionHeigthHint::new();
let mut empty_cycles = 0;
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start inscription indexing runloop");
}
loop {
let (compacted_blocks, mut blocks) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
empty_cycles = 0;
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 30 {
if empty_cycles == 10 {
empty_cycles = 0;
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {
break;
}
continue;
}
_ => {

View File

@@ -139,7 +139,7 @@ impl Service {
start_block,
end_block,
hord_config.first_inscription_height,
Some(&blocks_post_processor),
None,
Some(&blocks_post_processor),
speed,
&self.ctx,