mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 22:43:06 +08:00
feat: improve queue management
This commit is contained in:
@@ -5,8 +5,8 @@ use ordhook::chainhook_sdk::types::{
|
||||
};
|
||||
use ordhook::config::{
|
||||
Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig,
|
||||
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, DEFAULT_CONTROL_PORT,
|
||||
DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
|
||||
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT,
|
||||
DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
|
||||
};
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Read};
|
||||
@@ -85,7 +85,11 @@ impl ConfigFile {
|
||||
bitcoind_rpc_threads: config_file
|
||||
.resources
|
||||
.bitcoind_rpc_threads
|
||||
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE),
|
||||
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS),
|
||||
bitcoind_rpc_timeout: config_file
|
||||
.resources
|
||||
.bitcoind_rpc_timeout
|
||||
.unwrap_or(DEFAULT_BITCOIND_RPC_TIMEOUT),
|
||||
expected_observers_count: config_file
|
||||
.resources
|
||||
.expected_observers_count
|
||||
@@ -170,6 +174,7 @@ pub struct ResourcesConfigFile {
|
||||
pub cpu_core_available: Option<usize>,
|
||||
pub memory_available: Option<usize>,
|
||||
pub bitcoind_rpc_threads: Option<usize>,
|
||||
pub bitcoind_rpc_timeout: Option<u32>,
|
||||
pub expected_observers_count: Option<usize>,
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,8 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543"
|
||||
ulimit = 2048
|
||||
cpu_core_available = 16
|
||||
memory_available = 32
|
||||
bitcoind_rpc_threads = 8
|
||||
bitcoind_rpc_threads = 4
|
||||
bitcoind_rpc_timeout = 15
|
||||
expected_observers_count = 1
|
||||
|
||||
# Disable the following section if the state
|
||||
|
||||
@@ -13,7 +13,8 @@ pub const DEFAULT_INGESTION_PORT: u16 = 20455;
|
||||
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
|
||||
pub const DEFAULT_ULIMIT: usize = 2048;
|
||||
pub const DEFAULT_MEMORY_AVAILABLE: usize = 8;
|
||||
pub const DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE: usize = 4;
|
||||
pub const DEFAULT_BITCOIND_RPC_THREADS: usize = 4;
|
||||
pub const DEFAULT_BITCOIND_RPC_TIMEOUT: u32 = 15;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
@@ -70,6 +71,7 @@ pub struct ResourcesConfig {
|
||||
pub cpu_core_available: usize,
|
||||
pub memory_available: usize,
|
||||
pub bitcoind_rpc_threads: usize,
|
||||
pub bitcoind_rpc_timeout: u32,
|
||||
pub expected_observers_count: usize,
|
||||
}
|
||||
|
||||
@@ -167,7 +169,8 @@ impl Config {
|
||||
cpu_core_available: num_cpus::get(),
|
||||
memory_available: DEFAULT_MEMORY_AVAILABLE,
|
||||
ulimit: DEFAULT_ULIMIT,
|
||||
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
|
||||
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
|
||||
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
|
||||
expected_observers_count: 1,
|
||||
},
|
||||
network: IndexerConfig {
|
||||
@@ -198,7 +201,8 @@ impl Config {
|
||||
cpu_core_available: num_cpus::get(),
|
||||
memory_available: DEFAULT_MEMORY_AVAILABLE,
|
||||
ulimit: DEFAULT_ULIMIT,
|
||||
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
|
||||
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
|
||||
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
|
||||
expected_observers_count: 1,
|
||||
},
|
||||
network: IndexerConfig {
|
||||
@@ -229,7 +233,8 @@ impl Config {
|
||||
cpu_core_available: num_cpus::get(),
|
||||
memory_available: DEFAULT_MEMORY_AVAILABLE,
|
||||
ulimit: DEFAULT_ULIMIT,
|
||||
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
|
||||
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
|
||||
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
|
||||
expected_observers_count: 1,
|
||||
},
|
||||
network: IndexerConfig {
|
||||
|
||||
@@ -86,9 +86,9 @@ pub async fn download_and_pipeline_blocks(
|
||||
// For each worker in that pool, we want to bound the size of the queue to avoid OOM
|
||||
// Blocks size can range from 1 to 4Mb (when packed with witness data).
|
||||
// Start blocking networking when each worker has a backlog of 8 blocks seems reasonable.
|
||||
let worker_queue_size = 8;
|
||||
let worker_queue_size = 2;
|
||||
|
||||
for _ in 0..thread_pool_network_response_processing_capacity {
|
||||
for _ in 0..ordhook_config.resources.bitcoind_rpc_threads {
|
||||
if let Some(block_height) = block_heights.pop_front() {
|
||||
let config = moved_config.clone();
|
||||
let ctx = moved_ctx.clone();
|
||||
@@ -262,9 +262,20 @@ pub async fn download_and_pipeline_blocks(
|
||||
|
||||
let mut round_robin_worker_thread_index = 0;
|
||||
while let Some(res) = set.join_next().await {
|
||||
let block = res.unwrap().unwrap();
|
||||
let block = res
|
||||
.expect("unable to retrieve block")
|
||||
.expect("unable to deserialize block");
|
||||
|
||||
loop {
|
||||
let res = tx_thread_pool[round_robin_worker_thread_index].send(Some(block.clone()));
|
||||
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
|
||||
% thread_pool_network_response_processing_capacity;
|
||||
if res.is_ok() {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_millis(500));
|
||||
}
|
||||
|
||||
let _ = tx_thread_pool[round_robin_worker_thread_index].send(Some(block));
|
||||
if let Some(block_height) = block_heights.pop_front() {
|
||||
let config = moved_config.clone();
|
||||
let ctx = ctx.clone();
|
||||
@@ -276,8 +287,6 @@ pub async fn download_and_pipeline_blocks(
|
||||
ctx,
|
||||
));
|
||||
}
|
||||
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
|
||||
% thread_pool_network_response_processing_capacity;
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
|
||||
Reference in New Issue
Block a user