mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 08:29:31 +08:00
fix: boot sequence, logs, format
This commit is contained in:
@@ -726,13 +726,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
rebuild_rocks_db(
|
||||
&config,
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
&ctx,
|
||||
)
|
||||
.await?
|
||||
rebuild_rocks_db(&config, cmd.start_block, cmd.end_block, &ctx).await?
|
||||
}
|
||||
RepairCommand::Transfers(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
|
||||
@@ -7,7 +7,8 @@ use std::{
|
||||
|
||||
use chainhook_sdk::{
|
||||
indexer::bitcoin::{
|
||||
build_http_client, download_block_with_retry, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry, parse_fetched_block, download_block, parse_downloaded_block,
|
||||
build_http_client, download_block, download_block_with_retry, parse_downloaded_block,
|
||||
parse_fetched_block, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry,
|
||||
},
|
||||
types::{
|
||||
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData,
|
||||
@@ -21,18 +22,21 @@ use rand::{thread_rng, Rng};
|
||||
|
||||
use rocksdb::DB;
|
||||
use rusqlite::{Connection, OpenFlags, ToSql, Transaction};
|
||||
use tokio::task::JoinSet;
|
||||
use std::io::Cursor;
|
||||
use std::io::{Read, Write};
|
||||
use threadpool::ThreadPool;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
use chainhook_sdk::{
|
||||
indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context,
|
||||
};
|
||||
|
||||
use crate::{hord::{self, HordConfig}, config::Config};
|
||||
use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block};
|
||||
use crate::ord::{height::Height, sat::Sat};
|
||||
use crate::{
|
||||
config::Config,
|
||||
hord::{self, HordConfig},
|
||||
};
|
||||
|
||||
fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
|
||||
let mut destination_path = base_dir.clone();
|
||||
@@ -973,6 +977,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
};
|
||||
let _ = block_data_tx.send(res);
|
||||
});
|
||||
// TODO: remove this join?
|
||||
if block_height >= ordinal_computing_height {
|
||||
let _ = retrieve_block_data_pool.join();
|
||||
}
|
||||
}
|
||||
let res = retrieve_block_data_pool.join();
|
||||
res
|
||||
@@ -994,6 +1002,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
block_data,
|
||||
)));
|
||||
});
|
||||
if block_height >= ordinal_computing_height {
|
||||
let _ = compress_block_data_pool.join();
|
||||
}
|
||||
}
|
||||
let res = compress_block_data_pool.join();
|
||||
res
|
||||
@@ -1018,10 +1029,15 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
|
||||
// Should we start look for inscriptions data in blocks?
|
||||
if raw_block.height as u64 >= ordinal_computing_height {
|
||||
if cursor == 0 {
|
||||
if (cursor as u64) < ordinal_computing_height {
|
||||
cursor = raw_block.height;
|
||||
}
|
||||
ctx.try_log(|logger| slog::info!(logger, "Queueing compacted block #{block_height}",));
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Queueing compacted block #{block_height} (#{cursor})",
|
||||
)
|
||||
});
|
||||
// Is the action of processing a block allows us
|
||||
// to process more blocks present in the inbox?
|
||||
inbox.insert(raw_block.height, raw_block);
|
||||
@@ -1871,7 +1887,6 @@ pub async fn rebuild_rocks_db(
|
||||
let number_of_blocks_to_process = end_block - start_block + 1;
|
||||
let (block_req_lim, block_process_lim) = (128, 128);
|
||||
|
||||
|
||||
let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(block_req_lim);
|
||||
let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
|
||||
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(block_process_lim);
|
||||
@@ -1902,7 +1917,7 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Block data compression")
|
||||
.spawn(move || {
|
||||
while let Ok(Some(block_bytes)) = block_data_rx.recv() {
|
||||
while let Ok(Some(block_bytes)) = block_data_rx.recv() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
compress_block_data_pool.execute(move || {
|
||||
let block_data = parse_downloaded_block(block_bytes).unwrap();
|
||||
@@ -1919,29 +1934,32 @@ pub async fn rebuild_rocks_db(
|
||||
let res = compress_block_data_pool.join();
|
||||
res
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let cloned_ctx = ctx.clone();
|
||||
let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion")
|
||||
.spawn(move || {
|
||||
let mut blocks_stored = 0;
|
||||
let mut num_writes = 0;
|
||||
|
||||
while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() {
|
||||
|
||||
while let Ok(Some((block_height, compacted_block, _raw_block))) =
|
||||
block_compressed_rx.recv()
|
||||
{
|
||||
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx);
|
||||
blocks_stored += 1;
|
||||
num_writes += 1;
|
||||
|
||||
|
||||
// In the context of ordinals, we're constrained to process blocks sequentially
|
||||
// Blocks are processed by a threadpool and could be coming out of order.
|
||||
// Inbox block for later if the current block is not the one we should be
|
||||
// processing.
|
||||
|
||||
|
||||
// Should we start look for inscriptions data in blocks?
|
||||
cloned_ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",));
|
||||
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(logger, "Storing compacted block #{block_height}",)
|
||||
});
|
||||
|
||||
if blocks_stored == number_of_blocks_to_process {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
@@ -1949,13 +1967,13 @@ pub async fn rebuild_rocks_db(
|
||||
"Local block storage successfully seeded with #{blocks_stored} blocks"
|
||||
)
|
||||
});
|
||||
|
||||
|
||||
// match guard.report().build() {
|
||||
// Ok(report) => {
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(logger, "Generating report");
|
||||
// });
|
||||
|
||||
|
||||
// let file = std::fs::File::create("hord-perf.svg").unwrap();
|
||||
// report.flamegraph(file).unwrap();
|
||||
// }
|
||||
@@ -1966,7 +1984,7 @@ pub async fn rebuild_rocks_db(
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
|
||||
if num_writes % 128 == 0 {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
|
||||
@@ -1979,20 +1997,20 @@ pub async fn rebuild_rocks_db(
|
||||
num_writes = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
slog::error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
()
|
||||
}).expect("unable to spawn thread");
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
while let Some(res) = set.join_next().await {
|
||||
let block = res.unwrap().unwrap();
|
||||
|
||||
let _ = block_data_tx
|
||||
.send(Some(block));
|
||||
let _ = block_data_tx.send(Some(block));
|
||||
|
||||
if let Some(block_height) = block_heights.pop_front() {
|
||||
let config = moved_config.clone();
|
||||
|
||||
@@ -437,6 +437,10 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
&inner_ctx,
|
||||
)?;
|
||||
|
||||
if !any_inscription_revealed && !any_inscription_transferred {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if discard_changes {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
@@ -462,22 +466,21 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
)
|
||||
});
|
||||
}
|
||||
if any_inscription_revealed || any_inscription_transferred {
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Block #{} processed through hord, revealing {} inscriptions [{}]",
|
||||
new_block.block_identifier.index,
|
||||
inscriptions_revealed.len(),
|
||||
inscriptions_revealed.join(", ")
|
||||
)
|
||||
});
|
||||
}
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Block #{} processed through hord, revealing {} inscriptions [{}]",
|
||||
new_block.block_identifier.index,
|
||||
inscriptions_revealed.len(),
|
||||
inscriptions_revealed.join(", ")
|
||||
)
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
|
||||
use crate::db::{
|
||||
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
|
||||
open_readwrite_hord_db_conn, open_readwrite_hord_dbs, parse_satpoint_to_watch,
|
||||
remove_entries_from_locations_at_block_height, rebuild_rocks_db,
|
||||
rebuild_rocks_db, remove_entries_from_locations_at_block_height,
|
||||
};
|
||||
use crate::hord::{
|
||||
new_traversals_lazy_cache, revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
|
||||
@@ -152,7 +152,7 @@ impl Service {
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
rebuild_rocks_db(&self.config, 420000, 767420, &self.ctx).await?;
|
||||
// rebuild_rocks_db(&self.config, 420000, 767420, &self.ctx).await?;
|
||||
|
||||
while let Some((start_block, end_block)) = should_sync_hord_db(&self.config, &self.ctx)? {
|
||||
if start_block == 0 {
|
||||
|
||||
Reference in New Issue
Block a user