feat: improve queue management

This commit is contained in:
Ludo Galabru
2024-01-12 10:24:04 -05:00
parent d6679588fa
commit d57ac758e1
4 changed files with 34 additions and 14 deletions

View File

@@ -5,8 +5,8 @@ use ordhook::chainhook_sdk::types::{
};
use ordhook::config::{
Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig,
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, DEFAULT_CONTROL_PORT,
DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT,
DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
};
use std::fs::File;
use std::io::{BufReader, Read};
@@ -85,7 +85,11 @@ impl ConfigFile {
bitcoind_rpc_threads: config_file
.resources
.bitcoind_rpc_threads
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE),
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS),
bitcoind_rpc_timeout: config_file
.resources
.bitcoind_rpc_timeout
.unwrap_or(DEFAULT_BITCOIND_RPC_TIMEOUT),
expected_observers_count: config_file
.resources
.expected_observers_count
@@ -170,6 +174,7 @@ pub struct ResourcesConfigFile {
pub cpu_core_available: Option<usize>,
pub memory_available: Option<usize>,
pub bitcoind_rpc_threads: Option<usize>,
pub bitcoind_rpc_timeout: Option<u32>,
pub expected_observers_count: Option<usize>,
}

View File

@@ -30,7 +30,8 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543"
ulimit = 2048
cpu_core_available = 16
memory_available = 32
bitcoind_rpc_threads = 8
bitcoind_rpc_threads = 4
bitcoind_rpc_timeout = 15
expected_observers_count = 1
# Disable the following section if the state

View File

@@ -13,7 +13,8 @@ pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const DEFAULT_ULIMIT: usize = 2048;
pub const DEFAULT_MEMORY_AVAILABLE: usize = 8;
pub const DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE: usize = 4;
pub const DEFAULT_BITCOIND_RPC_THREADS: usize = 4;
pub const DEFAULT_BITCOIND_RPC_TIMEOUT: u32 = 15;
#[derive(Clone, Debug)]
pub struct Config {
@@ -70,6 +71,7 @@ pub struct ResourcesConfig {
pub cpu_core_available: usize,
pub memory_available: usize,
pub bitcoind_rpc_threads: usize,
pub bitcoind_rpc_timeout: u32,
pub expected_observers_count: usize,
}
@@ -167,7 +169,8 @@ impl Config {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
@@ -198,7 +201,8 @@ impl Config {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
@@ -229,7 +233,8 @@ impl Config {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {

View File

@@ -86,9 +86,9 @@ pub async fn download_and_pipeline_blocks(
// For each worker in that pool, we want to bound the size of the queue to avoid OOM
// Blocks size can range from 1 to 4Mb (when packed with witness data).
// Start blocking networking when each worker has a backlog of 8 blocks seems reasonable.
let worker_queue_size = 8;
let worker_queue_size = 2;
for _ in 0..thread_pool_network_response_processing_capacity {
for _ in 0..ordhook_config.resources.bitcoind_rpc_threads {
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = moved_ctx.clone();
@@ -262,9 +262,20 @@ pub async fn download_and_pipeline_blocks(
let mut round_robin_worker_thread_index = 0;
while let Some(res) = set.join_next().await {
let block = res.unwrap().unwrap();
let block = res
.expect("unable to retrieve block")
.expect("unable to deserialize block");
loop {
let res = tx_thread_pool[round_robin_worker_thread_index].send(Some(block.clone()));
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
% thread_pool_network_response_processing_capacity;
if res.is_ok() {
break;
}
sleep(Duration::from_millis(500));
}
let _ = tx_thread_pool[round_robin_worker_thread_index].send(Some(block));
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = ctx.clone();
@@ -276,8 +287,6 @@ pub async fn download_and_pipeline_blocks(
ctx,
));
}
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
% thread_pool_network_response_processing_capacity;
}
ctx.try_log(|logger| {