mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 08:29:31 +08:00
fix: move parsing back to network thread
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -501,7 +501,7 @@ dependencies = [
|
||||
"clarinet-files",
|
||||
"clarity-repl",
|
||||
"criterion",
|
||||
"crossbeam-channel 0.5.6",
|
||||
"crossbeam-channel 0.5.8",
|
||||
"csv",
|
||||
"ctrlc",
|
||||
"flate2",
|
||||
@@ -539,7 +539,7 @@ dependencies = [
|
||||
"clap_generate",
|
||||
"clarinet-utils",
|
||||
"clarity-repl",
|
||||
"crossbeam-channel 0.5.6",
|
||||
"crossbeam-channel 0.5.8",
|
||||
"ctrlc",
|
||||
"futures",
|
||||
"hex",
|
||||
@@ -970,9 +970,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.6"
|
||||
version = "0.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
|
||||
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"crossbeam-utils 0.8.14",
|
||||
@@ -3123,7 +3123,7 @@ version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
|
||||
dependencies = [
|
||||
"crossbeam-channel 0.5.6",
|
||||
"crossbeam-channel 0.5.8",
|
||||
"crossbeam-deque 0.8.2",
|
||||
"crossbeam-utils 0.8.14",
|
||||
"num_cpus",
|
||||
@@ -3944,7 +3944,7 @@ version = "2.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "766c59b252e62a34651412870ff55d8c4e6d04df19b43eecb2703e417b097ffe"
|
||||
dependencies = [
|
||||
"crossbeam-channel 0.5.6",
|
||||
"crossbeam-channel 0.5.8",
|
||||
"slog",
|
||||
"take_mut",
|
||||
"thread_local",
|
||||
|
||||
@@ -35,7 +35,7 @@ tar = "0.4.38"
|
||||
flume = "0.10.14"
|
||||
ansi_term = "0.12.1"
|
||||
atty = "0.2.14"
|
||||
crossbeam-channel = "0.5.6"
|
||||
crossbeam-channel = "0.5.8"
|
||||
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
|
||||
threadpool = "1.8.1"
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ use threadpool::ThreadPool;
|
||||
|
||||
use crate::{
|
||||
indexer::bitcoin::{
|
||||
download_block_with_retry, parse_downloaded_block, retrieve_block_hash_with_retry,
|
||||
download_block_with_retry, retrieve_block_hash_with_retry,
|
||||
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
},
|
||||
observer::BitcoinConfig,
|
||||
@@ -786,7 +786,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
let future =
|
||||
download_block_with_retry(&block_hash, &moved_bitcoin_config, &moved_ctx);
|
||||
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
|
||||
let _ = block_data_tx.send(Some(block_data));
|
||||
let _ = block_data_tx.send(Some((block_height, block_hash, block_data)));
|
||||
});
|
||||
}
|
||||
let res = retrieve_block_data_pool.join();
|
||||
@@ -796,10 +796,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Block data compression")
|
||||
.spawn(move || {
|
||||
while let Ok(Some(downloaded_block)) = block_data_rx.recv() {
|
||||
while let Ok(Some((_, _, block_data))) = block_data_rx.recv() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
compress_block_data_pool.execute(move || {
|
||||
let block_data = parse_downloaded_block(downloaded_block).unwrap();
|
||||
let compressed_block = CompactedBlock::from_full_block(&block_data);
|
||||
let block_index = block_data.height as u32;
|
||||
let _ = block_compressed_tx_moved.send(Some((
|
||||
|
||||
@@ -174,11 +174,21 @@ pub async fn download_block_with_retry(
|
||||
block_hash: &str,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<Vec<u8>, String> {
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
let mut errors_count = 0;
|
||||
let block = loop {
|
||||
match download_block(block_hash, bitcoin_config, ctx).await {
|
||||
Ok(result) => break result,
|
||||
Ok(result) => match parse_downloaded_block(result) {
|
||||
Ok(result) => break result,
|
||||
Err(e) => {
|
||||
errors_count += 1;
|
||||
error!(
|
||||
"unable to retrieve block #{block_hash} (attempt #{errors_count}): {}",
|
||||
e.to_string()
|
||||
);
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
errors_count += 1;
|
||||
error!(
|
||||
|
||||
Reference in New Issue
Block a user