mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 00:49:30 +08:00
fix: serialize handlers in one thread
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -457,9 +457,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "chainhook-sdk"
|
||||
version = "0.8.3"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "704ec19f0ceb15420ee8bef92132820320ba3b8e5952171cef94c2f70c6c2abd"
|
||||
checksum = "c896431709407800d80ef410062867e518b8a21d7b8e1ae9bfe0d8e70f7d8b6f"
|
||||
dependencies = [
|
||||
"base58 0.2.0",
|
||||
"base64 0.13.1",
|
||||
|
||||
@@ -12,8 +12,8 @@ redis = "0.21.5"
|
||||
serde-redis = "0.12.0"
|
||||
hex = "0.4.3"
|
||||
rand = "0.8.5"
|
||||
chainhook-sdk = { version = "=0.8.3", default-features = false, features = ["zeromq"] }
|
||||
# chainhook-sdk = { version = "=0.8.3", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
|
||||
chainhook-sdk = { version = "=0.8.4", default-features = false, features = ["zeromq"] }
|
||||
# chainhook-sdk = { version = "=0.8.4", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
|
||||
hiro-system-kit = "0.1.0"
|
||||
reqwest = { version = "0.11", features = ["stream", "json"] }
|
||||
tokio = { version = "=1.24", features = ["full"] }
|
||||
|
||||
@@ -33,6 +33,7 @@ use chainhook_sdk::observer::{
|
||||
};
|
||||
use chainhook_sdk::types::{BitcoinBlockData, BlockIdentifier};
|
||||
use chainhook_sdk::utils::Context;
|
||||
use crossbeam_channel::select;
|
||||
use crossbeam_channel::unbounded;
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
@@ -132,9 +133,9 @@ impl Service {
|
||||
|
||||
// Sidecar channels setup
|
||||
let (observer_command_tx, observer_command_rx) = channel();
|
||||
let (block_mutator_in_tx, block_mutator_in_rx) = channel();
|
||||
let (block_mutator_out_tx, block_mutator_out_rx) = channel();
|
||||
let (chain_event_notifier_tx, chain_event_notifier_rx) = channel();
|
||||
let (block_mutator_in_tx, block_mutator_in_rx) = crossbeam_channel::unbounded();
|
||||
let (block_mutator_out_tx, block_mutator_out_rx) = crossbeam_channel::unbounded();
|
||||
let (chain_event_notifier_tx, chain_event_notifier_rx) = crossbeam_channel::unbounded();
|
||||
let observer_sidecar = ObserverSidecar {
|
||||
bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)),
|
||||
bitcoin_chain_event_notifier: Some(chain_event_notifier_tx),
|
||||
@@ -198,46 +199,27 @@ impl Service {
|
||||
let config = self.config.clone();
|
||||
let cache_l2 = traversals_cache.clone();
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Sidecar block mutator").spawn(move || loop {
|
||||
let (mut blocks_to_mutate, blocks_ids_to_rollback) = match block_mutator_in_rx.recv() {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
error!(
|
||||
ctx.expect_logger(),
|
||||
"Error: broken channel {}",
|
||||
e.to_string()
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
chainhook_sidecar_mutate_blocks(
|
||||
&mut blocks_to_mutate,
|
||||
&blocks_ids_to_rollback,
|
||||
&cache_l2,
|
||||
&config,
|
||||
&ctx,
|
||||
);
|
||||
let _ = block_mutator_out_tx.send(blocks_to_mutate);
|
||||
});
|
||||
|
||||
let ctx = self.ctx.clone();
|
||||
let config = self.config.clone();
|
||||
let _ =
|
||||
hiro_system_kit::thread_named("Chain event notification handler").spawn(move || loop {
|
||||
let command = match chain_event_notifier_rx.recv() {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) => {
|
||||
error!(
|
||||
ctx.expect_logger(),
|
||||
"Error: broken channel {}",
|
||||
e.to_string()
|
||||
let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || loop {
|
||||
select! {
|
||||
recv(block_mutator_in_rx) -> msg => {
|
||||
if let Ok((mut blocks_to_mutate, blocks_ids_to_rollback)) = msg {
|
||||
chainhook_sidecar_mutate_blocks(
|
||||
&mut blocks_to_mutate,
|
||||
&blocks_ids_to_rollback,
|
||||
&cache_l2,
|
||||
&config,
|
||||
&ctx,
|
||||
);
|
||||
break;
|
||||
let _ = block_mutator_out_tx.send(blocks_to_mutate);
|
||||
}
|
||||
};
|
||||
|
||||
chainhook_sidecar_mutate_ordhook_db(command, &config, &ctx)
|
||||
});
|
||||
}
|
||||
recv(chain_event_notifier_rx) -> msg => {
|
||||
if let Ok(command) = msg {
|
||||
chainhook_sidecar_mutate_ordhook_db(command, &config, &ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
|
||||
Reference in New Issue
Block a user