feat: simplify + improve coordination

This commit is contained in:
Ludo Galabru
2023-08-13 21:46:10 +02:00
parent 0233dc5bf0
commit 1922fd9bc4
10 changed files with 154 additions and 134 deletions

View File

@@ -1,7 +1,7 @@
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
use crate::download::download_ordinals_dataset_if_required;
@@ -12,8 +12,9 @@ use crate::db::{
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, initialize_hord_db, get_default_hord_db_file_path,
get_default_hord_db_file_path, initialize_hord_db, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db,
};
use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};
use chainhook_sdk::chainhooks::types::HttpHook;
@@ -263,7 +264,7 @@ struct StartCommand {
enum HordDbCommand {
/// Initialize a new hord db
#[clap(name = "new", bin_name = "new")]
New(SyncHordDbCommand),
New(SyncHordDbCommand),
/// Catch-up hord db
#[clap(name = "sync", bin_name = "sync")]
Sync(SyncHordDbCommand),
@@ -482,8 +483,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut total_inscriptions = 0;
let mut total_transfers = 0;
let inscriptions_db_conn =
initialize_hord_db(&config.expected_cache_path(), &ctx);
let inscriptions_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx);
while let Some(block_height) = block_range.pop_front() {
let inscriptions =
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, &ctx);
@@ -529,7 +529,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
if total_transfers == 0 && total_inscriptions == 0 {
let db_file_path = get_default_hord_db_file_path(&config.expected_cache_path());
warn!(ctx.expect_logger(), "No data available. Check the validity of the range being scanned and the validity of your local database {}", db_file_path.display());
}
}
}
}
Command::Scan(ScanCommand::Inscription(cmd)) => {
@@ -637,20 +637,20 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(HordDbCommand::New(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
initialize_hord_db(&config.expected_cache_path(), &ctx);
},
}
Command::Db(HordDbCommand::Sync(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
initialize_hord_db(&config.expected_cache_path(), &ctx);
let service = Service::new(config, ctx.clone());
service.update_state(None).await?;
},
}
Command::Db(HordDbCommand::Repair(subcmd)) => match subcmd {
RepairCommand::Blocks(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let block_ingestion_processor = start_block_ingestion_processor(&config, ctx, None);
let block_ingestion_processor = start_block_archiving_processor(&config, ctx, None);
download_and_pipeline_blocks(
&config,
@@ -658,7 +658,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
cmd.end_block,
hord_config.first_inscription_height,
Some(&block_ingestion_processor),
Some(&block_ingestion_processor),
10_000,
&ctx,
)
@@ -677,7 +676,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
None,
Some(&inscription_indexing_processor),
10_000,
&ctx,

View File

@@ -155,9 +155,9 @@ impl Config {
let bootstrap = match config_file.bootstrap {
Some(bootstrap) => match bootstrap.download_url {
Some(ref url) => BootstrapConfig::Download(url.to_string()),
None => BootstrapConfig::Build
}
None => BootstrapConfig::Build
None => BootstrapConfig::Build,
},
None => BootstrapConfig::Build,
};
let config = Config {
@@ -243,7 +243,7 @@ impl Config {
pub fn should_bootstrap_through_download(&self) -> bool {
match &self.bootstrap {
BootstrapConfig::Build => false,
BootstrapConfig::Download(_) => true
BootstrapConfig::Download(_) => true,
}
}
@@ -267,7 +267,7 @@ impl Config {
fn expected_remote_ordinals_sqlite_base_url(&self) -> &str {
match &self.bootstrap {
BootstrapConfig::Build => unreachable!(),
BootstrapConfig::Download(url) => &url
BootstrapConfig::Download(url) => &url,
}
}
@@ -367,7 +367,9 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()),
bootstrap: BootstrapConfig::Download(
DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(),
),
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,

View File

@@ -25,7 +25,9 @@ pub enum PostProcessorCommand {
}
pub enum PostProcessorEvent {
EmptyQueue,
Started,
Terminated,
Expired,
}
pub struct PostProcessorController {
@@ -39,8 +41,7 @@ pub async fn download_and_pipeline_blocks(
start_block: u64,
end_block: u64,
start_sequencing_blocks_at_height: u64,
blocks_post_processor_pre_sequence: Option<&PostProcessorController>,
blocks_post_processor_post_sequence: Option<&PostProcessorController>,
blocks_post_processor: Option<&PostProcessorController>,
speed: usize,
ctx: &Context,
) -> Result<(), String> {
@@ -101,7 +102,7 @@ pub async fn download_and_pipeline_blocks(
rx_thread_pool.push(rx);
}
for rx in rx_thread_pool.into_iter() {
for (thread_index, rx) in rx_thread_pool.into_iter().enumerate() {
let block_compressed_tx_moved = block_compressed_tx.clone();
let moved_ctx: Context = moved_ctx.clone();
let moved_bitcoin_network = moved_bitcoin_network.clone();
@@ -131,6 +132,8 @@ pub async fn download_and_pipeline_blocks(
compressed_block,
)));
}
moved_ctx
.try_log(|logger| debug!(logger, "Exiting processing thread {thread_index}"));
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
@@ -138,11 +141,7 @@ pub async fn download_and_pipeline_blocks(
let cloned_ctx = ctx.clone();
let blocks_post_processor_post_sequence_commands_tx = blocks_post_processor_post_sequence
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));
let blocks_post_processor_pre_sequence_commands_tx = blocks_post_processor_pre_sequence
let blocks_post_processor_commands_tx = blocks_post_processor
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));
@@ -151,19 +150,52 @@ pub async fn download_and_pipeline_blocks(
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
let mut blocks_processed = 0;
let mut pre_seq_processor_started = false;
let mut post_seq_processor_started = false;
let mut processor_started = false;
let mut stop_runloop = false;
loop {
if stop_runloop {
cloned_ctx.try_log(|logger| {
info!(
logger,
"#{blocks_processed} blocks successfully sent to processor"
)
});
break;
}
// Dequeue all the blocks available
let mut new_blocks = vec![];
while let Ok(Some((block_height, block, compacted_block))) =
block_compressed_rx.try_recv()
{
blocks_processed += 1;
new_blocks.push((block_height, block, compacted_block));
if new_blocks.len() >= 10_000 {
break;
while let Ok(message) = block_compressed_rx.try_recv() {
match message {
Some((block_height, block, compacted_block)) => {
blocks_processed += 1;
new_blocks.push((block_height, block, compacted_block));
// Max batch size: 10_000 blocks
if new_blocks.len() >= 10_000 {
break;
}
}
None => {
stop_runloop = true;
}
}
}
if blocks_processed == number_of_blocks_to_process {
stop_runloop = true;
}
// Early "continue"
if new_blocks.is_empty() {
sleep(Duration::from_millis(500));
continue;
}
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
if !processor_started {
processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
}
@@ -176,18 +208,15 @@ pub async fn download_and_pipeline_blocks(
}
}
if !ooo_compacted_blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
if !pre_seq_processor_started {
pre_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
// Early "continue"
if inbox.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
ooo_compacted_blocks,
vec![],
));
}
continue;
}
// In order processing: construct the longest sequence of known blocks
@@ -199,24 +228,8 @@ pub async fn download_and_pipeline_blocks(
inbox_cursor += 1;
}
if blocks.is_empty() {
if blocks_processed == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
info!(
logger,
"#{blocks_processed} blocks successfully sent to processor"
)
});
break;
} else {
sleep(Duration::from_secs(1));
}
} else {
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
if !post_seq_processor_started {
post_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
if !blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
compacted_blocks,
blocks,
@@ -248,7 +261,7 @@ pub async fn download_and_pipeline_blocks(
}
ctx.try_log(|logger| {
info!(
debug!(
logger,
"Pipeline successfully fed with sequence of blocks ({} to {})", start_block, end_block
)
@@ -258,25 +271,32 @@ pub async fn download_and_pipeline_blocks(
let _ = tx.send(None);
}
ctx.try_log(|logger| debug!(logger, "Enqueued pipeline termination commands"));
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
if let Some(post_processor) = blocks_post_processor_pre_sequence {
ctx.try_log(|logger| debug!(logger, "Pipeline successfully terminated"));
if let Some(post_processor) = blocks_post_processor {
if let Ok(PostProcessorEvent::Started) = post_processor.events_rx.recv() {
ctx.try_log(|logger| debug!(logger, "Block post processing started"));
let _ = post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
}
loop {
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
break;
if let Ok(signal) = post_processor.events_rx.recv() {
match signal {
PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break,
PostProcessorEvent::Started => unreachable!(),
}
}
}
}
if let Some(post_processor) = blocks_post_processor_post_sequence {
loop {
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
break;
}
}
}
let _ = block_compressed_tx.send(None);
let _ = storage_thread.join();
let _ = set.shutdown();

View File

@@ -14,7 +14,7 @@ use crate::{
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock},
};
pub fn start_block_ingestion_processor(
pub fn start_block_archiving_processor(
config: &Config,
ctx: &Context,
_post_processor: Option<Sender<BitcoinBlockData>>,
@@ -31,22 +31,29 @@ pub fn start_block_ingestion_processor(
let mut empty_cycles = 0;
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start block indexing runloop");
let _ = events_tx.send(PostProcessorEvent::Started);
debug!(ctx.expect_logger(), "Start block indexing runloop");
}
loop {
debug!(ctx.expect_logger(), "Tick");
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
Ok(PostProcessorCommand::Start) => unreachable!(),
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 30 {
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
warn!(ctx.expect_logger(), "Block processor reached expiration");
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {

View File

@@ -19,7 +19,7 @@ use std::hash::BuildHasherDefault;
use crate::{
core::{
pipeline::processors::block_ingestion::store_compacted_blocks,
pipeline::processors::block_archiving::store_compacted_blocks,
protocol::{
inscription_parsing::get_inscriptions_revealed_in_block,
inscription_sequencing::{
@@ -72,6 +72,7 @@ pub fn start_inscription_indexing_processor(
let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn);
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
let _ = events_tx.send(PostProcessorEvent::Started);
info!(ctx.expect_logger(), "Start inscription indexing runloop");
}
@@ -81,14 +82,19 @@ pub fn start_inscription_indexing_processor(
empty_cycles = 0;
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
Ok(PostProcessorCommand::Start) => unreachable!(),
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 10 {
empty_cycles = 0;
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
warn!(ctx.expect_logger(), "Block processor reached expiration");
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
sleep(Duration::from_secs(1));
continue;

View File

@@ -1,4 +1,4 @@
pub mod block_ingestion;
pub mod block_archiving;
pub mod inscription_indexing;
pub mod transfers_recomputing;

View File

@@ -38,7 +38,8 @@ pub fn start_transfers_recomputing_processor(
let mut empty_cycles = 0;
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start transfers recomputing runloop");
let _ = events_tx.send(PostProcessorEvent::Started);
info!(ctx.expect_logger(), "Start inscription indexing runloop");
}
loop {
@@ -47,14 +48,19 @@ pub fn start_transfers_recomputing_processor(
empty_cycles = 0;
blocks
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
Ok(PostProcessorCommand::Start) => unreachable!(),
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 10 {
empty_cycles = 0;
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
warn!(ctx.expect_logger(), "Block processor reached expiration");
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
sleep(Duration::from_secs(1));
continue;

View File

@@ -63,7 +63,7 @@ pub fn parallelize_inscription_data_computations(
// Nothing to do? early return
if !has_transactions_to_process {
return Ok(false)
return Ok(false);
}
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
@@ -83,12 +83,8 @@ pub fn parallelize_inscription_data_computations(
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
prioritary,
))) = rx.recv()
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
rx.recv()
{
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
&moved_hord_db_path,
@@ -190,12 +186,8 @@ pub fn parallelize_inscription_data_computations(
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (mut transactions_ids, _) = get_transactions_to_process(
next_block,
cache_l1,
inscriptions_db_tx,
ctx,
);
let (mut transactions_ids, _) =
get_transactions_to_process(next_block, cache_l1, inscriptions_db_tx, ctx);
ctx.try_log(|logger| {
info!(
@@ -356,9 +348,7 @@ impl SequenceCursor {
self.blessed = Some(inscription_number);
inscription_number + 1
}
_ => {
0
}
_ => 0,
}
}
Some(value) => value + 1,
@@ -378,9 +368,7 @@ impl SequenceCursor {
self.cursed = Some(inscription_number);
inscription_number - 1
}
_ => {
-1
}
_ => -1,
}
}
Some(value) => value - 1,
@@ -413,7 +401,7 @@ pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
ctx,
) {
reinscriptions_data.insert(inscription_data.ordinal_number, inscription_id);
}
}
}
}
@@ -667,11 +655,19 @@ pub fn consolidate_transaction_with_pre_computed_inscription_data(
inscription.curse_type = Some(OrdinalInscriptionCurseType::Unknown);
}
if traversal.transfer_data.transaction_identifier_location.eq(coinbase_txid) {
if traversal
.transfer_data
.transaction_identifier_location
.eq(coinbase_txid)
{
continue;
}
if let Some(output) = tx.metadata.outputs.get(traversal.transfer_data.output_index) {
if let Some(output) = tx
.metadata
.outputs
.get(traversal.transfer_data.output_index)
{
inscription.inscription_output_value = output.value;
inscription.inscriber_address = {
let script_pub_key = output.get_script_pubkey_hex();
@@ -684,7 +680,6 @@ pub fn consolidate_transaction_with_pre_computed_inscription_data(
}
};
}
}
}

View File

@@ -161,7 +161,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
number_of_blocks_sent += 1;
}
actions_triggered += actions
},
}
Err(_) => err_count += 1,
}

View File

@@ -398,12 +398,16 @@ impl Service {
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<(), String> {
// Start predicate processor
let blocks_post_processor =
start_inscription_indexing_processor(&self.config, &self.ctx, block_post_processor);
while let Some((start_block, end_block, speed)) =
should_sync_hord_db(&self.config, &self.ctx)?
{
let blocks_post_processor = start_inscription_indexing_processor(
&self.config,
&self.ctx,
block_post_processor.clone(),
);
info!(
self.ctx.expect_logger(),
"Indexing inscriptions from block #{start_block} to block #{end_block}"
@@ -416,26 +420,13 @@ impl Service {
start_block,
end_block,
first_inscription_height,
if end_block <= first_inscription_height {
Some(&blocks_post_processor)
} else {
None
},
if start_block >= first_inscription_height {
Some(&blocks_post_processor)
} else {
None
},
Some(&blocks_post_processor),
speed,
&self.ctx,
)
.await?;
}
let _ = blocks_post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
Ok(())
}
@@ -461,17 +452,12 @@ impl Service {
start_block,
end_block,
first_inscription_height,
None,
Some(&blocks_post_processor),
100,
&self.ctx,
)
.await?;
let _ = blocks_post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
Ok(())
}
}