mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 08:58:29 +08:00
feat: implement zmq runloop
This commit is contained in:
@@ -702,7 +702,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
let res = match find_compacted_block_at_block_height(ordinal_block_number, &hord_db_conn) {
|
||||
Some(res) => res,
|
||||
None => {
|
||||
return Err(format!("unable to retrieve block #{ordinal_block_number}"));
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
let mut traversals = HashMap::new();
|
||||
if !transactions_ids.is_empty() {
|
||||
let expected_traversals = transactions_ids.len();
|
||||
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, TraversalResult)>();
|
||||
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, _)>();
|
||||
let traversal_data_pool = ThreadPool::new(10);
|
||||
|
||||
for transaction_id in transactions_ids.into_iter() {
|
||||
@@ -123,8 +123,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
&transaction_id,
|
||||
0,
|
||||
&moved_ctx,
|
||||
)
|
||||
.unwrap();
|
||||
);
|
||||
let _ = moved_traversal_tx.send((transaction_id, traversal));
|
||||
});
|
||||
}
|
||||
@@ -132,7 +131,8 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((transaction_identifier, traversal_result)) = traversal_rx.recv() {
|
||||
traversals_received += 1;
|
||||
traversals.insert(transaction_identifier, traversal_result);
|
||||
let traversal = traversal_result?;
|
||||
traversals.insert(transaction_identifier, traversal);
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use crate::indexer::bitcoin::{
|
||||
retrieve_full_block_breakdown_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
NewBitcoinBlock,
|
||||
};
|
||||
use crate::indexer::fork_scratch_pad::ForkScratchPad;
|
||||
use crate::indexer::{self, Indexer, IndexerConfig};
|
||||
use crate::utils::{send_request, Context};
|
||||
|
||||
@@ -408,6 +409,7 @@ pub async fn start_event_observer(
|
||||
if let BitcoinBlockSignaling::ZeroMQ(ref bitcoind_zmq_url) = config.bitcoin_block_signaling {
|
||||
let bitcoind_zmq_url = bitcoind_zmq_url.clone();
|
||||
let ctx_moved = ctx.clone();
|
||||
let bitcoin_config = config.get_bitcoin_config();
|
||||
hiro_system_kit::thread_named("Bitcoind zmq listener")
|
||||
.spawn(move || {
|
||||
ctx_moved.try_log(|logger| {
|
||||
@@ -417,38 +419,78 @@ pub async fn start_event_observer(
|
||||
)
|
||||
});
|
||||
|
||||
let _ = hiro_system_kit::nestable_block_on(async move {
|
||||
let mut socket = zeromq::SubSocket::new();
|
||||
socket
|
||||
.connect(&bitcoind_zmq_url)
|
||||
.await
|
||||
.expect("Failed to connect");
|
||||
let _: Result<(), Box<dyn Error>> =
|
||||
hiro_system_kit::nestable_block_on(async move {
|
||||
let mut socket = zeromq::SubSocket::new();
|
||||
|
||||
socket.subscribe("hashblock").await?;
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
|
||||
});
|
||||
socket
|
||||
.connect(&bitcoind_zmq_url)
|
||||
.await
|
||||
.expect("Failed to connect");
|
||||
|
||||
loop {
|
||||
let message = match socket.recv().await {
|
||||
Ok(message) => message,
|
||||
Err(e) => {
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Unable to receive ZMQ message: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
continue;
|
||||
socket.subscribe("").await?;
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
|
||||
});
|
||||
|
||||
let mut bitcoin_blocks_pool = ForkScratchPad::new();
|
||||
|
||||
loop {
|
||||
let message = match socket.recv().await {
|
||||
Ok(message) => message,
|
||||
Err(e) => {
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Unable to receive ZMQ message: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let block_hash = hex::encode(message.get(1).unwrap().to_vec());
|
||||
|
||||
let block = match retrieve_full_block_breakdown_with_retry(
|
||||
&block_hash,
|
||||
&bitcoin_config,
|
||||
&ctx_moved,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::warn!(
|
||||
logger,
|
||||
"unable to retrieve_full_block_breakdown: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Bitcoin block #{} dispatched for processing",
|
||||
block.height
|
||||
)
|
||||
});
|
||||
|
||||
let header = block.get_block_header();
|
||||
let _ = observer_commands_tx
|
||||
.send(ObserverCommand::ProcessBitcoinBlock(block));
|
||||
|
||||
if let Ok(Some(event)) =
|
||||
bitcoin_blocks_pool.process_header(header, &ctx_moved)
|
||||
{
|
||||
let _ = observer_commands_tx
|
||||
.send(ObserverCommand::PropagateBitcoinChainEvent(event));
|
||||
}
|
||||
};
|
||||
let block_hash: String =
|
||||
String::from_utf8(message.get(0).unwrap().to_vec())?;
|
||||
println!("Received {}", block_hash);
|
||||
}
|
||||
Ok::<(), Box<dyn Error>>(())
|
||||
});
|
||||
}
|
||||
});
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user