fix: reopen connect on failures

This commit is contained in:
Ludo Galabru
2023-06-20 10:09:53 -04:00
parent ea1ff9aab4
commit 3e15da5565
5 changed files with 105 additions and 55 deletions

View File

@@ -664,11 +664,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
let config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let hord_db_conn =
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
let tip_height = find_last_block_inserted(&hord_db_conn) as u64;
let tip_height = {
let hord_db_conn =
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
find_last_block_inserted(&hord_db_conn) as u64
};
if cmd.block_height > tip_height {
perform_hord_db_update(
tip_height,
@@ -691,7 +692,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
let transaction_identifier = TransactionIdentifier { hash: txid.clone() };
let traversals_cache = new_traversals_lazy_cache(1024);
let traversal = retrieve_satoshi_point_using_lazy_storage(
&hord_db_conn,
&config.expected_cache_path(),
&block_identifier,
&transaction_identifier,
0,
@@ -859,7 +860,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
let mut missing_blocks = vec![];
for i in 1..=790000 {
if find_lazy_block_at_block_height(i, 3, &blocks_db_rw, &ctx).is_none() {
if find_lazy_block_at_block_height(i, 3, false, &blocks_db_rw, &ctx)
.is_none()
{
println!("Missing block {i}");
missing_blocks.push(i);
}

View File

@@ -80,7 +80,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
if find_lazy_block_at_block_height(end_block as u32, 3, &blocks_db_rw, &ctx).is_none() {
if find_lazy_block_at_block_height(end_block as u32, 3, false, &blocks_db_rw, &ctx)
.is_none()
{
// Count how many entries in the table
// Compute the right interval
// Start the build local storage routine

View File

@@ -107,7 +107,10 @@ impl Service {
};
// Download and ingest a Stacks dump
let _ = consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await;
if self.config.rely_on_remote_stacks_tsv() {
let _ =
consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await;
}
// Download and ingest a Ordinal dump, if hord is enabled
if !hord_disabled {

View File

@@ -206,6 +206,21 @@ pub fn open_readonly_hord_db_conn_rocks_db(
Ok(db)
}
pub fn open_readonly_hord_db_conn_rocks_db_loop(base_dir: &PathBuf, ctx: &Context) -> DB {
let blocks_db = loop {
match open_readonly_hord_db_conn_rocks_db(&base_dir, &ctx) {
Ok(db) => break db,
Err(e) => {
ctx.try_log(|logger| {
slog::warn!(logger, "Unable to open db: {e}",);
});
continue;
}
}
};
blocks_db
}
pub fn open_readwrite_hord_dbs(
base_dir: &PathBuf,
ctx: &Context,
@@ -251,6 +266,7 @@ pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {
pub fn find_lazy_block_at_block_height(
block_height: u32,
retry: u8,
try_iterator: bool,
blocks_db: &DB,
ctx: &Context,
) -> Option<LazyBlock> {
@@ -265,7 +281,7 @@ pub fn find_lazy_block_at_block_height(
match blocks_db.get(block_height.to_be_bytes()) {
Ok(Some(res)) => return Some(LazyBlock::new(res)),
_ => {
if attempt == 1 {
if attempt == 1 && try_iterator {
ctx.try_log(|logger| {
slog::warn!(
logger,
@@ -971,7 +987,7 @@ pub fn parse_outpoint_to_watch(outpoint_to_watch: &str) -> (TransactionIdentifie
}
pub fn retrieve_satoshi_point_using_lazy_storage(
blocks_db: &DB,
blocks_db_dir: &PathBuf,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
input_index: usize,
@@ -987,6 +1003,8 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
let mut ordinal_block_number = block_identifier.index as u32;
let txid = transaction_identifier.get_8_hash_bytes();
let mut blocks_db = open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache
.get(&(block_identifier.index as u32, txid.clone()))
{
@@ -997,21 +1015,38 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
tx.get_cumulated_sats_in_until_input_index(input_index),
)
}
None => match find_lazy_block_at_block_height(ordinal_block_number, 3, &blocks_db, &ctx) {
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
}
Some(block) => match block.find_and_serialize_transaction_with_txid(&txid) {
Some(tx) => {
let sats_ranges = tx.get_sat_ranges();
let inscription_offset_cross_outputs =
tx.get_cumulated_sats_in_until_input_index(input_index);
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
(sats_ranges, inscription_offset_cross_outputs)
None => {
let mut attempt = 0;
loop {
match find_lazy_block_at_block_height(
ordinal_block_number,
3,
false,
&blocks_db,
&ctx,
) {
None => {
if attempt < 3 {
attempt += 1;
blocks_db =
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
} else {
return Err(format!("block #{ordinal_block_number} not in database"));
}
}
Some(block) => match block.find_and_serialize_transaction_with_txid(&txid) {
Some(tx) => {
let sats_ranges = tx.get_sat_ranges();
let inscription_offset_cross_outputs =
tx.get_cumulated_sats_in_until_input_index(input_index);
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
break (sats_ranges, inscription_offset_cross_outputs);
}
None => return Err(format!("txid not in block #{ordinal_block_number}")),
},
}
None => return Err(format!("txid not in block #{ordinal_block_number}")),
},
},
}
}
};
for (i, (min, max)) in sats_ranges.into_iter().enumerate() {
@@ -1095,13 +1130,29 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
}
}
let lazy_block =
match find_lazy_block_at_block_height(ordinal_block_number, 3, &blocks_db, &ctx) {
Some(block) => block,
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
let lazy_block = {
let mut attempt = 0;
loop {
match find_lazy_block_at_block_height(
ordinal_block_number,
3,
false,
&blocks_db,
&ctx,
) {
Some(block) => break block,
None => {
if attempt < 3 {
attempt += 1;
blocks_db =
open_readonly_hord_db_conn_rocks_db_loop(&blocks_db_dir, &ctx);
} else {
return Err(format!("block #{ordinal_block_number} not in database"));
}
}
}
};
}
};
let coinbase_txid = lazy_block.get_coinbase_txid();
let txid = tx_cursor.0;

View File

@@ -41,9 +41,10 @@ use crate::{
use self::db::{
find_inscription_with_id, find_latest_cursed_inscription_number_at_block_height,
find_latest_inscription_number_at_block_height, open_readonly_hord_db_conn_rocks_db,
parse_satpoint_to_watch, remove_entry_from_blocks, remove_entry_from_inscriptions, LazyBlock,
LazyBlockTransaction, TraversalResult, WatchedSatpoint,
find_latest_inscription_number_at_block_height,
parse_satpoint_to_watch, remove_entry_from_blocks,
remove_entry_from_inscriptions, LazyBlock, LazyBlockTransaction, TraversalResult,
WatchedSatpoint,
};
use self::inscription::InscriptionParser;
use self::ord::inscription_id::InscriptionId;
@@ -276,27 +277,17 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
let block_identifier = block.block_identifier.clone();
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = traversals_cache.clone();
traversal_data_pool.execute(move || loop {
match open_readonly_hord_db_conn_rocks_db(&moved_hord_db_path, &moved_ctx) {
Ok(blocks_db) => {
let traversal = retrieve_satoshi_point_using_lazy_storage(
&blocks_db,
&block_identifier,
&transaction_id,
input_index,
0,
local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send(traversal);
break;
}
Err(e) => {
moved_ctx.try_log(|logger| {
slog::warn!(logger, "Unable to open db: {e}",);
});
}
}
traversal_data_pool.execute(move || {
let traversal = retrieve_satoshi_point_using_lazy_storage(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send(traversal);
});
}