fix: patch crach

This commit is contained in:
Ludo Galabru
2023-08-03 15:09:51 +02:00
parent 832b4dd7ff
commit 20d9df6c65
6 changed files with 35 additions and 30 deletions

View File

@@ -666,7 +666,9 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let service = Service::new(config, ctx.clone());
service.replay_transfers(cmd.start_block, cmd.end_block, None).await?;
service
.replay_transfers(cmd.start_block, cmd.end_block, None)
.await?;
}
},
Command::Db(HordDbCommand::Check(cmd)) => {

View File

@@ -157,12 +157,13 @@ pub async fn download_and_pipeline_blocks(
loop {
// Dequeue all the blocks available
let mut new_blocks = vec![];
while let Ok(Some((block_height, block, compacted_block))) = block_compressed_rx.try_recv()
while let Ok(Some((block_height, block, compacted_block))) =
block_compressed_rx.try_recv()
{
blocks_processed += 1;
new_blocks.push((block_height, block, compacted_block));
if new_blocks.len() >= 10_000 {
break
break;
}
}
let mut ooo_compacted_blocks = vec![];
@@ -181,7 +182,10 @@ pub async fn download_and_pipeline_blocks(
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(ooo_compacted_blocks, vec![]));
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
ooo_compacted_blocks,
vec![],
));
}
}
@@ -206,14 +210,17 @@ pub async fn download_and_pipeline_blocks(
post_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks));
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
compacted_blocks,
blocks,
));
}
} else {
if blocks_processed == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
info!(
logger,
"Local block storage successfully seeded with #{blocks_processed} blocks"
"#{blocks_processed} blocks successfully sent to processor"
)
});
break;

View File

@@ -1,5 +1,5 @@
use std::{
sync::{Arc},
sync::Arc,
thread::{sleep, JoinHandle},
time::Duration,
};
@@ -12,7 +12,7 @@ use chainhook_sdk::{
},
utils::Context,
};
use crossbeam_channel::{TryRecvError, Sender};
use crossbeam_channel::{Sender, TryRecvError};
use dashmap::DashMap;
use fxhash::FxHasher;

View File

@@ -443,7 +443,6 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_t
};
for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() {
for (input_index, input) in new_tx.metadata.inputs.iter().enumerate() {
let outpoint_pre_transfer = format_outpoint_to_watch(
&input.previous_output.txid,
input.previous_output.vout as usize,

View File

@@ -751,19 +751,25 @@ pub fn find_all_inscriptions_in_block(
let inscription_id: String = row.get(2).unwrap();
let (transaction_identifier_inscription, inscription_input_index) =
{ parse_inscription_id(&inscription_id) };
let transfer_data = transfers_data
let Some(transfer_data) = transfers_data
.get(&inscription_id)
.unwrap()
.first()
.unwrap()
.clone();
.and_then(|entries| entries.first()) else {
ctx.try_log(|logger| {
error!(
logger,
"unable to retrieve inscription genesis transfer data: {}",
inscription_id,
)
});
continue;
};
let traversal = TraversalResult {
inscription_number,
ordinal_number,
inscription_input_index,
transfers: 0,
transaction_identifier_inscription: transaction_identifier_inscription.clone(),
transfer_data: transfer_data,
transfer_data: transfer_data.clone(),
};
results.insert(
(transaction_identifier_inscription, inscription_input_index),

View File

@@ -1,46 +1,36 @@
mod http_api;
mod runloops;
use crate::cli::fetch_and_standardize_block;
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::core::pipeline::processors::inscription_indexing::process_blocks;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand};
use crate::core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx;
use crate::core::{
new_traversals_lazy_cache, parse_inscriptions_in_standardized_block,
revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
};
use crate::db::{
find_all_inscriptions_in_block, find_latest_inscription_block_height, format_satpoint_to_watch,
insert_entry_in_blocks, insert_entry_in_locations, open_readonly_hord_db_conn,
open_readwrite_hord_db_conn, open_readwrite_hord_dbs, parse_satpoint_to_watch,
remove_entries_from_locations_at_block_height, InscriptionHeigthHint, LazyBlock, initialize_hord_db,
find_latest_inscription_block_height, initialize_hord_db, insert_entry_in_blocks,
open_readonly_hord_db_conn, open_readwrite_hord_dbs, InscriptionHeigthHint, LazyBlock,
};
use crate::scan::bitcoin::process_block_with_predicates;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::start_bitcoin_scan_runloop;
use chainhook_sdk::bitcoincore_rpc_json::bitcoin::hashes::hex::FromHex;
use chainhook_sdk::bitcoincore_rpc_json::bitcoin::{Address, Network, Script};
use chainhook_sdk::chainhooks::types::{
BitcoinChainhookSpecification, ChainhookConfig, ChainhookFullSpecification,
ChainhookSpecification,
};
use chainhook_sdk::indexer::bitcoin::build_http_client;
use chainhook_sdk::observer::{
start_event_observer, BitcoinConfig, EventObserverConfig, HandleBlock, ObserverEvent,
};
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation,
start_event_observer, EventObserverConfig, HandleBlock, ObserverEvent,
};
use chainhook_sdk::types::BitcoinBlockData;
use chainhook_sdk::utils::Context;
use crossbeam_channel::unbounded;
use redis::{Commands, Connection};
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
@@ -126,7 +116,8 @@ impl Service {
}
};
self.replay_transfers(775808, tip, Some(tx_replayer.clone())).await?;
self.replay_transfers(775808, tip, Some(tx_replayer.clone()))
.await?;
self.update_state(Some(tx_replayer.clone())).await?;
// Catch-up with chain tip