fix: move parsing back to network thread

This commit is contained in:
Ludo Galabru
2023-04-17 23:44:17 -04:00
parent d80b1afdeb
commit bad1ee6d4e
4 changed files with 22 additions and 13 deletions

12
Cargo.lock generated
View File

@@ -501,7 +501,7 @@ dependencies = [
"clarinet-files",
"clarity-repl",
"criterion",
"crossbeam-channel 0.5.6",
"crossbeam-channel 0.5.8",
"csv",
"ctrlc",
"flate2",
@@ -539,7 +539,7 @@ dependencies = [
"clap_generate",
"clarinet-utils",
"clarity-repl",
"crossbeam-channel 0.5.6",
"crossbeam-channel 0.5.8",
"ctrlc",
"futures",
"hex",
@@ -970,9 +970,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils 0.8.14",
@@ -3123,7 +3123,7 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel 0.5.6",
"crossbeam-channel 0.5.8",
"crossbeam-deque 0.8.2",
"crossbeam-utils 0.8.14",
"num_cpus",
@@ -3944,7 +3944,7 @@ version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "766c59b252e62a34651412870ff55d8c4e6d04df19b43eecb2703e417b097ffe"
dependencies = [
"crossbeam-channel 0.5.6",
"crossbeam-channel 0.5.8",
"slog",
"take_mut",
"thread_local",

View File

@@ -35,7 +35,7 @@ tar = "0.4.38"
flume = "0.10.14"
ansi_term = "0.12.1"
atty = "0.2.14"
crossbeam-channel = "0.5.6"
crossbeam-channel = "0.5.8"
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
threadpool = "1.8.1"

View File

@@ -14,7 +14,7 @@ use threadpool::ThreadPool;
use crate::{
indexer::bitcoin::{
download_block_with_retry, parse_downloaded_block, retrieve_block_hash_with_retry,
download_block_with_retry, retrieve_block_hash_with_retry,
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
},
observer::BitcoinConfig,
@@ -786,7 +786,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let future =
download_block_with_retry(&block_hash, &moved_bitcoin_config, &moved_ctx);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
let _ = block_data_tx.send(Some(block_data));
let _ = block_data_tx.send(Some((block_height, block_hash, block_data)));
});
}
let res = retrieve_block_data_pool.join();
@@ -796,10 +796,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let _ = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(downloaded_block)) = block_data_rx.recv() {
while let Ok(Some((_, _, block_data))) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let block_data = parse_downloaded_block(downloaded_block).unwrap();
let compressed_block = CompactedBlock::from_full_block(&block_data);
let block_index = block_data.height as u32;
let _ = block_compressed_tx_moved.send(Some((

View File

@@ -174,11 +174,21 @@ pub async fn download_block_with_retry(
block_hash: &str,
bitcoin_config: &BitcoinConfig,
ctx: &Context,
) -> Result<Vec<u8>, String> {
) -> Result<BitcoinBlockFullBreakdown, String> {
let mut errors_count = 0;
let block = loop {
match download_block(block_hash, bitcoin_config, ctx).await {
Ok(result) => break result,
Ok(result) => match parse_downloaded_block(result) {
Ok(result) => break result,
Err(e) => {
errors_count += 1;
error!(
"unable to retrieve block #{block_hash} (attempt #{errors_count}): {}",
e.to_string()
);
std::thread::sleep(std::time::Duration::from_millis(500));
}
},
Err(e) => {
errors_count += 1;
error!(