feat: batch ingestion, improve cleaning

This commit is contained in:
Ludo Galabru
2023-08-01 02:08:05 +02:00
parent 41ecacee0e
commit 168162e0dd
2 changed files with 42 additions and 22 deletions

View File

@@ -2318,7 +2318,7 @@ pub async fn rebuild_rocks_db(
rx_thread_pool.push(rx);
}
let _ = hiro_system_kit::thread_named("Block data compression")
let compression_thread = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
for rx in rx_thread_pool.into_iter() {
let block_compressed_tx_moved = block_compressed_tx.clone();
@@ -2328,7 +2328,7 @@ pub async fn rebuild_rocks_db(
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data = parse_downloaded_block(block_bytes).unwrap();
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to serialize block");
.expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block(
raw_block_data,
&moved_bitcoin_network,
@@ -2347,7 +2347,7 @@ pub async fn rebuild_rocks_db(
let cloned_ctx = ctx.clone();
let _storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
.spawn(move || {
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
@@ -2367,7 +2367,7 @@ pub async fn rebuild_rocks_db(
let mut chunk = Vec::new();
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
cloned_ctx.try_log(|logger| {
slog::info!(
info!(
logger,
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
inbox.len()
@@ -2379,7 +2379,7 @@ pub async fn rebuild_rocks_db(
if chunk.is_empty() {
// Early return / wait for next block
cloned_ctx.try_log(|logger| {
slog::info!(logger, "Inboxing compacted block #{block_index}")
info!(logger, "Inboxing compacted block #{block_index}")
});
continue;
} else {
@@ -2391,27 +2391,12 @@ pub async fn rebuild_rocks_db(
if blocks_processed == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
slog::info!(
info!(
logger,
"Local block storage successfully seeded with #{blocks_processed} blocks"
)
});
break;
// match guard.report().build() {
// Ok(report) => {
// ctx.try_log(|logger| {
// slog::info!(logger, "Generating report");
// });
// let file = std::fs::File::create("hord-perf.svg").unwrap();
// report.flamegraph(file).unwrap();
// }
// Err(e) => {
// ctx.try_log(|logger| {
// slog::error!(logger, "Reporting failed: {}", e.to_string());
// });
// }
// }
}
}
()
@@ -2437,6 +2422,41 @@ pub async fn rebuild_rocks_db(
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
}
ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting will start"
)
});
for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
}
let _ = compression_thread.join();
let _ = storage_thread.join();
let _ = set.shutdown();
ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting did finish"
)
});
// match guard.report().build() {
// Ok(report) => {
// ctx.try_log(|logger| {
// slog::info!(logger, "Generating report");
// });
// let file = std::fs::File::create("hord-perf.svg").unwrap();
// report.flamegraph(file).unwrap();
// }
// Err(e) => {
// ctx.try_log(|logger| {
// slog::error!(logger, "Reporting failed: {}", e.to_string());
// });
// }
// }
Ok(())
}

View File

@@ -143,7 +143,7 @@ impl Service {
rebuild_rocks_db(
&self.config,
start_block,
end_block,
end_block.min(start_block + 256),
hord_config.first_inscription_height,
Some(tx.clone()),
&self.ctx,