feat: streamline processors

This commit is contained in:
Ludo Galabru
2023-08-02 10:08:33 +02:00
parent c0991c516c
commit 13421db297
6 changed files with 193 additions and 30 deletions

View File

@@ -2,7 +2,8 @@ use crate::archive::download_ordinals_dataset_if_required;
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::start_ordinals_number_processor;
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::{self};
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::service::Service;
@@ -622,12 +623,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let blocks_post_processor = start_block_ingestion_processor(&config, ctx, None);
download_and_pipeline_blocks(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
None,
Some(&blocks_post_processor),
&ctx,
)
.await?
@@ -637,19 +640,18 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let (tx, handle) = start_ordinals_number_processor(&config, ctx, None);
let blocks_post_processor =
start_inscription_indexing_processor(&config, ctx, None);
download_and_pipeline_blocks(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
Some(tx),
Some(&blocks_post_processor),
&ctx,
)
.await?;
let _ = handle.join();
}
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;

View File

@@ -5,7 +5,7 @@ use chainhook_sdk::types::BitcoinBlockData;
use chainhook_sdk::utils::Context;
use crossbeam_channel::bounded;
use std::collections::{HashMap, VecDeque};
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
use tokio::task::JoinSet;
@@ -18,12 +18,27 @@ use chainhook_sdk::indexer::bitcoin::{
use super::parse_ordinals_and_standardize_block;
/// Commands sent over a post-processor's command channel.
pub enum PostProcessorCommand {
/// A batch of `(block, lazy block)` pairs to process. The ordered blocks
/// dispatcher wraps every non-empty chunk it assembles in this variant.
ProcessBlocks(Vec<(BitcoinBlockData, LazyBlock)>),
/// Asks the processor to stop: the processor run loops break out of their
/// loop when they receive it.
Terminate,
}
/// Events reported by a post-processor thread on its event channel.
///
/// The type is a plain field-less enum, so it derives the cheap standard
/// traits: events can be logged (`Debug`), compared (`PartialEq`/`Eq`) and
/// freely copied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostProcessorEvent {
    /// The processor found its command queue empty for a sustained run of
    /// polls. `download_and_pipeline_blocks` blocks until it receives this
    /// event before it joins its dispatcher thread.
    EmptyQueue,
}
/// Handle returned by the `start_*_processor` functions: it bundles the two
/// channel endpoints used to talk to a processor thread with that thread's
/// join handle.
pub struct PostProcessorController {
/// Sending side of the command channel; `download_and_pipeline_blocks`
/// clones it so the dispatcher thread can forward block batches.
pub commands_tx: crossbeam_channel::Sender<PostProcessorCommand>,
/// Receiving side of the event channel on which the processor reports
/// `PostProcessorEvent`s.
pub events_rx: crossbeam_channel::Receiver<PostProcessorEvent>,
/// Join handle of the spawned processor thread.
pub thread_handle: JoinHandle<()>,
}
pub async fn download_and_pipeline_blocks(
config: &Config,
start_block: u64,
end_block: u64,
start_sequencing_blocks_at_height: u64,
blocks_post_processor: Option<crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>>,
blocks_post_processor: Option<&PostProcessorController>,
ctx: &Context,
) -> Result<(), String> {
// let guard = pprof::ProfilerGuardBuilder::default()
@@ -111,6 +126,10 @@ pub async fn download_and_pipeline_blocks(
let cloned_ctx = ctx.clone();
let post_processor_commands_tx = blocks_post_processor
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
.spawn(move || {
let mut inbox = HashMap::new();
@@ -151,8 +170,8 @@ pub async fn download_and_pipeline_blocks(
inbox_cursor += 1;
}
if !chunk.is_empty() {
if let Some(ref tx) = blocks_post_processor {
let _ = tx.send(chunk);
if let Some(ref blocks_tx) = post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(chunk));
}
} else {
if blocks_processed == number_of_blocks_to_process {
@@ -200,6 +219,14 @@ pub async fn download_and_pipeline_blocks(
let _ = handle.join();
}
if let Some(post_processor) = blocks_post_processor {
loop {
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
break;
}
}
}
let _ = storage_thread.join();
let _ = set.shutdown();

View File

@@ -0,0 +1,104 @@
use std::{
sync::mpsc::Sender,
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
};
/// Spawns the "Processor Runloop" thread that stores incoming blocks in the
/// blocks database and returns the controller used to drive it.
///
/// The thread polls its command channel (capacity 2) without blocking:
/// * `ProcessBlocks` — every `(block, compacted_block)` pair is written with
///   `insert_entry_in_blocks`, then the database is flushed.
/// * `Terminate`, or a disconnected channel — the loop exits.
/// * empty queue — the thread sleeps one second. After 30 consecutive idle
///   polls it emits `PostProcessorEvent::EmptyQueue`; after more than 120
///   consecutive idle polls it exits.
///
/// A final flush is performed before the thread returns.
///
/// # Arguments
/// * `config` - cloned into the thread; provides the cache path of the DB.
/// * `ctx` - cloned into the thread; used for logging and DB access.
/// * `_post_processor` - currently unused.
///
/// # Panics
/// Panics if the thread cannot be spawned. The spawned thread panics if the
/// blocks database cannot be opened.
pub fn start_block_ingestion_processor(
    config: &Config,
    ctx: &Context,
    _post_processor: Option<Sender<BitcoinBlockData>>,
) -> PostProcessorController {
    // Number of consecutive one-second idle polls after which the queue is
    // reported as empty to the pipeline driver.
    const IDLE_CYCLES_BEFORE_EMPTY_QUEUE_EVENT: u32 = 30;
    // Number of consecutive one-second idle polls after which the thread
    // gives up waiting and shuts itself down.
    const MAX_IDLE_CYCLES: u32 = 120;
    // A progress message is logged each time the write counter hits a
    // multiple of this value.
    const WRITES_PER_PROGRESS_LOG: u32 = 128;

    let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
    let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();

    let config = config.clone();
    let ctx = ctx.clone();
    let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
        .spawn(move || {
            let blocks_db_rw =
                open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
                    .expect("unable to open blocks db in read-write mode");

            // Flush failures are logged, not fatal: ingestion is best-effort
            // and the next flush gets another chance.
            let flush_db = || {
                if let Err(e) = blocks_db_rw.flush() {
                    ctx.try_log(|logger| {
                        error!(logger, "{}", e.to_string());
                    });
                }
            };

            let mut num_writes: u32 = 0;
            let mut empty_cycles: u32 = 0;

            loop {
                let blocks = match commands_rx.try_recv() {
                    Ok(PostProcessorCommand::ProcessBlocks(blocks)) => {
                        // Work arrived: the idle streak is over. Without this
                        // reset, idle seconds accumulate over the lifetime of
                        // the thread, so `EmptyQueue` could fire (and the
                        // thread could exit) while blocks are still flowing.
                        empty_cycles = 0;
                        blocks
                    }
                    Ok(PostProcessorCommand::Terminate) => break,
                    Err(TryRecvError::Empty) => {
                        empty_cycles += 1;
                        if empty_cycles == IDLE_CYCLES_BEFORE_EMPTY_QUEUE_EVENT {
                            let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
                        }
                        sleep(Duration::from_secs(1));
                        if empty_cycles > MAX_IDLE_CYCLES {
                            break;
                        }
                        continue;
                    }
                    // Every sender is gone: nothing more will ever arrive.
                    Err(TryRecvError::Disconnected) => break,
                };

                // Logged through `try_log`, like every other message here.
                ctx.try_log(|logger| {
                    info!(logger, "Storing {} blocks", blocks.len());
                });

                for (block, compacted_block) in blocks.into_iter() {
                    insert_entry_in_blocks(
                        block.block_identifier.index as u32,
                        &compacted_block,
                        &blocks_db_rw,
                        &ctx,
                    );
                    num_writes += 1;
                }

                if num_writes % WRITES_PER_PROGRESS_LOG == 0 {
                    ctx.try_log(|logger| {
                        info!(logger, "Flushing DB to disk ({num_writes} inserts)");
                    });
                    num_writes = 0;
                }

                // Persist the batch before picking up the next command.
                flush_db();
            }

            // Final flush so nothing written before shutdown is left pending.
            flush_db();
        })
        .expect("unable to spawn thread");

    PostProcessorController {
        commands_tx,
        events_rx,
        thread_handle: handle,
    }
}

View File

@@ -1,28 +1,32 @@
use std::{
sync::{mpsc::Sender, Arc},
thread::JoinHandle,
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;
use crate::{
config::Config,
core::{new_traversals_lazy_cache, protocol::sequencing::process_blocks},
core::{
new_traversals_lazy_cache,
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
protocol::sequencing::process_blocks,
},
db::{
insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
InscriptionHeigthHint, LazyBlock,
InscriptionHeigthHint,
},
};
pub fn start_ordinals_number_processor(
pub fn start_inscription_indexing_processor(
config: &Config,
ctx: &Context,
post_processor: Option<Sender<BitcoinBlockData>>,
) -> (
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
JoinHandle<()>,
) {
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(1);
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
let config = config.clone();
let ctx = ctx.clone();
@@ -41,16 +45,39 @@ pub fn start_ordinals_number_processor(
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
let mut inscription_height_hint = InscriptionHeigthHint::new();
let mut empty_cycles = 0;
loop {
let blocks_to_process = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
Ok(PostProcessorCommand::Terminate) => break,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 30 {
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {
break;
}
continue;
}
_ => {
break;
}
},
};
while let Ok(raw_blocks) = rx.recv() {
info!(
ctx.expect_logger(),
"Processing {} blocks",
raw_blocks.len()
blocks_to_process.len()
);
let mut blocks = vec![];
for (block, compacted_block) in raw_blocks.into_iter() {
for (block, compacted_block) in blocks_to_process.into_iter() {
insert_entry_in_blocks(
block.block_identifier.index as u32,
&compacted_block,
@@ -118,5 +145,9 @@ pub fn start_ordinals_number_processor(
})
.expect("unable to spawn thread");
(tx, handle)
PostProcessorController {
commands_tx,
events_rx,
thread_handle: handle,
}
}

View File

@@ -1,3 +1,4 @@
pub mod block_ingestion;
pub mod inscription_indexing;
pub use inscription_indexing::start_ordinals_number_processor;
pub use inscription_indexing::start_inscription_indexing_processor;

View File

@@ -4,7 +4,7 @@ mod runloops;
use crate::cli::fetch_and_standardize_block;
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::start_ordinals_number_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::protocol::sequencing::{
update_hord_db_and_augment_bitcoin_block_v3,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
@@ -93,8 +93,8 @@ impl Service {
// Start predicate processor
let (tx_replayer, rx_replayer) = channel();
let (tx, handle) =
start_ordinals_number_processor(&self.config, &self.ctx, Some(tx_replayer));
let blocks_post_processor =
start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer));
let mut moved_event_observer_config = event_observer_config.clone();
let moved_ctx = self.ctx.clone();
@@ -141,13 +141,11 @@ impl Service {
start_block,
end_block,
hord_config.first_inscription_height,
Some(tx.clone()),
Some(&blocks_post_processor),
&self.ctx,
)
.await?;
}
let _ = handle.join();
}
// Bitcoin scan operation threadpool