feat: implement zmq runloop

This commit is contained in:
Ludo Galabru
2023-04-04 10:00:25 -04:00
parent d2e328aa57
commit c6c1c0ecce
3 changed files with 76 additions and 34 deletions

View File

@@ -702,7 +702,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
let res = match find_compacted_block_at_block_height(ordinal_block_number, &hord_db_conn) {
Some(res) => res,
None => {
return Err(format!("unable to retrieve block #{ordinal_block_number}"));
return Err(format!("block #{ordinal_block_number} not in database"));
}
};

View File

@@ -107,7 +107,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
let mut traversals = HashMap::new();
if !transactions_ids.is_empty() {
let expected_traversals = transactions_ids.len();
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, TraversalResult)>();
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, _)>();
let traversal_data_pool = ThreadPool::new(10);
for transaction_id in transactions_ids.into_iter() {
@@ -123,8 +123,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
&transaction_id,
0,
&moved_ctx,
)
.unwrap();
);
let _ = moved_traversal_tx.send((transaction_id, traversal));
});
}
@@ -132,7 +131,8 @@ pub fn update_hord_db_and_augment_bitcoin_block(
let mut traversals_received = 0;
while let Ok((transaction_identifier, traversal_result)) = traversal_rx.recv() {
traversals_received += 1;
traversals.insert(transaction_identifier, traversal_result);
let traversal = traversal_result?;
traversals.insert(transaction_identifier, traversal);
if traversals_received == expected_traversals {
break;
}

View File

@@ -18,6 +18,7 @@ use crate::indexer::bitcoin::{
retrieve_full_block_breakdown_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
NewBitcoinBlock,
};
use crate::indexer::fork_scratch_pad::ForkScratchPad;
use crate::indexer::{self, Indexer, IndexerConfig};
use crate::utils::{send_request, Context};
@@ -408,6 +409,7 @@ pub async fn start_event_observer(
if let BitcoinBlockSignaling::ZeroMQ(ref bitcoind_zmq_url) = config.bitcoin_block_signaling {
let bitcoind_zmq_url = bitcoind_zmq_url.clone();
let ctx_moved = ctx.clone();
let bitcoin_config = config.get_bitcoin_config();
hiro_system_kit::thread_named("Bitcoind zmq listener")
.spawn(move || {
ctx_moved.try_log(|logger| {
@@ -417,38 +419,78 @@ pub async fn start_event_observer(
)
});
let _ = hiro_system_kit::nestable_block_on(async move {
let mut socket = zeromq::SubSocket::new();
socket
.connect(&bitcoind_zmq_url)
.await
.expect("Failed to connect");
let _: Result<(), Box<dyn Error>> =
hiro_system_kit::nestable_block_on(async move {
let mut socket = zeromq::SubSocket::new();
socket.subscribe("hashblock").await?;
ctx_moved.try_log(|logger| {
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
});
socket
.connect(&bitcoind_zmq_url)
.await
.expect("Failed to connect");
loop {
let message = match socket.recv().await {
Ok(message) => message,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::info!(
logger,
"Unable to receive ZMQ message: {}",
e.to_string()
)
});
continue;
socket.subscribe("").await?;
ctx_moved.try_log(|logger| {
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
});
let mut bitcoin_blocks_pool = ForkScratchPad::new();
loop {
let message = match socket.recv().await {
Ok(message) => message,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::error!(
logger,
"Unable to receive ZMQ message: {}",
e.to_string()
)
});
continue;
}
};
let block_hash = hex::encode(message.get(1).unwrap().to_vec());
let block = match retrieve_full_block_breakdown_with_retry(
&block_hash,
&bitcoin_config,
&ctx_moved,
)
.await
{
Ok(block) => block,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::warn!(
logger,
"unable to retrieve_full_block_breakdown: {}",
e.to_string()
)
});
continue;
}
};
ctx_moved.try_log(|logger| {
slog::info!(
logger,
"Bitcoin block #{} dispatched for processing",
block.height
)
});
let header = block.get_block_header();
let _ = observer_commands_tx
.send(ObserverCommand::ProcessBitcoinBlock(block));
if let Ok(Some(event)) =
bitcoin_blocks_pool.process_header(header, &ctx_moved)
{
let _ = observer_commands_tx
.send(ObserverCommand::PropagateBitcoinChainEvent(event));
}
};
let block_hash: String =
String::from_utf8(message.get(0).unwrap().to_vec())?;
println!("Received {}", block_hash);
}
Ok::<(), Box<dyn Error>>(())
});
}
});
})
.expect("unable to spawn thread");
}