mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-13 16:19:01 +08:00
feat: batch ingestion, improve cleaning
This commit is contained in:
@@ -2318,7 +2318,7 @@ pub async fn rebuild_rocks_db(
|
||||
rx_thread_pool.push(rx);
|
||||
}
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Block data compression")
|
||||
let compression_thread = hiro_system_kit::thread_named("Block data compression")
|
||||
.spawn(move || {
|
||||
for rx in rx_thread_pool.into_iter() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
@@ -2328,7 +2328,7 @@ pub async fn rebuild_rocks_db(
|
||||
while let Ok(Some(block_bytes)) = rx.recv() {
|
||||
let raw_block_data = parse_downloaded_block(block_bytes).unwrap();
|
||||
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
|
||||
.expect("unable to serialize block");
|
||||
.expect("unable to compress block");
|
||||
let block_data = hord::parse_ordinals_and_standardize_block(
|
||||
raw_block_data,
|
||||
&moved_bitcoin_network,
|
||||
@@ -2347,7 +2347,7 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let cloned_ctx = ctx.clone();
|
||||
|
||||
let _storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
|
||||
let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
|
||||
.spawn(move || {
|
||||
let mut inbox = HashMap::new();
|
||||
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
|
||||
@@ -2367,7 +2367,7 @@ pub async fn rebuild_rocks_db(
|
||||
let mut chunk = Vec::new();
|
||||
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
info!(
|
||||
logger,
|
||||
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
|
||||
inbox.len()
|
||||
@@ -2379,7 +2379,7 @@ pub async fn rebuild_rocks_db(
|
||||
if chunk.is_empty() {
|
||||
// Early return / wait for next block
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(logger, "Inboxing compacted block #{block_index}")
|
||||
info!(logger, "Inboxing compacted block #{block_index}")
|
||||
});
|
||||
continue;
|
||||
} else {
|
||||
@@ -2391,27 +2391,12 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
if blocks_processed == number_of_blocks_to_process {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
info!(
|
||||
logger,
|
||||
"Local block storage successfully seeded with #{blocks_processed} blocks"
|
||||
)
|
||||
});
|
||||
break;
|
||||
// match guard.report().build() {
|
||||
// Ok(report) => {
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(logger, "Generating report");
|
||||
// });
|
||||
|
||||
// let file = std::fs::File::create("hord-perf.svg").unwrap();
|
||||
// report.flamegraph(file).unwrap();
|
||||
// }
|
||||
// Err(e) => {
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::error!(logger, "Reporting failed: {}", e.to_string());
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
()
|
||||
@@ -2437,6 +2422,41 @@ pub async fn rebuild_rocks_db(
|
||||
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Gargbage collecting will start"
|
||||
)
|
||||
});
|
||||
|
||||
for tx in tx_thread_pool.iter() {
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
let _ = compression_thread.join();
|
||||
let _ = storage_thread.join();
|
||||
let _ = set.shutdown();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Gargbage collecting did finish"
|
||||
)
|
||||
});
|
||||
|
||||
// match guard.report().build() {
|
||||
// Ok(report) => {
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(logger, "Generating report");
|
||||
// });
|
||||
// let file = std::fs::File::create("hord-perf.svg").unwrap();
|
||||
// report.flamegraph(file).unwrap();
|
||||
// }
|
||||
// Err(e) => {
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::error!(logger, "Reporting failed: {}", e.to_string());
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -143,7 +143,7 @@ impl Service {
|
||||
rebuild_rocks_db(
|
||||
&self.config,
|
||||
start_block,
|
||||
end_block,
|
||||
end_block.min(start_block + 256),
|
||||
hord_config.first_inscription_height,
|
||||
Some(tx.clone()),
|
||||
&self.ctx,
|
||||
|
||||
Reference in New Issue
Block a user