mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
fix: reopen connect on failures
This commit is contained in:
@@ -664,11 +664,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
let config =
|
||||
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
|
||||
|
||||
let hord_db_conn =
|
||||
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
.unwrap();
|
||||
|
||||
let tip_height = find_last_block_inserted(&hord_db_conn) as u64;
|
||||
let tip_height = {
|
||||
let hord_db_conn =
|
||||
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
.unwrap();
|
||||
find_last_block_inserted(&hord_db_conn) as u64
|
||||
};
|
||||
if cmd.block_height > tip_height {
|
||||
perform_hord_db_update(
|
||||
tip_height,
|
||||
@@ -691,7 +692,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
let transaction_identifier = TransactionIdentifier { hash: txid.clone() };
|
||||
let traversals_cache = new_traversals_lazy_cache(1024);
|
||||
let traversal = retrieve_satoshi_point_using_lazy_storage(
|
||||
&hord_db_conn,
|
||||
&config.expected_cache_path(),
|
||||
&block_identifier,
|
||||
&transaction_identifier,
|
||||
0,
|
||||
@@ -859,7 +860,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
|
||||
let mut missing_blocks = vec![];
|
||||
for i in 1..=790000 {
|
||||
if find_lazy_block_at_block_height(i, 3, &blocks_db_rw, &ctx).is_none() {
|
||||
if find_lazy_block_at_block_height(i, 3, false, &blocks_db_rw, &ctx)
|
||||
.is_none()
|
||||
{
|
||||
println!("Missing block {i}");
|
||||
missing_blocks.push(i);
|
||||
}
|
||||
|
||||
@@ -80,7 +80,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
|
||||
|
||||
if find_lazy_block_at_block_height(end_block as u32, 3, &blocks_db_rw, &ctx).is_none() {
|
||||
if find_lazy_block_at_block_height(end_block as u32, 3, false, &blocks_db_rw, &ctx)
|
||||
.is_none()
|
||||
{
|
||||
// Count how many entries in the table
|
||||
// Compute the right interval
|
||||
// Start the build local storage routine
|
||||
|
||||
@@ -107,7 +107,10 @@ impl Service {
|
||||
};
|
||||
|
||||
// Download and ingest a Stacks dump
|
||||
let _ = consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await;
|
||||
if self.config.rely_on_remote_stacks_tsv() {
|
||||
let _ =
|
||||
consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await;
|
||||
}
|
||||
|
||||
// Download and ingest a Ordinal dump, if hord is enabled
|
||||
if !hord_disabled {
|
||||
|
||||
@@ -206,6 +206,21 @@ pub fn open_readonly_hord_db_conn_rocks_db(
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
pub fn open_readonly_hord_db_conn_rocks_db_loop(base_dir: &PathBuf, ctx: &Context) -> DB {
|
||||
let blocks_db = loop {
|
||||
match open_readonly_hord_db_conn_rocks_db(&base_dir, &ctx) {
|
||||
Ok(db) => break db,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::warn!(logger, "Unable to open db: {e}",);
|
||||
});
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
blocks_db
|
||||
}
|
||||
|
||||
pub fn open_readwrite_hord_dbs(
|
||||
base_dir: &PathBuf,
|
||||
ctx: &Context,
|
||||
@@ -251,6 +266,7 @@ pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {
|
||||
pub fn find_lazy_block_at_block_height(
|
||||
block_height: u32,
|
||||
retry: u8,
|
||||
try_iterator: bool,
|
||||
blocks_db: &DB,
|
||||
ctx: &Context,
|
||||
) -> Option<LazyBlock> {
|
||||
@@ -265,7 +281,7 @@ pub fn find_lazy_block_at_block_height(
|
||||
match blocks_db.get(block_height.to_be_bytes()) {
|
||||
Ok(Some(res)) => return Some(LazyBlock::new(res)),
|
||||
_ => {
|
||||
if attempt == 1 {
|
||||
if attempt == 1 && try_iterator {
|
||||
ctx.try_log(|logger| {
|
||||
slog::warn!(
|
||||
logger,
|
||||
@@ -971,7 +987,7 @@ pub fn parse_outpoint_to_watch(outpoint_to_watch: &str) -> (TransactionIdentifie
|
||||
}
|
||||
|
||||
pub fn retrieve_satoshi_point_using_lazy_storage(
|
||||
blocks_db: &DB,
|
||||
blocks_db_dir: &PathBuf,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
input_index: usize,
|
||||
@@ -987,6 +1003,8 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
|
||||
let mut ordinal_block_number = block_identifier.index as u32;
|
||||
let txid = transaction_identifier.get_8_hash_bytes();
|
||||
|
||||
let mut blocks_db = open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
|
||||
|
||||
let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache
|
||||
.get(&(block_identifier.index as u32, txid.clone()))
|
||||
{
|
||||
@@ -997,21 +1015,38 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
|
||||
tx.get_cumulated_sats_in_until_input_index(input_index),
|
||||
)
|
||||
}
|
||||
None => match find_lazy_block_at_block_height(ordinal_block_number, 3, &blocks_db, &ctx) {
|
||||
None => {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
Some(block) => match block.find_and_serialize_transaction_with_txid(&txid) {
|
||||
Some(tx) => {
|
||||
let sats_ranges = tx.get_sat_ranges();
|
||||
let inscription_offset_cross_outputs =
|
||||
tx.get_cumulated_sats_in_until_input_index(input_index);
|
||||
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
|
||||
(sats_ranges, inscription_offset_cross_outputs)
|
||||
None => {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
match find_lazy_block_at_block_height(
|
||||
ordinal_block_number,
|
||||
3,
|
||||
false,
|
||||
&blocks_db,
|
||||
&ctx,
|
||||
) {
|
||||
None => {
|
||||
if attempt < 3 {
|
||||
attempt += 1;
|
||||
blocks_db =
|
||||
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
|
||||
} else {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
}
|
||||
Some(block) => match block.find_and_serialize_transaction_with_txid(&txid) {
|
||||
Some(tx) => {
|
||||
let sats_ranges = tx.get_sat_ranges();
|
||||
let inscription_offset_cross_outputs =
|
||||
tx.get_cumulated_sats_in_until_input_index(input_index);
|
||||
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
|
||||
break (sats_ranges, inscription_offset_cross_outputs);
|
||||
}
|
||||
None => return Err(format!("txid not in block #{ordinal_block_number}")),
|
||||
},
|
||||
}
|
||||
None => return Err(format!("txid not in block #{ordinal_block_number}")),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (i, (min, max)) in sats_ranges.into_iter().enumerate() {
|
||||
@@ -1095,13 +1130,29 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
|
||||
}
|
||||
}
|
||||
|
||||
let lazy_block =
|
||||
match find_lazy_block_at_block_height(ordinal_block_number, 3, &blocks_db, &ctx) {
|
||||
Some(block) => block,
|
||||
None => {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
let lazy_block = {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
match find_lazy_block_at_block_height(
|
||||
ordinal_block_number,
|
||||
3,
|
||||
false,
|
||||
&blocks_db,
|
||||
&ctx,
|
||||
) {
|
||||
Some(block) => break block,
|
||||
None => {
|
||||
if attempt < 3 {
|
||||
attempt += 1;
|
||||
blocks_db =
|
||||
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
|
||||
} else {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
let coinbase_txid = lazy_block.get_coinbase_txid();
|
||||
let txid = tx_cursor.0;
|
||||
|
||||
@@ -41,9 +41,10 @@ use crate::{
|
||||
|
||||
use self::db::{
|
||||
find_inscription_with_id, find_latest_cursed_inscription_number_at_block_height,
|
||||
find_latest_inscription_number_at_block_height, open_readonly_hord_db_conn_rocks_db,
|
||||
parse_satpoint_to_watch, remove_entry_from_blocks, remove_entry_from_inscriptions, LazyBlock,
|
||||
LazyBlockTransaction, TraversalResult, WatchedSatpoint,
|
||||
find_latest_inscription_number_at_block_height,
|
||||
parse_satpoint_to_watch, remove_entry_from_blocks,
|
||||
remove_entry_from_inscriptions, LazyBlock, LazyBlockTransaction, TraversalResult,
|
||||
WatchedSatpoint,
|
||||
};
|
||||
use self::inscription::InscriptionParser;
|
||||
use self::ord::inscription_id::InscriptionId;
|
||||
@@ -276,27 +277,17 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
let block_identifier = block.block_identifier.clone();
|
||||
let moved_hord_db_path = hord_config.db_path.clone();
|
||||
let local_cache = traversals_cache.clone();
|
||||
traversal_data_pool.execute(move || loop {
|
||||
match open_readonly_hord_db_conn_rocks_db(&moved_hord_db_path, &moved_ctx) {
|
||||
Ok(blocks_db) => {
|
||||
let traversal = retrieve_satoshi_point_using_lazy_storage(
|
||||
&blocks_db,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
input_index,
|
||||
0,
|
||||
local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send(traversal);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
moved_ctx.try_log(|logger| {
|
||||
slog::warn!(logger, "Unable to open db: {e}",);
|
||||
});
|
||||
}
|
||||
}
|
||||
traversal_data_pool.execute(move || {
|
||||
let traversal = retrieve_satoshi_point_using_lazy_storage(
|
||||
&moved_hord_db_path,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
input_index,
|
||||
0,
|
||||
local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send(traversal);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user