feat: revisit config files

Ludovic Galabru
2024-01-11 14:45:43 -05:00
parent 4e7bdc670f
commit 1c80b67658
13 changed files with 243 additions and 183 deletions

View File

@@ -575,15 +575,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
while let Some(block_height) = block_range.pop_front() {
let inscriptions =
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, ctx);
let mut locations =
let locations =
find_all_transfers_in_block(&block_height, &inscriptions_db_conn, ctx);
let mut total_transfers_in_block = 0;
for (_, inscription) in inscriptions.iter() {
println!("Inscription {} revealed at block #{} (inscription_number {}, ordinal_number {})", inscription.get_inscription_id(), block_height, inscription.inscription_number.jubilee, inscription.ordinal_number);
if let Some(transfers) = locations.remove(&inscription.get_inscription_id())
{
if let Some(transfers) = locations.get(&inscription.ordinal_number) {
for t in transfers.iter().skip(1) {
total_transfers_in_block += 1;
println!(
@@ -678,6 +677,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
&transaction_identifier,
0,
&Arc::new(cache),
config.resources.ulimit,
config.resources.memory_available,
true,
ctx,
)?;
@@ -707,7 +708,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let last_known_block =
find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?;
if last_known_block.is_none() {
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
}
let ordhook_config = config.get_ordhook_config();
@@ -766,7 +773,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::New(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
initialize_ordhook_db(&config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
}
Command::Db(OrdhookDbCommand::Sync(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
@@ -779,10 +792,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let mut ordhook_config = config.get_ordhook_config();
if let Some(network_threads) = cmd.network_threads {
ordhook_config.network_thread_max = network_threads;
}
if let Some(network_threads) = cmd.network_threads {
ordhook_config.network_thread_max = network_threads;
ordhook_config.resources.bitcoind_rpc_threads = network_threads;
}
let blocks = cmd.get_blocks();
let block_ingestion_processor =
@@ -800,6 +810,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
false,
&config.get_ordhook_config().db_path,
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
for i in cmd.get_blocks().into_iter() {
@@ -819,7 +831,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let mut ordhook_config = config.get_ordhook_config();
if let Some(network_threads) = cmd.network_threads {
ordhook_config.network_thread_max = network_threads;
ordhook_config.resources.bitcoind_rpc_threads = network_threads;
}
let block_post_processor = match cmd.repair_observers {
Some(true) => {
@@ -870,8 +882,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::Check(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
{
let blocks_db =
open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
let blocks_db = open_readonly_ordhook_db_conn_rocks_db(
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
)?;
let tip = find_last_block_inserted(&blocks_db);
println!("Tip: {}", tip);
let missing_blocks = find_missing_blocks(&blocks_db, 1, tip, ctx);
@@ -880,8 +896,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
}
Command::Db(OrdhookDbCommand::Drop(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?;

View File

@@ -1,29 +1,24 @@
use ordhook::chainhook_sdk::indexer::IndexerConfig;
use ordhook::chainhook_sdk::observer::DEFAULT_INGESTION_PORT;
use ordhook::chainhook_sdk::types::{
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
};
use ordhook::config::{
BootstrapConfig, Config, LimitsConfig, LogConfig, PredicatesApi, PredicatesApiConfig,
StorageConfig,
Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig,
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, DEFAULT_CONTROL_PORT,
DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
};
use std::fs::File;
use std::io::{BufReader, Read};
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50;
pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50;
#[derive(Deserialize, Debug, Clone)]
pub struct ConfigFile {
pub storage: StorageConfigFile,
pub http_api: Option<PredicatesApiConfigFile>,
pub limits: LimitsConfigFile,
pub resources: ResourcesConfigFile,
pub network: NetworkConfigFile,
pub logs: Option<LogConfigFile>,
pub bootstrap: Option<BootstrapConfigFile>,
pub snapshot: Option<SnapshotConfigFile>,
}
impl ConfigFile {
@@ -54,12 +49,12 @@ impl ConfigFile {
_ => return Err("network.mode not supported".to_string()),
};
let bootstrap = match config_file.bootstrap {
let snapshot = match config_file.snapshot {
Some(bootstrap) => match bootstrap.download_url {
Some(ref url) => BootstrapConfig::Download(url.to_string()),
None => BootstrapConfig::Build,
Some(ref url) => SnapshotConfig::Download(url.to_string()),
None => SnapshotConfig::Build,
},
None => BootstrapConfig::Build,
None => SnapshotConfig::Build,
};
let config = Config {
@@ -76,36 +71,25 @@ impl ConfigFile {
}),
},
},
bootstrap,
limits: LimitsConfig {
max_number_of_stacks_predicates: config_file
.limits
.max_number_of_stacks_predicates
.unwrap_or(STACKS_MAX_PREDICATE_REGISTRATION),
max_number_of_bitcoin_predicates: config_file
.limits
.max_number_of_bitcoin_predicates
.unwrap_or(BITCOIN_MAX_PREDICATE_REGISTRATION),
max_number_of_concurrent_stacks_scans: config_file
.limits
.max_number_of_concurrent_stacks_scans
.unwrap_or(STACKS_SCAN_THREAD_POOL_SIZE),
max_number_of_concurrent_bitcoin_scans: config_file
.limits
.max_number_of_concurrent_bitcoin_scans
.unwrap_or(BITCOIN_SCAN_THREAD_POOL_SIZE),
max_number_of_processing_threads: config_file
.limits
.max_number_of_processing_threads
.unwrap_or(1.max(num_cpus::get().saturating_sub(1))),
bitcoin_concurrent_http_requests_max: config_file
.limits
.bitcoin_concurrent_http_requests_max
.unwrap_or(1.max(num_cpus::get().saturating_sub(1))),
max_caching_memory_size_mb: config_file
.limits
.max_caching_memory_size_mb
.unwrap_or(2048),
snapshot,
resources: ResourcesConfig {
ulimit: config_file.resources.ulimit.unwrap_or(DEFAULT_ULIMIT),
cpu_core_available: config_file
.resources
.cpu_core_available
.unwrap_or(num_cpus::get()),
memory_available: config_file
.resources
.memory_available
.unwrap_or(DEFAULT_MEMORY_AVAILABLE),
bitcoind_rpc_threads: config_file
.resources
.bitcoind_rpc_threads
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE),
expected_observers_count: config_file
.resources
.expected_observers_count
.unwrap_or(1),
},
network: IndexerConfig {
bitcoind_rpc_url: config_file.network.bitcoind_rpc_url.to_string(),
@@ -176,19 +160,17 @@ pub struct PredicatesApiConfigFile {
}
#[derive(Deserialize, Debug, Clone)]
pub struct BootstrapConfigFile {
pub struct SnapshotConfigFile {
pub download_url: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct LimitsConfigFile {
pub max_number_of_bitcoin_predicates: Option<usize>,
pub max_number_of_concurrent_bitcoin_scans: Option<usize>,
pub max_number_of_stacks_predicates: Option<usize>,
pub max_number_of_concurrent_stacks_scans: Option<usize>,
pub max_number_of_processing_threads: Option<usize>,
pub max_caching_memory_size_mb: Option<usize>,
pub bitcoin_concurrent_http_requests_max: Option<usize>,
pub struct ResourcesConfigFile {
pub ulimit: Option<usize>,
pub cpu_core_available: Option<usize>,
pub memory_available: Option<usize>,
pub bitcoind_rpc_threads: Option<usize>,
pub expected_observers_count: Option<usize>,
}
#[derive(Deserialize, Debug, Clone)]

View File

@@ -26,16 +26,16 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543"
# but stacks can also be used:
# stacks_node_rpc_url = "http://0.0.0.0:20443"
[limits]
max_number_of_bitcoin_predicates = 100
max_number_of_concurrent_bitcoin_scans = 100
max_number_of_processing_threads = 16
bitcoin_concurrent_http_requests_max = 16
max_caching_memory_size_mb = 32000
[resources]
ulimit = 2048
cpu_core_available = 16
memory_available = 32
bitcoind_rpc_threads = 8
expected_observers_count = 1
# Disable the following section if the state
# must be built locally
[bootstrap]
[snapshot]
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"
[logs]

View File

@@ -11,18 +11,17 @@ const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str =
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50;
pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50;
pub const DEFAULT_ULIMIT: usize = 2048;
pub const DEFAULT_MEMORY_AVAILABLE: usize = 8;
pub const DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE: usize = 4;
#[derive(Clone, Debug)]
pub struct Config {
pub storage: StorageConfig,
pub http_api: PredicatesApi,
pub limits: LimitsConfig,
pub resources: ResourcesConfig,
pub network: IndexerConfig,
pub bootstrap: BootstrapConfig,
pub snapshot: SnapshotConfig,
pub logs: LogConfig,
}
@@ -50,7 +49,7 @@ pub struct PredicatesApiConfig {
}
#[derive(Clone, Debug)]
pub enum BootstrapConfig {
pub enum SnapshotConfig {
Build,
Download(String),
}
@@ -65,15 +64,22 @@ pub struct UrlConfig {
pub file_url: String,
}
#[derive(Clone, Debug)]
pub struct LimitsConfig {
pub max_number_of_bitcoin_predicates: usize,
pub max_number_of_concurrent_bitcoin_scans: usize,
pub max_number_of_stacks_predicates: usize,
pub max_number_of_concurrent_stacks_scans: usize,
pub max_number_of_processing_threads: usize,
pub bitcoin_concurrent_http_requests_max: usize,
pub max_caching_memory_size_mb: usize,
#[derive(Deserialize, Debug, Clone)]
pub struct ResourcesConfig {
pub ulimit: usize,
pub cpu_core_available: usize,
pub memory_available: usize,
pub bitcoind_rpc_threads: usize,
pub expected_observers_count: usize,
}
impl ResourcesConfig {
pub fn get_optimal_thread_pool_capacity(&self) -> usize {
// Generally speaking, when dealing with a thread pool, we need one
// thread for feeding the pool and possibly another thread for
// handling the "reduce" step.
self.cpu_core_available.saturating_sub(2).max(1)
}
}
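For intuition, a minimal sketch (hypothetical values, not part of the diff) of how the capacity formula behaves at the extremes:

fn main() {
    // cpu_core_available = 16 -> 14 workers (2 reserved for the feeder
    // and "reduce" threads); cpu_core_available = 2 -> still 1 worker.
    assert_eq!(16usize.saturating_sub(2).max(1), 14);
    assert_eq!(2usize.saturating_sub(2).max(1), 1);
}
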
impl Config {
@@ -86,10 +92,7 @@ impl Config {
pub fn get_ordhook_config(&self) -> OrdhookConfig {
OrdhookConfig {
network_thread_max: self.limits.bitcoin_concurrent_http_requests_max,
ingestion_thread_max: self.limits.max_number_of_processing_threads,
ingestion_thread_queue_size: 4,
cache_size: self.limits.max_caching_memory_size_mb,
resources: self.resources.clone(),
db_path: self.expected_cache_path(),
first_inscription_height: match self.network.bitcoin_network {
BitcoinNetwork::Mainnet => 767430,
@@ -119,9 +122,9 @@ impl Config {
}
pub fn should_bootstrap_through_download(&self) -> bool {
match &self.bootstrap {
BootstrapConfig::Build => false,
BootstrapConfig::Download(_) => true,
match &self.snapshot {
SnapshotConfig::Build => false,
SnapshotConfig::Download(_) => true,
}
}
@@ -139,9 +142,9 @@ impl Config {
}
fn expected_remote_ordinals_sqlite_base_url(&self) -> &str {
match &self.bootstrap {
BootstrapConfig::Build => unreachable!(),
BootstrapConfig::Download(url) => &url,
match &self.snapshot {
SnapshotConfig::Build => unreachable!(),
SnapshotConfig::Download(url) => &url,
}
}
@@ -159,15 +162,13 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Build,
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE,
max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)),
bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)),
max_caching_memory_size_mb: 2048,
snapshot: SnapshotConfig::Build,
resources: ResourcesConfig {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
expected_observers_count: 1,
},
network: IndexerConfig {
bitcoind_rpc_url: "http://0.0.0.0:18443".into(),
@@ -192,15 +193,13 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Build,
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE,
max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)),
bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)),
max_caching_memory_size_mb: 2048,
snapshot: SnapshotConfig::Build,
resources: ResourcesConfig {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
expected_observers_count: 1,
},
network: IndexerConfig {
bitcoind_rpc_url: "http://0.0.0.0:18332".into(),
@@ -225,17 +224,13 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Download(
DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(),
),
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE,
max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)),
bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)),
max_caching_memory_size_mb: 2048,
snapshot: SnapshotConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()),
resources: ResourcesConfig {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
expected_observers_count: 1,
},
network: IndexerConfig {
bitcoind_rpc_url: "http://0.0.0.0:8332".into(),

View File

@@ -13,7 +13,7 @@ use chainhook_sdk::{
};
use crate::{
config::{Config, LogConfig},
config::{Config, LogConfig, ResourcesConfig},
db::{find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop},
};
@@ -26,10 +26,7 @@ use crate::db::TransactionBytesCursor;
#[derive(Clone, Debug)]
pub struct OrdhookConfig {
pub network_thread_max: usize,
pub ingestion_thread_max: usize,
pub ingestion_thread_queue_size: usize,
pub cache_size: usize,
pub resources: ResourcesConfig,
pub db_path: PathBuf,
pub first_inscription_height: u64,
pub logs: LogConfig,
@@ -95,7 +92,13 @@ pub fn compute_next_satpoint_data(
}
pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?
@@ -128,7 +131,13 @@ pub fn should_sync_ordhook_db(
}
};
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
let mut start_block = find_last_block_inserted(&blocks_db) as u64;
if start_block == 0 {

View File

@@ -73,11 +73,27 @@ pub async fn download_and_pipeline_blocks(
let end_block = *blocks.last().expect("no blocks to pipeline");
let mut block_heights = VecDeque::from(blocks);
for _ in 0..ordhook_config.ingestion_thread_queue_size {
// All the requests are processed on the same thread.
// As soon as we get the bytes back from the wire, the processing
// moves to a thread pool, deferring the parsing, which is quite expensive.
// We initially seed the networking thread with N requests,
// N being the number of threads in the pool handling the responses.
// We need:
// - 1 thread for handling networking
// - 1 thread for handling disk serialization
let thread_pool_network_response_processing_capacity =
ordhook_config.resources.get_optimal_thread_pool_capacity();
// For each worker in that pool, we want to bound the size of the queue to avoid OOM.
// Block sizes can range from 1 to 4 MB (when packed with witness data).
// Starting to block networking once each worker has a backlog of 8 blocks seems reasonable.
let worker_queue_size = 8;
for _ in 0..thread_pool_network_response_processing_capacity {
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = moved_ctx.clone();
let http_client = moved_http_client.clone();
// We interleave the initial requests to avoid DDoSing bitcoind from the get-go.
sleep(Duration::from_millis(500));
set.spawn(try_download_block_bytes_with_retry(
http_client,
@@ -95,8 +111,8 @@ pub async fn download_and_pipeline_blocks(
let mut rx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for _ in 0..ordhook_config.ingestion_thread_max {
let (tx, rx) = bounded::<Option<Vec<u8>>>(ordhook_config.ingestion_thread_queue_size);
for _ in 0..thread_pool_network_response_processing_capacity {
let (tx, rx) = bounded::<Option<Vec<u8>>>(worker_queue_size);
tx_thread_pool.push(tx);
rx_thread_pool.push(rx);
}
@@ -244,11 +260,11 @@ pub async fn download_and_pipeline_blocks(
})
.expect("unable to spawn thread");
let mut thread_index = 0;
let mut round_robin_worker_thread_index = 0;
while let Some(res) = set.join_next().await {
let block = res.unwrap().unwrap();
let _ = tx_thread_pool[thread_index].send(Some(block));
let _ = tx_thread_pool[round_robin_worker_thread_index].send(Some(block));
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = ctx.clone();
@@ -260,7 +276,8 @@ pub async fn download_and_pipeline_blocks(
ctx,
));
}
thread_index = (thread_index + 1) % ordhook_config.ingestion_thread_max;
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
% thread_pool_network_response_processing_capacity;
}
ctx.try_log(|logger| {

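For intuition, a self-contained sketch of the round-robin fan-out above, with std channels standing in for the bounded crossbeam channels (all names illustrative):

use std::sync::mpsc::channel;

fn main() {
    // Illustrative pool: one channel per worker thread.
    let workers = 4;
    let mut senders = Vec::new();
    let mut receivers = Vec::new();
    for _ in 0..workers {
        let (tx, rx) = channel::<Option<Vec<u8>>>();
        senders.push(tx);
        receivers.push(rx);
    }
    // Completed downloads are dealt out evenly across the workers.
    let mut round_robin_index = 0;
    for height in 0u8..8 {
        let _ = senders[round_robin_index].send(Some(vec![height]));
        round_robin_index = (round_robin_index + 1) % workers;
    }
}
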
View File

@@ -25,8 +25,13 @@ pub fn start_block_archiving_processor(
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let blocks_db_rw =
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
let mut processed_blocks = 0;
loop {

View File

@@ -107,6 +107,8 @@ pub fn start_inscription_indexing_processor(
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
store_compacted_blocks(

View File

@@ -91,7 +91,7 @@ pub fn parallelize_inscription_data_computations(
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
let thread_max = ordhook_config.ingestion_thread_max;
let thread_pool_capacity = ordhook_config.resources.get_optimal_thread_pool_capacity();
// Nothing to do? early return
if !has_transactions_to_process {
@@ -104,13 +104,16 @@ pub fn parallelize_inscription_data_computations(
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for thread_index in 0..thread_max {
for thread_index in 0..thread_pool_capacity {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = inner_ctx.clone();
let moved_ordhook_db_path = ordhook_config.db_path.clone();
let ulimit = ordhook_config.resources.ulimit;
let memory_available = ordhook_config.resources.memory_available;
let local_cache = cache_l2.clone();
let handle = hiro_system_kit::thread_named("Worker")
@@ -124,6 +127,8 @@ pub fn parallelize_inscription_data_computations(
&transaction_id,
input_index,
&local_cache,
ulimit,
memory_available,
false,
&moved_ctx,
);
@@ -135,12 +140,13 @@ pub fn parallelize_inscription_data_computations(
}
// Consume L1 cache: if the traversal was performed in a previous round
// retrieve it and use it.
let mut thread_index = 0;
// retrieve it and inject it into the "reduce" worker (bypassing the "map" thread pool)
let mut round_robin_thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.get(key) {
let _ = traversal_tx.send((Ok((entry.clone(), vec![])), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
let _ =
traversal_tx.send((Ok((entry.clone(), vec![])), true, round_robin_thread_index));
round_robin_thread_index = (round_robin_thread_index + 1) % thread_pool_capacity;
}
}
@@ -176,11 +182,11 @@ pub fn parallelize_inscription_data_computations(
));
}
// Feed each workers with 2 workitems each
for thread_index in 0..thread_max {
// Feed each worker from the thread pool with 2 work items each
for thread_index in 0..thread_pool_capacity {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..thread_max {
for thread_index in 0..thread_pool_capacity {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}

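A self-contained sketch of the priming pattern above (illustrative names): two send passes leave every worker with one item to process and a second already queued:

use std::collections::VecDeque;
use std::sync::mpsc::channel;

fn main() {
    let pool_capacity = 3;
    let mut priority_queue: VecDeque<u64> = (0..10).collect();
    let mut tx_thread_pool = Vec::new();
    let mut rx_thread_pool = Vec::new();
    for _ in 0..pool_capacity {
        let (tx, rx) = channel::<Option<u64>>();
        tx_thread_pool.push(tx);
        rx_thread_pool.push(rx);
    }
    // Two passes over the pool: each worker gets one item to process
    // immediately and a second one already waiting in its queue.
    for _ in 0..2 {
        for tx in tx_thread_pool.iter() {
            let _ = tx.send(priority_queue.pop_front());
        }
    }
}
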
View File

@@ -22,6 +22,8 @@ pub fn compute_satoshi_number(
traversals_cache: &Arc<
DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>,
>,
ulimit: usize,
memory_available: usize,
_back_tracking: bool,
ctx: &Context,
) -> Result<(TraversalResult, Vec<(u32, [u8; 8])>), String> {
@@ -31,7 +33,8 @@ pub fn compute_satoshi_number(
let mut ordinal_block_number = block_identifier.index as u32;
let txid = transaction_identifier.get_8_hash_bytes();
let mut back_track = vec![];
let blocks_db = open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, &ctx);
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, ulimit, memory_available, &ctx);
let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache
.get(&(block_identifier.index as u32, txid.clone()))

View File

@@ -248,7 +248,7 @@ fn get_default_ordhook_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
destination_path
}
fn rocks_db_default_options() -> rocksdb::Options {
fn rocks_db_default_options(ulimit: usize, memory_available: usize) -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
// Per rocksdb's documentation:
// If cache_index_and_filter_blocks is false (which is default),
@@ -262,7 +262,7 @@ fn rocks_db_default_options() -> rocksdb::Options {
// opts.set_write_buffer_size(64 * 1024 * 1024);
// opts.set_blob_file_size(1 * 1024 * 1024 * 1024);
// opts.set_target_file_size_base(64 * 1024 * 1024);
opts.set_max_open_files(2048);
opts.set_max_open_files(ulimit as i32);
opts.create_if_missing(true);
// opts.optimize_for_point_lookup(1 * 1024 * 1024 * 1024);
// opts.set_level_zero_stop_writes_trigger(64);
@@ -279,10 +279,12 @@ fn rocks_db_default_options() -> rocksdb::Options {
pub fn open_readonly_ordhook_db_conn_rocks_db(
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
_ctx: &Context,
) -> Result<DB, String> {
let path = get_default_ordhook_db_file_path_rocks_db(&base_dir);
let mut opts = rocks_db_default_options();
let mut opts = rocks_db_default_options(ulimit, memory_available);
opts.set_disable_auto_compactions(true);
opts.set_max_background_jobs(0);
let db = DB::open_for_read_only(&opts, path, false)
@@ -293,14 +295,16 @@ pub fn open_readonly_ordhook_db_conn_rocks_db(
pub fn open_ordhook_db_conn_rocks_db_loop(
readwrite: bool,
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
ctx: &Context,
) -> DB {
let mut retries = 0;
let blocks_db = loop {
let res = if readwrite {
open_readwrite_ordhook_db_conn_rocks_db(&base_dir, &ctx)
open_readwrite_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx)
} else {
open_readonly_ordhook_db_conn_rocks_db(&base_dir, &ctx)
open_readonly_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx)
};
match res {
Ok(db) => break db,
@@ -323,19 +327,24 @@ pub fn open_ordhook_db_conn_rocks_db_loop(
pub fn open_readwrite_ordhook_dbs(
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
ctx: &Context,
) -> Result<(DB, Connection), String> {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &base_dir, &ctx);
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(true, &base_dir, ulimit, memory_available, &ctx);
let inscriptions_db = open_readwrite_ordhook_db_conn(&base_dir, &ctx)?;
Ok((blocks_db, inscriptions_db))
}
fn open_readwrite_ordhook_db_conn_rocks_db(
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
_ctx: &Context,
) -> Result<DB, String> {
let path = get_default_ordhook_db_file_path_rocks_db(&base_dir);
let opts = rocks_db_default_options();
let opts = rocks_db_default_options(ulimit, memory_available);
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to read-write hord.rocksdb: {}", e.to_string()))?;
Ok(db)
@@ -396,7 +405,6 @@ pub fn find_pinned_block_bytes_at_block_height<'a>(
// read_options.set_verify_checksums(false);
let mut backoff: f64 = 1.0;
let mut rng = thread_rng();
loop {
match blocks_db.get_pinned(block_height.to_be_bytes()) {
Ok(Some(res)) => return Some(res),

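In the hunks shown, only ulimit actually reaches a RocksDB option; a minimal sketch of that mapping, assuming the rocksdb crate as a dependency:

// Hedged sketch: ulimit maps straight onto RocksDB's max_open_files,
// keeping the database under the process file-descriptor budget.
fn default_options(ulimit: usize) -> rocksdb::Options {
    let mut opts = rocksdb::Options::default();
    opts.set_max_open_files(ulimit as i32);
    opts.create_if_missing(true);
    opts
}
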
View File

@@ -413,9 +413,7 @@ impl Service {
bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)),
bitcoin_chain_event_notifier: Some(chain_event_notifier_tx),
};
let cache_l2 = Arc::new(new_traversals_lazy_cache(
self.config.limits.max_caching_memory_size_mb,
));
let cache_l2 = Arc::new(new_traversals_lazy_cache(100_000));
let ctx = self.ctx.clone();
let config = self.config.clone();
@@ -455,6 +453,8 @@ impl Service {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
false,
&self.config.expected_cache_path(),
self.config.resources.ulimit,
self.config.resources.memory_available,
&self.ctx,
);
let tip = find_last_block_inserted(&blocks_db);
@@ -486,6 +486,8 @@ impl Service {
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
false,
&self.config.expected_cache_path(),
self.config.resources.ulimit,
self.config.resources.memory_available,
&self.ctx,
);
info!(self.ctx.expect_logger(), "Running database compaction",);
@@ -496,6 +498,8 @@ impl Service {
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
false,
&self.config.expected_cache_path(),
self.config.resources.ulimit,
self.config.resources.memory_available,
&self.ctx,
);
@@ -616,14 +620,18 @@ impl Service {
}
fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ctx: &Context) {
let (blocks_db_rw, inscriptions_db_conn_rw) =
match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwrite connection: {e}",));
return;
}
};
let (blocks_db_rw, inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs(
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwrite connection: {e}",));
return;
}
};
match command {
HandleBlock::UndoBlock(block) => {
@@ -725,14 +733,18 @@ pub fn chainhook_sidecar_mutate_blocks(
) {
let mut updated_blocks_ids = vec![];
let (blocks_db_rw, mut inscriptions_db_conn_rw) =
match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwrite connection: {e}",));
return;
}
};
let (blocks_db_rw, mut inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs(
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwrite connection: {e}",));
return;
}
};
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();

View File

@@ -21,7 +21,7 @@ pub fn start_bitcoin_scan_runloop(
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans);
let bitcoin_scan_pool = ThreadPool::new(config.resources.expected_observers_count);
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();