fix: serialize handlers in one thread

This commit is contained in:
Ludo Galabru
2023-08-23 10:32:22 +02:00
parent 3443915a73
commit cdfc264cff
3 changed files with 27 additions and 45 deletions

4
Cargo.lock generated
View File

@@ -457,9 +457,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chainhook-sdk"
version = "0.8.3"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704ec19f0ceb15420ee8bef92132820320ba3b8e5952171cef94c2f70c6c2abd"
checksum = "c896431709407800d80ef410062867e518b8a21d7b8e1ae9bfe0d8e70f7d8b6f"
dependencies = [
"base58 0.2.0",
"base64 0.13.1",

View File

@@ -12,8 +12,8 @@ redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.8.3", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.8.3", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
chainhook-sdk = { version = "=0.8.4", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.8.4", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
hiro-system-kit = "0.1.0"
reqwest = { version = "0.11", features = ["stream", "json"] }
tokio = { version = "=1.24", features = ["full"] }

View File

@@ -33,6 +33,7 @@ use chainhook_sdk::observer::{
};
use chainhook_sdk::types::{BitcoinBlockData, BlockIdentifier};
use chainhook_sdk::utils::Context;
use crossbeam_channel::select;
use crossbeam_channel::unbounded;
use dashmap::DashMap;
use fxhash::FxHasher;
@@ -132,9 +133,9 @@ impl Service {
// Sidecar channels setup
let (observer_command_tx, observer_command_rx) = channel();
let (block_mutator_in_tx, block_mutator_in_rx) = channel();
let (block_mutator_out_tx, block_mutator_out_rx) = channel();
let (chain_event_notifier_tx, chain_event_notifier_rx) = channel();
let (block_mutator_in_tx, block_mutator_in_rx) = crossbeam_channel::unbounded();
let (block_mutator_out_tx, block_mutator_out_rx) = crossbeam_channel::unbounded();
let (chain_event_notifier_tx, chain_event_notifier_rx) = crossbeam_channel::unbounded();
let observer_sidecar = ObserverSidecar {
bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)),
bitcoin_chain_event_notifier: Some(chain_event_notifier_tx),
@@ -198,46 +199,27 @@ impl Service {
let config = self.config.clone();
let cache_l2 = traversals_cache.clone();
let _ = hiro_system_kit::thread_named("Sidecar block mutator").spawn(move || loop {
let (mut blocks_to_mutate, blocks_ids_to_rollback) = match block_mutator_in_rx.recv() {
Ok(block) => block,
Err(e) => {
error!(
ctx.expect_logger(),
"Error: broken channel {}",
e.to_string()
);
break;
}
};
chainhook_sidecar_mutate_blocks(
&mut blocks_to_mutate,
&blocks_ids_to_rollback,
&cache_l2,
&config,
&ctx,
);
let _ = block_mutator_out_tx.send(blocks_to_mutate);
});
let ctx = self.ctx.clone();
let config = self.config.clone();
let _ =
hiro_system_kit::thread_named("Chain event notification handler").spawn(move || loop {
let command = match chain_event_notifier_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
error!(
ctx.expect_logger(),
"Error: broken channel {}",
e.to_string()
let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || loop {
select! {
recv(block_mutator_in_rx) -> msg => {
if let Ok((mut blocks_to_mutate, blocks_ids_to_rollback)) = msg {
chainhook_sidecar_mutate_blocks(
&mut blocks_to_mutate,
&blocks_ids_to_rollback,
&cache_l2,
&config,
&ctx,
);
break;
let _ = block_mutator_out_tx.send(blocks_to_mutate);
}
};
chainhook_sidecar_mutate_ordhook_db(command, &config, &ctx)
});
}
recv(chain_event_notifier_rx) -> msg => {
if let Ok(command) = msg {
chainhook_sidecar_mutate_ordhook_db(command, &config, &ctx)
}
}
}
});
loop {
let event = match observer_event_rx.recv() {