mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-13 16:19:01 +08:00
feat: use thread pools for scans
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -518,6 +518,7 @@ dependencies = [
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"tar",
|
||||
"threadpool",
|
||||
"tokio",
|
||||
"toml",
|
||||
"uuid 1.3.0",
|
||||
|
||||
@@ -37,6 +37,7 @@ ansi_term = "0.12.1"
|
||||
atty = "0.2.14"
|
||||
crossbeam-channel = "0.5.6"
|
||||
uuid = { version = "1.3.0", features = ["v4", "fast-rng"] }
|
||||
threadpool = "1.8.1"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3"
|
||||
|
||||
@@ -9,11 +9,14 @@ use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverE
|
||||
use chainhook_event_observer::utils::Context;
|
||||
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
|
||||
use redis::{Commands, Connection};
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
|
||||
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
|
||||
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
|
||||
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
|
||||
|
||||
pub struct Service {
|
||||
config: Config,
|
||||
@@ -110,14 +113,6 @@ impl Service {
|
||||
);
|
||||
}
|
||||
|
||||
// let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &self.ctx)
|
||||
// {
|
||||
// Ok(index) => index,
|
||||
// Err(e) => {
|
||||
// panic!()
|
||||
// }
|
||||
// };
|
||||
|
||||
let context_cloned = self.ctx.clone();
|
||||
let event_observer_config_moved = event_observer_config.clone();
|
||||
let _ = std::thread::spawn(move || {
|
||||
@@ -131,6 +126,76 @@ impl Service {
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
|
||||
// Stacks scan operation threadpool
|
||||
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
|
||||
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
|
||||
let ctx = self.ctx.clone();
|
||||
let config = self.config.clone();
|
||||
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
|
||||
.spawn(move || {
|
||||
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
|
||||
let moved_ctx = ctx.clone();
|
||||
let mut moved_config = config.clone();
|
||||
stacks_scan_pool.execute(move || {
|
||||
let op = scan_stacks_chainstate_via_csv_using_predicate(
|
||||
predicate_spec,
|
||||
&mut moved_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let end_block = match hiro_system_kit::nestable_block_on(op) {
|
||||
Ok(end_block) => end_block,
|
||||
Err(e) => {
|
||||
error!(
|
||||
moved_ctx.expect_logger(),
|
||||
"Unable to evaluate predicate on Stacks chainstate: {e}",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!(
|
||||
moved_ctx.expect_logger(),
|
||||
"Stacks chainstate scan completed up to block: {}", end_block.index
|
||||
);
|
||||
});
|
||||
}
|
||||
let res = stacks_scan_pool.join();
|
||||
res
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
// Bitcoin scan operation threadpool
|
||||
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
|
||||
let bitcoin_scan_pool = ThreadPool::new(BITCOIN_SCAN_THREAD_POOL_SIZE);
|
||||
let ctx = self.ctx.clone();
|
||||
let config = self.config.clone();
|
||||
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
|
||||
.spawn(move || {
|
||||
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
|
||||
let moved_ctx = ctx.clone();
|
||||
let moved_config = config.clone();
|
||||
bitcoin_scan_pool.execute(move || {
|
||||
let op = scan_bitcoin_chainstate_via_http_using_predicate(
|
||||
predicate_spec,
|
||||
&moved_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
|
||||
match hiro_system_kit::nestable_block_on(op) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(
|
||||
moved_ctx.expect_logger(),
|
||||
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
|
||||
);
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
let res = bitcoin_scan_pool.join();
|
||||
res
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
Ok(cmd) => cmd,
|
||||
@@ -178,43 +243,10 @@ impl Service {
|
||||
}
|
||||
match chainhook {
|
||||
ChainhookSpecification::Stacks(predicate_spec) => {
|
||||
let end_block = match scan_stacks_chainstate_via_csv_using_predicate(
|
||||
predicate_spec,
|
||||
&mut self.config,
|
||||
&self.ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(end_block) => end_block,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Stacks chainstate scan completed up to block: {}", end_block.index
|
||||
);
|
||||
let _ = stacks_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
match scan_bitcoin_chainstate_via_http_using_predicate(
|
||||
predicate_spec,
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
|
||||
);
|
||||
}
|
||||
};
|
||||
let _ = bitcoin_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user