refactor: move from multithread to local pool

Author: Ludo Galabru
Date: 2023-07-22 00:57:01 -04:00
parent c9c43ae3e3
commit 57a46ec00a
4 changed files with 145 additions and 165 deletions

Cargo.lock (generated)

@@ -452,8 +452,6 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chainhook-sdk"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "462ce230bfc1cd6570fa3717d6ef44a0f79729db542dc4f86efd7ad386aa159c"
dependencies = [
"base58 0.2.0",
"base64",
@@ -472,7 +470,7 @@ dependencies = [
"rand 0.8.5",
"reqwest",
"rocket",
"schemars 0.8.11",
"schemars 0.8.12",
"serde",
"serde-hex",
"serde_derive",
@@ -486,11 +484,9 @@ dependencies = [
[[package]]
name = "chainhook-types"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e205711f1d01227cd9239754309c9ff2ef6a637faaf737b4642570753980ee5"
dependencies = [
"hex",
"schemars 0.8.11",
"schemars 0.8.12",
"serde",
"serde_derive",
"serde_json",
@@ -1387,9 +1383,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
@@ -1402,9 +1398,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core",
"futures-sink",
@@ -1412,15 +1408,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core",
"futures-task",
@@ -1429,38 +1425,38 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-macro"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.105",
"syn 2.0.18",
]
[[package]]
name = "futures-sink"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-task"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.3.25"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
@@ -1715,6 +1711,7 @@ dependencies = [
"dashmap 5.4.0",
"flate2",
"flume",
"futures",
"futures-util",
"fxhash",
"hex",

Cargo.toml

@@ -14,7 +14,8 @@ redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.7.7", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.7.7", default-features = false, features = ["zeromq"] }
chainhook-sdk = { version = "=0.7.7", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
hiro-system-kit = "0.1.0"
clap = { version = "3.2.23", features = ["derive"], optional = true }
clap_generate = { version = "3.0.3", optional = true }
@@ -40,6 +41,7 @@ anyhow = { version = "1.0.56", features = ["backtrace"] }
schemars = { version = "0.8.10", git = "https://github.com/hirosystems/schemars.git", branch = "feat-chainhook-fixes" }
pprof = { version = "0.12", features = ["flamegraph"] }
progressing = '3'
futures = "0.3.28"
[dependencies.rocksdb]
version = "0.20.1"

(CLI source file containing handle_command)

@@ -728,22 +728,10 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};
let blocks_db =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
rebuild_rocks_db(
&bitcoin_config,
&blocks_db,
&config,
cmd.start_block,
cmd.end_block,
&config.get_hord_config(),
&ctx,
)
.await?
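The call site shrinks because the new rebuild_rocks_db (see the last file in this commit) now takes the whole Config and derives the BitcoinConfig and the RocksDB handle internally, instead of having every caller assemble them. A minimal toy sketch of that API shape follows; Config, NetworkConfig, and BitcoinConfig here are simplified stand-ins, not the real hord-cli/chainhook-sdk types, and the tokio entry point is an assumption for the example only.

// Toy illustration of the call-site simplification in this commit:
// connection details are derived inside rebuild_rocks_db from the
// top-level Config instead of being assembled by every caller.
// Requires tokio = { version = "1", features = ["full"] } for #[tokio::main].

struct NetworkConfig {
    bitcoind_rpc_username: String,
    bitcoind_rpc_password: String,
    bitcoind_rpc_url: String,
}

struct Config {
    network: NetworkConfig,
}

struct BitcoinConfig {
    username: String,
    password: String,
    rpc_url: String,
}

async fn rebuild_rocks_db(
    config: &Config,
    start_block: u64,
    end_block: u64,
) -> Result<(), String> {
    // The function derives its own RPC settings from the top-level Config,
    // mirroring the BitcoinConfig construction this commit moves into
    // rebuild_rocks_db itself.
    let bitcoin_config = BitcoinConfig {
        username: config.network.bitcoind_rpc_username.clone(),
        password: config.network.bitcoind_rpc_password.clone(),
        rpc_url: config.network.bitcoind_rpc_url.clone(),
    };
    println!(
        "rebuilding blocks {start_block}..={end_block} against {} as {}",
        bitcoin_config.rpc_url, bitcoin_config.username
    );
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let config = Config {
        network: NetworkConfig {
            bitcoind_rpc_username: "user".into(),
            bitcoind_rpc_password: "pass".into(),
            bitcoind_rpc_url: "http://localhost:8332".into(),
        },
    };
    // The caller just forwards the Config and the block range.
    rebuild_rocks_db(&config, 767_430, 767_530).await
}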

(source file containing rebuild_rocks_db)

@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, VecDeque},
hash::BuildHasherDefault,
path::PathBuf,
sync::{mpsc::Sender, Arc},
@@ -7,7 +7,7 @@ use std::{
use chainhook_sdk::{
indexer::bitcoin::{
build_http_client, download_block_with_retry, retrieve_block_hash_with_retry,
build_http_client, download_block_with_retry, retrieve_block_hash_with_retry, try_fetch_block_bytes_with_retry, parse_downloaded_block,
},
types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData,
@@ -21,6 +21,7 @@ use rand::{thread_rng, Rng};
use rocksdb::DB;
use rusqlite::{Connection, OpenFlags, ToSql, Transaction};
use tokio::task::JoinSet;
use std::io::Cursor;
use std::io::{Read, Write};
use threadpool::ThreadPool;
@@ -29,7 +30,7 @@ use chainhook_sdk::{
indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context,
};
use crate::hord::{self, HordConfig};
use crate::{hord::{self, HordConfig}, config::Config};
use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block};
use crate::ord::{height::Height, sat::Sat};
@@ -1842,98 +1843,69 @@ impl<'a> Iterator for LazyBlockTransactionIterator<'a> {
}
pub async fn rebuild_rocks_db(
bitcoin_config: &BitcoinConfig,
blocks_db_rw: &DB,
config: &Config,
start_block: u64,
end_block: u64,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<(), String> {
let guard = pprof::ProfilerGuardBuilder::default()
.frequency(20)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.unwrap();
// let guard = pprof::ProfilerGuardBuilder::default()
// .frequency(20)
// .blocklist(&["libc", "libgcc", "pthread", "vdso"])
// .build()
// .unwrap();
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};
let hord_config = config.get_hord_config();
ctx.try_log(|logger| {
slog::info!(logger, "Generating report");
});
let number_of_blocks_to_process = end_block - start_block + 1;
let (block_hash_req_lim, block_req_lim, block_process_lim) = (256, 128, 128);
let (block_req_lim, block_process_lim) = (128, 128);
let retrieve_block_hash_pool = ThreadPool::new(hord_config.network_thread_max);
let (block_hash_tx, block_hash_rx) = crossbeam_channel::bounded(block_hash_req_lim);
let retrieve_block_data_pool = ThreadPool::new(hord_config.network_thread_max);
let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(block_req_lim);
let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(block_process_lim);
let http_client = build_http_client();
// Thread pool #1: given a block height, retrieve the block hash
for block_cursor in start_block..=end_block {
let block_height = block_cursor.clone();
let block_hash_tx = block_hash_tx.clone();
let config = bitcoin_config.clone();
let moved_ctx = ctx.clone();
let moved_http_client = http_client.clone();
retrieve_block_hash_pool.execute(move || {
let future = retrieve_block_hash_with_retry(
&moved_http_client,
&block_height,
&config,
&moved_ctx,
);
let block_hash = hiro_system_kit::nestable_block_on(future).unwrap();
block_hash_tx
.send(Some((block_height, block_hash)))
.expect("unable to channel block_hash");
})
}
// Thread pool #2: given a block hash, retrieve the full block (verbosity max, including prevout)
let bitcoin_config = bitcoin_config.clone();
let moved_config = bitcoin_config.clone();
let moved_ctx = ctx.clone();
let block_data_tx_moved = block_data_tx.clone();
let _ = hiro_system_kit::thread_named("Block data retrieval")
.spawn(move || {
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
let moved_bitcoin_config = bitcoin_config.clone();
let block_data_tx = block_data_tx_moved.clone();
let moved_ctx = moved_ctx.clone();
let moved_http_client = http_client.clone();
retrieve_block_data_pool.execute(move || {
moved_ctx
.try_log(|logger| slog::debug!(logger, "Fetching block #{block_height}"));
let future = download_block_with_retry(
&moved_http_client,
&block_hash,
&moved_bitcoin_config,
&moved_ctx,
);
let res = match hiro_system_kit::nestable_block_on(future) {
Ok(block_data) => Some(block_data),
Err(e) => {
moved_ctx.try_log(|logger| {
slog::error!(logger, "unable to fetch block #{block_height}: {e}")
});
None
}
};
let _ = block_data_tx.send(res);
});
}
let res = retrieve_block_data_pool.join();
res
})
.expect("unable to spawn thread");
let moved_http_client = http_client.clone();
let mut set = JoinSet::new();
let mut block_heights = VecDeque::from((start_block..=end_block).collect::<Vec<u64>>());
for _ in 0..hord_config.network_thread_max {
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = moved_ctx.clone();
let http_client = moved_http_client.clone();
set.spawn(try_fetch_block_bytes_with_retry(
http_client,
block_height,
config,
ctx,
));
}
}
let _ = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
while let Ok(Some(block_bytes)) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let block_data = parse_downloaded_block(block_bytes).unwrap();
let compressed_block =
LazyBlock::from_full_block(&block_data).expect("unable to serialize block");
let block_index = block_data.height as u32;
@@ -1947,72 +1919,93 @@ pub async fn rebuild_rocks_db(
let res = compress_block_data_pool.join();
res
})
.expect("unable to spawn thread");
.expect("unable to spawn thread");
let mut blocks_stored = 0;
let mut num_writes = 0;
while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() {
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx);
blocks_stored += 1;
num_writes += 1;
// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.
// Should we start look for inscriptions data in blocks?
ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",));
if blocks_stored == number_of_blocks_to_process {
let _ = block_data_tx.send(None);
let _ = block_hash_tx.send(None);
ctx.try_log(|logger| {
slog::info!(
logger,
"Local block storage successfully seeded with #{blocks_stored} blocks"
)
});
match guard.report().build() {
Ok(report) => {
ctx.try_log(|logger| {
slog::info!(logger, "Generating report");
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let cloned_ctx = ctx.clone();
let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion")
.spawn(move || {
let mut blocks_stored = 0;
let mut num_writes = 0;
while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() {
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx);
blocks_stored += 1;
num_writes += 1;
// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.
// Should we start look for inscriptions data in blocks?
cloned_ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",));
if blocks_stored == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
slog::info!(
logger,
"Local block storage successfully seeded with #{blocks_stored} blocks"
)
});
let file = std::fs::File::create("hord-perf.svg").unwrap();
report.flamegraph(file).unwrap();
// match guard.report().build() {
// Ok(report) => {
// ctx.try_log(|logger| {
// slog::info!(logger, "Generating report");
// });
// let file = std::fs::File::create("hord-perf.svg").unwrap();
// report.flamegraph(file).unwrap();
// }
// Err(e) => {
// ctx.try_log(|logger| {
// slog::error!(logger, "Reporting failed: {}", e.to_string());
// });
// }
// }
}
Err(e) => {
ctx.try_log(|logger| {
slog::error!(logger, "Reporting failed: {}", e.to_string());
if num_writes % 128 == 0 {
cloned_ctx.try_log(|logger| {
slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
});
if let Err(e) = blocks_db_rw.flush() {
cloned_ctx.try_log(|logger| {
slog::error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
}
}
return Ok(());
}
if num_writes % 128 == 0 {
ctx.try_log(|logger| {
slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
});
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
cloned_ctx.try_log(|logger| {
slog::error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
()
}).expect("unable to spawn thread");
while let Some(res) = set.join_next().await {
let block = res.unwrap().unwrap();
let _ = block_data_tx
.send(Some(block));
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = ctx.clone();
let http_client = moved_http_client.clone();
set.spawn(try_fetch_block_bytes_with_retry(
http_client,
block_height,
config,
ctx,
));
}
}
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
slog::error!(logger, "{}", e.to_string());
});
}
retrieve_block_hash_pool.join();
Ok(())
}
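Taken together, the last two hunks replace the block-hash and block-download thread pools with what the commit title calls a local pool: a tokio JoinSet that keeps at most network_thread_max fetches in flight, refilled from a VecDeque of remaining heights as each task completes, while the fetched bytes flow over a bounded channel to worker threads that parse and compress them, and a dedicated ingestion thread writes to RocksDB and flushes every 128 inserts. The following is a self-contained sketch of that pattern, not the real implementation: it uses only tokio and std, collapses the compression pool and ingestion thread into a single worker thread for brevity, and fetch_block_bytes, parse_and_compress, and the println "store" are simulated stand-ins for try_fetch_block_bytes_with_retry, parse_downloaded_block, and insert_entry_in_blocks.

// Sketch of the "local pool" pattern adopted by this commit.
// Requires tokio = { version = "1", features = ["full"] }.
use std::collections::VecDeque;
use std::sync::mpsc::sync_channel;
use std::thread;
use tokio::task::JoinSet;

// Simulated stand-in for try_fetch_block_bytes_with_retry: an async
// fetch that yields the raw bytes of one block.
async fn fetch_block_bytes(height: u64) -> Vec<u8> {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    height.to_be_bytes().to_vec()
}

// Simulated stand-in for parse_downloaded_block + LazyBlock compression.
fn parse_and_compress(bytes: Vec<u8>) -> (u64, Vec<u8>) {
    let mut height_bytes = [0u8; 8];
    height_bytes.copy_from_slice(&bytes[..8]);
    (u64::from_be_bytes(height_bytes), bytes)
}

#[tokio::main]
async fn main() {
    let (start_block, end_block, max_in_flight) = (100u64, 131u64, 8usize);
    let mut block_heights: VecDeque<u64> = (start_block..=end_block).collect();
    let total = block_heights.len();

    // Bounded channel for backpressure between the async fetchers and the
    // synchronous ingestion side (crossbeam_channel::bounded in the real code).
    let (tx, rx) = sync_channel::<Vec<u8>>(128);

    // Dedicated ingestion thread: parse, "store", and flush periodically,
    // mirroring the "Block data ingestion" thread in the diff.
    let ingestion = thread::spawn(move || {
        let mut num_writes = 0u64;
        while let Ok(bytes) = rx.recv() {
            let (height, compacted) = parse_and_compress(bytes);
            // insert_entry_in_blocks(height, &compacted, ...) in the real code.
            println!("storing compacted block #{height} ({} bytes)", compacted.len());
            num_writes += 1;
            if num_writes % 128 == 0 {
                println!("flushing DB to disk ({num_writes} inserts)");
            }
        }
    });

    // "Local pool": seed at most max_in_flight fetch tasks...
    let mut set = JoinSet::new();
    for _ in 0..max_in_flight {
        if let Some(height) = block_heights.pop_front() {
            set.spawn(fetch_block_bytes(height));
        }
    }

    // ...then refill from the queue as each fetch completes, so the number
    // of in-flight requests stays capped.
    let mut fetched = 0usize;
    while let Some(res) = set.join_next().await {
        let bytes = res.expect("fetch task panicked");
        // Blocking send is fine for a sketch; it only blocks when the
        // ingestion side falls 128 blocks behind.
        tx.send(bytes).expect("ingestion thread hung up");
        fetched += 1;
        if let Some(height) = block_heights.pop_front() {
            set.spawn(fetch_block_bytes(height));
        }
    }
    assert_eq!(fetched, total);

    drop(tx); // close the channel so the ingestion thread exits
    ingestion.join().expect("ingestion thread panicked");
}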