feat: zmq sockets

This commit is contained in:
Ludo Galabru
2023-04-03 22:30:38 -04:00
parent de4286e3b8
commit d2e328aa57
2 changed files with 60 additions and 9 deletions

View File

@@ -17,7 +17,8 @@ use chainhook_event_observer::{
chainhooks::types::ChainhookSpecification,
};
use chainhook_types::{
BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent, StacksTransactionData,
BitcoinBlockSignaling, BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent,
StacksTransactionData,
};
use redis::{Commands, Connection};
use reqwest::Client as HttpClient;
@@ -115,8 +116,23 @@ impl Service {
info!(
self.ctx.expect_logger(),
"Listening for new blockchain events on port {}", event_observer_config.ingestion_port
"Listening on port {} for Stacks chain events", event_observer_config.ingestion_port
);
match event_observer_config.bitcoin_block_signaling {
BitcoinBlockSignaling::ZeroMQ(ref url) => {
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via ZeroMQ: {}", url
);
}
BitcoinBlockSignaling::Stacks(ref _url) => {
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via Stacks node"
);
}
}
info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}",

View File

@@ -50,6 +50,7 @@ use std::str::FromStr;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use zeromq::{Socket, SocketRecv};
pub const DEFAULT_INGESTION_PORT: u16 = 20445;
pub const DEFAULT_CONTROL_PORT: u16 = 20446;
@@ -255,11 +256,6 @@ pub async fn start_event_observer(
observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
ctx.try_log(|logger| slog::info!(logger, "Event observer starting with config {:?}", config));
// let ordinal_index = if cfg!(feature = "ordinals") {
// Start indexer with a receiver in background thread
let indexer_config = IndexerConfig {
stacks_node_rpc_url: config.stacks_node_rpc_url.clone(),
bitcoind_rpc_url: config.bitcoind_rpc_url.clone(),
@@ -410,10 +406,49 @@ pub async fn start_event_observer(
});
if let BitcoinBlockSignaling::ZeroMQ(ref bitcoind_zmq_url) = config.bitcoin_block_signaling {
let bitcoind_zmq_endpoint = bitcoind_zmq_url.parse::<zeromq::Endpoint>()?;
let bitcoind_zmq_url = bitcoind_zmq_url.clone();
let ctx_moved = ctx.clone();
hiro_system_kit::thread_named("Bitcoind zmq listener")
.spawn(move || {
// bitcoind_zmq_endpoint
ctx_moved.try_log(|logger| {
slog::info!(
logger,
"Waiting for ZMQ connection acknowledgment from bitcoind"
)
});
let _ = hiro_system_kit::nestable_block_on(async move {
let mut socket = zeromq::SubSocket::new();
socket
.connect(&bitcoind_zmq_url)
.await
.expect("Failed to connect");
socket.subscribe("hashblock").await?;
ctx_moved.try_log(|logger| {
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
});
loop {
let message = match socket.recv().await {
Ok(message) => message,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::info!(
logger,
"Unable to receive ZMQ message: {}",
e.to_string()
)
});
continue;
}
};
let block_hash: String =
String::from_utf8(message.get(0).unwrap().to_vec())?;
println!("Received {}", block_hash);
}
Ok::<(), Box<dyn Error>>(())
});
})
.expect("unable to spawn thread");
}