mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 08:29:31 +08:00
feat: streamline processors
This commit is contained in:
@@ -2,7 +2,8 @@ use crate::archive::download_ordinals_dataset_if_required;
|
||||
use crate::config::generator::generate_config;
|
||||
use crate::config::Config;
|
||||
use crate::core::pipeline::download_and_pipeline_blocks;
|
||||
use crate::core::pipeline::processors::start_ordinals_number_processor;
|
||||
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::{self};
|
||||
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
|
||||
use crate::service::Service;
|
||||
@@ -622,12 +623,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
let blocks_post_processor = start_block_ingestion_processor(&config, ctx, None);
|
||||
|
||||
download_and_pipeline_blocks(
|
||||
&config,
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
hord_config.first_inscription_height,
|
||||
None,
|
||||
Some(&blocks_post_processor),
|
||||
&ctx,
|
||||
)
|
||||
.await?
|
||||
@@ -637,19 +640,18 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
let (tx, handle) = start_ordinals_number_processor(&config, ctx, None);
|
||||
let blocks_post_processor =
|
||||
start_inscription_indexing_processor(&config, ctx, None);
|
||||
|
||||
download_and_pipeline_blocks(
|
||||
&config,
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
hord_config.first_inscription_height,
|
||||
Some(tx),
|
||||
Some(&blocks_post_processor),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = handle.join();
|
||||
}
|
||||
RepairCommand::Transfers(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
|
||||
@@ -5,7 +5,7 @@ use chainhook_sdk::types::BitcoinBlockData;
|
||||
use chainhook_sdk::utils::Context;
|
||||
use crossbeam_channel::bounded;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::thread::sleep;
|
||||
use std::thread::{sleep, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
@@ -18,12 +18,27 @@ use chainhook_sdk::indexer::bitcoin::{
|
||||
|
||||
use super::parse_ordinals_and_standardize_block;
|
||||
|
||||
pub enum PostProcessorCommand {
|
||||
ProcessBlocks(Vec<(BitcoinBlockData, LazyBlock)>),
|
||||
Terminate,
|
||||
}
|
||||
|
||||
pub enum PostProcessorEvent {
|
||||
EmptyQueue,
|
||||
}
|
||||
|
||||
pub struct PostProcessorController {
|
||||
pub commands_tx: crossbeam_channel::Sender<PostProcessorCommand>,
|
||||
pub events_rx: crossbeam_channel::Receiver<PostProcessorEvent>,
|
||||
pub thread_handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
pub async fn download_and_pipeline_blocks(
|
||||
config: &Config,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
start_sequencing_blocks_at_height: u64,
|
||||
blocks_post_processor: Option<crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>>,
|
||||
blocks_post_processor: Option<&PostProcessorController>,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
// let guard = pprof::ProfilerGuardBuilder::default()
|
||||
@@ -111,6 +126,10 @@ pub async fn download_and_pipeline_blocks(
|
||||
|
||||
let cloned_ctx = ctx.clone();
|
||||
|
||||
let post_processor_commands_tx = blocks_post_processor
|
||||
.as_ref()
|
||||
.and_then(|p| Some(p.commands_tx.clone()));
|
||||
|
||||
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
|
||||
.spawn(move || {
|
||||
let mut inbox = HashMap::new();
|
||||
@@ -151,8 +170,8 @@ pub async fn download_and_pipeline_blocks(
|
||||
inbox_cursor += 1;
|
||||
}
|
||||
if !chunk.is_empty() {
|
||||
if let Some(ref tx) = blocks_post_processor {
|
||||
let _ = tx.send(chunk);
|
||||
if let Some(ref blocks_tx) = post_processor_commands_tx {
|
||||
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(chunk));
|
||||
}
|
||||
} else {
|
||||
if blocks_processed == number_of_blocks_to_process {
|
||||
@@ -200,6 +219,14 @@ pub async fn download_and_pipeline_blocks(
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
if let Some(post_processor) = blocks_post_processor {
|
||||
loop {
|
||||
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = storage_thread.join();
|
||||
let _ = set.shutdown();
|
||||
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
use std::{
|
||||
sync::mpsc::Sender,
|
||||
thread::{sleep, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
|
||||
use crossbeam_channel::TryRecvError;
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
|
||||
};
|
||||
|
||||
pub fn start_block_ingestion_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
_post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
) -> PostProcessorController {
|
||||
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
|
||||
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
|
||||
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
|
||||
.spawn(move || {
|
||||
let mut num_writes = 0;
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
|
||||
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
loop {
|
||||
let blocks = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
|
||||
if empty_cycles == 30 {
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
if empty_cycles > 120 {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
break;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
info!(ctx.expect_logger(), "Storing {} blocks", blocks.len());
|
||||
|
||||
for (block, compacted_block) in blocks.into_iter() {
|
||||
insert_entry_in_blocks(
|
||||
block.block_identifier.index as u32,
|
||||
&compacted_block,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
num_writes += 1;
|
||||
}
|
||||
|
||||
// Early return
|
||||
if num_writes % 128 == 0 {
|
||||
ctx.try_log(|logger| {
|
||||
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
|
||||
});
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
num_writes = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Write blocks to disk, before traversals
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
PostProcessorController {
|
||||
commands_tx,
|
||||
events_rx,
|
||||
thread_handle: handle,
|
||||
}
|
||||
}
|
||||
@@ -1,28 +1,32 @@
|
||||
use std::{
|
||||
sync::{mpsc::Sender, Arc},
|
||||
thread::JoinHandle,
|
||||
thread::{sleep, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
|
||||
use crossbeam_channel::TryRecvError;
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
core::{new_traversals_lazy_cache, protocol::sequencing::process_blocks},
|
||||
core::{
|
||||
new_traversals_lazy_cache,
|
||||
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
protocol::sequencing::process_blocks,
|
||||
},
|
||||
db::{
|
||||
insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
|
||||
InscriptionHeigthHint, LazyBlock,
|
||||
InscriptionHeigthHint,
|
||||
},
|
||||
};
|
||||
|
||||
pub fn start_ordinals_number_processor(
|
||||
pub fn start_inscription_indexing_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
) -> (
|
||||
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
|
||||
JoinHandle<()>,
|
||||
) {
|
||||
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(1);
|
||||
) -> PostProcessorController {
|
||||
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
|
||||
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
|
||||
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
@@ -41,16 +45,39 @@ pub fn start_ordinals_number_processor(
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
|
||||
|
||||
let mut inscription_height_hint = InscriptionHeigthHint::new();
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
loop {
|
||||
let blocks_to_process = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
|
||||
if empty_cycles == 30 {
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
if empty_cycles > 120 {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
break;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
while let Ok(raw_blocks) = rx.recv() {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Processing {} blocks",
|
||||
raw_blocks.len()
|
||||
blocks_to_process.len()
|
||||
);
|
||||
|
||||
let mut blocks = vec![];
|
||||
for (block, compacted_block) in raw_blocks.into_iter() {
|
||||
for (block, compacted_block) in blocks_to_process.into_iter() {
|
||||
insert_entry_in_blocks(
|
||||
block.block_identifier.index as u32,
|
||||
&compacted_block,
|
||||
@@ -118,5 +145,9 @@ pub fn start_ordinals_number_processor(
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
(tx, handle)
|
||||
PostProcessorController {
|
||||
commands_tx,
|
||||
events_rx,
|
||||
thread_handle: handle,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod block_ingestion;
|
||||
pub mod inscription_indexing;
|
||||
|
||||
pub use inscription_indexing::start_ordinals_number_processor;
|
||||
pub use inscription_indexing::start_inscription_indexing_processor;
|
||||
|
||||
@@ -4,7 +4,7 @@ mod runloops;
|
||||
use crate::cli::fetch_and_standardize_block;
|
||||
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
|
||||
use crate::core::pipeline::download_and_pipeline_blocks;
|
||||
use crate::core::pipeline::processors::start_ordinals_number_processor;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::protocol::sequencing::{
|
||||
update_hord_db_and_augment_bitcoin_block_v3,
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
|
||||
@@ -93,8 +93,8 @@ impl Service {
|
||||
// Start predicate processor
|
||||
let (tx_replayer, rx_replayer) = channel();
|
||||
|
||||
let (tx, handle) =
|
||||
start_ordinals_number_processor(&self.config, &self.ctx, Some(tx_replayer));
|
||||
let blocks_post_processor =
|
||||
start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer));
|
||||
|
||||
let mut moved_event_observer_config = event_observer_config.clone();
|
||||
let moved_ctx = self.ctx.clone();
|
||||
@@ -141,13 +141,11 @@ impl Service {
|
||||
start_block,
|
||||
end_block,
|
||||
hord_config.first_inscription_height,
|
||||
Some(tx.clone()),
|
||||
Some(&blocks_post_processor),
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
// Bitcoin scan operation threadpool
|
||||
|
||||
Reference in New Issue
Block a user