feat: use thread pools for scans

This commit is contained in:
Ludo Galabru
2023-04-12 21:52:24 -04:00
parent daf55476c9
commit 45b9abd3e0
3 changed files with 77 additions and 43 deletions

1
Cargo.lock generated
View File

@@ -518,6 +518,7 @@ dependencies = [
"serde_derive",
"serde_json",
"tar",
"threadpool",
"tokio",
"toml",
"uuid 1.3.0",

View File

@@ -37,6 +37,7 @@ ansi_term = "0.12.1"
atty = "0.2.14"
crossbeam-channel = "0.5.6"
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
threadpool = "1.8.1"
[dev-dependencies]
criterion = "0.3"

View File

@@ -9,11 +9,14 @@ use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverE
use chainhook_event_observer::utils::Context;
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
use redis::{Commands, Connection};
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
pub struct Service {
config: Config,
@@ -110,14 +113,6 @@ impl Service {
);
}
// let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &self.ctx)
// {
// Ok(index) => index,
// Err(e) => {
// panic!()
// }
// };
let context_cloned = self.ctx.clone();
let event_observer_config_moved = event_observer_config.clone();
let _ = std::thread::spawn(move || {
@@ -131,6 +126,76 @@ impl Service {
let _ = hiro_system_kit::nestable_block_on(future);
});
// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
let ctx = self.ctx.clone();
let config = self.config.clone();
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let mut moved_config = config.clone();
stacks_scan_pool.execute(move || {
let op = scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&mut moved_config,
&moved_ctx,
);
let end_block = match hiro_system_kit::nestable_block_on(op) {
Ok(end_block) => end_block,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
);
});
}
let res = stacks_scan_pool.join();
res
})
.expect("unable to spawn thread");
// Bitcoin scan operation threadpool
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
let bitcoin_scan_pool = ThreadPool::new(BITCOIN_SCAN_THREAD_POOL_SIZE);
let ctx = self.ctx.clone();
let config = self.config.clone();
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&moved_config,
&moved_ctx,
);
match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
}
};
});
}
let res = bitcoin_scan_pool.join();
res
})
.expect("unable to spawn thread");
loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
@@ -178,43 +243,10 @@ impl Service {
}
match chainhook {
ChainhookSpecification::Stacks(predicate_spec) => {
let end_block = match scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&mut self.config,
&self.ctx,
)
.await
{
Ok(end_block) => end_block,
Err(e) => {
error!(
self.ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
continue;
}
};
info!(
self.ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
);
let _ = stacks_scan_op_tx.send(predicate_spec);
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
match scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&self.config,
&self.ctx,
)
.await
{
Ok(_) => {}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
}
};
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
}
}