feat: improve repair command conveniency

This commit is contained in:
Ludo Galabru
2023-08-13 22:05:14 +02:00
parent 561e51eb27
commit 46be0ab5a7
6 changed files with 29 additions and 11 deletions

View File

@@ -650,7 +650,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let block_ingestion_processor = start_block_archiving_processor(&config, ctx, None);
let block_ingestion_processor =
start_block_archiving_processor(&config, ctx, false, None);
download_and_pipeline_blocks(
&config,

View File

@@ -17,6 +17,7 @@ use crate::{
pub fn start_block_archiving_processor(
config: &Config,
ctx: &Context,
update_tip: bool,
_post_processor: Option<Sender<BitcoinBlockData>>,
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
@@ -66,7 +67,7 @@ pub fn start_block_archiving_processor(
}
},
};
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &ctx);
store_compacted_blocks(compacted_blocks, update_tip, &blocks_db_rw, &ctx);
}
if let Err(e) = blocks_db_rw.flush() {
@@ -86,13 +87,20 @@ pub fn start_block_archiving_processor(
pub fn store_compacted_blocks(
mut compacted_blocks: Vec<(u64, LazyBlock)>,
update_tip: bool,
blocks_db_rw: &DB,
ctx: &Context,
) {
compacted_blocks.sort_by(|(a, _), (b, _)| a.cmp(b));
for (block_height, compacted_block) in compacted_blocks.into_iter() {
insert_entry_in_blocks(block_height as u32, &compacted_block, &blocks_db_rw, &ctx);
insert_entry_in_blocks(
block_height as u32,
&compacted_block,
update_tip,
&blocks_db_rw,
&ctx,
);
ctx.try_log(|logger| {
info!(logger, "Block #{block_height} saved to disk");
});

View File

@@ -73,7 +73,7 @@ pub fn start_inscription_indexing_processor(
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
let _ = events_tx.send(PostProcessorEvent::Started);
info!(ctx.expect_logger(), "Start inscription indexing runloop");
debug!(ctx.expect_logger(), "Start inscription indexing runloop");
}
loop {
@@ -107,10 +107,15 @@ pub fn start_inscription_indexing_processor(
// Early return
if blocks.is_empty() {
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &ctx);
store_compacted_blocks(compacted_blocks, true, &blocks_db_rw, &ctx);
continue;
} else {
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &Context::empty());
store_compacted_blocks(
compacted_blocks,
true,
&blocks_db_rw,
&Context::empty(),
);
}
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());

View File

@@ -39,7 +39,7 @@ pub fn start_transfers_recomputing_processor(
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
let _ = events_tx.send(PostProcessorEvent::Started);
info!(ctx.expect_logger(), "Start inscription indexing runloop");
debug!(ctx.expect_logger(), "Start inscription indexing runloop");
}
loop {

View File

@@ -267,6 +267,7 @@ pub fn open_readwrite_hord_db_conn_rocks_db(
pub fn insert_entry_in_blocks(
block_height: u32,
lazy_block: &LazyBlock,
update_tip: bool,
blocks_db_rw: &DB,
_ctx: &Context,
) {
@@ -274,9 +275,11 @@ pub fn insert_entry_in_blocks(
blocks_db_rw
.put(&block_height_bytes, &lazy_block.bytes)
.expect("unable to insert blocks");
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
.expect("unable to insert metadata");
if update_tip {
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
.expect("unable to insert metadata");
}
}
pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {

View File

@@ -2,10 +2,10 @@ mod http_api;
mod runloops;
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::inscription_indexing::process_blocks;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::protocol::inscription_parsing::parse_inscriptions_in_standardized_block;
use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{
@@ -252,6 +252,7 @@ impl Service {
insert_entry_in_blocks(
block.block_identifier.index as u32,
&compressed_block,
true,
&blocks_db_rw,
&ctx,
);