mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 22:43:06 +08:00
feat: zmq sockets
This commit is contained in:
@@ -17,7 +17,8 @@ use chainhook_event_observer::{
|
||||
chainhooks::types::ChainhookSpecification,
|
||||
};
|
||||
use chainhook_types::{
|
||||
BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent, StacksTransactionData,
|
||||
BitcoinBlockSignaling, BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent,
|
||||
StacksTransactionData,
|
||||
};
|
||||
use redis::{Commands, Connection};
|
||||
use reqwest::Client as HttpClient;
|
||||
@@ -115,8 +116,23 @@ impl Service {
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening for new blockchain events on port {}", event_observer_config.ingestion_port
|
||||
"Listening on port {} for Stacks chain events", event_observer_config.ingestion_port
|
||||
);
|
||||
match event_observer_config.bitcoin_block_signaling {
|
||||
BitcoinBlockSignaling::ZeroMQ(ref url) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via ZeroMQ: {}", url
|
||||
);
|
||||
}
|
||||
BitcoinBlockSignaling::Stacks(ref _url) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via Stacks node"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening for chainhook predicate registrations on port {}",
|
||||
|
||||
@@ -50,6 +50,7 @@ use std::str::FromStr;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::Duration;
|
||||
use zeromq::{Socket, SocketRecv};
|
||||
|
||||
pub const DEFAULT_INGESTION_PORT: u16 = 20445;
|
||||
pub const DEFAULT_CONTROL_PORT: u16 = 20446;
|
||||
@@ -255,11 +256,6 @@ pub async fn start_event_observer(
|
||||
observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
|
||||
ctx: Context,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
ctx.try_log(|logger| slog::info!(logger, "Event observer starting with config {:?}", config));
|
||||
|
||||
// let ordinal_index = if cfg!(feature = "ordinals") {
|
||||
// Start indexer with a receiver in background thread
|
||||
|
||||
let indexer_config = IndexerConfig {
|
||||
stacks_node_rpc_url: config.stacks_node_rpc_url.clone(),
|
||||
bitcoind_rpc_url: config.bitcoind_rpc_url.clone(),
|
||||
@@ -410,10 +406,49 @@ pub async fn start_event_observer(
|
||||
});
|
||||
|
||||
if let BitcoinBlockSignaling::ZeroMQ(ref bitcoind_zmq_url) = config.bitcoin_block_signaling {
|
||||
let bitcoind_zmq_endpoint = bitcoind_zmq_url.parse::<zeromq::Endpoint>()?;
|
||||
let bitcoind_zmq_url = bitcoind_zmq_url.clone();
|
||||
let ctx_moved = ctx.clone();
|
||||
hiro_system_kit::thread_named("Bitcoind zmq listener")
|
||||
.spawn(move || {
|
||||
// bitcoind_zmq_endpoint
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Waiting for ZMQ connection acknowledgment from bitcoind"
|
||||
)
|
||||
});
|
||||
|
||||
let _ = hiro_system_kit::nestable_block_on(async move {
|
||||
let mut socket = zeromq::SubSocket::new();
|
||||
socket
|
||||
.connect(&bitcoind_zmq_url)
|
||||
.await
|
||||
.expect("Failed to connect");
|
||||
|
||||
socket.subscribe("hashblock").await?;
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
|
||||
});
|
||||
|
||||
loop {
|
||||
let message = match socket.recv().await {
|
||||
Ok(message) => message,
|
||||
Err(e) => {
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Unable to receive ZMQ message: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let block_hash: String =
|
||||
String::from_utf8(message.get(0).unwrap().to_vec())?;
|
||||
println!("Received {}", block_hash);
|
||||
}
|
||||
Ok::<(), Box<dyn Error>>(())
|
||||
});
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user