feat: allow specifying endpoint (hostnames) in event observer config

This commit is contained in:
Matthew Little
2020-04-10 16:24:20 +02:00
parent 0283758153
commit 00d3c506b2
3 changed files with 24 additions and 18 deletions

View File

@@ -122,8 +122,7 @@ impl Config {
.collect();
observers.push(EventObserverConfig {
address: observer.address,
port: observer.port,
endpoint: observer.endpoint,
events_keys
});
}
@@ -326,15 +325,13 @@ pub struct MempoolConfig {
#[derive(Clone, Deserialize)]
pub struct EventObserverConfigFile {
pub port: u16,
pub address: String,
pub endpoint: String,
pub events_keys: Vec<String>,
}
#[derive(Clone, Default)]
pub struct EventObserverConfig {
pub port: u16,
pub address: String,
pub endpoint: String,
pub events_keys: Vec<EventKeyType>,
}

View File

@@ -1,7 +1,7 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::thread::spawn;
use std::net::{SocketAddr};
use std::net::{SocketAddr, ToSocketAddrs};
use mio::tcp::TcpStream;
use serde_json::json;
use serde::Serialize;
@@ -18,20 +18,16 @@ use super::config::{EventObserverConfig, EventKeyType};
#[derive(Debug)]
struct EventObserver {
sock_addr: SocketAddr
endpoint: String
}
impl EventObserver {
pub fn new(address: &str, port: u16) -> EventObserver {
let sock_addr = SocketAddr::new(address.parse().unwrap(), port);
EventObserver { sock_addr }
}
pub fn send(&mut self, filtered_events: Vec<&(Txid, &StacksTransactionEvent)>, chain_tip: &StacksBlock, chain_tip_info: &StacksHeaderInfo, receipts: &Vec<StacksTransactionReceipt>) {
// Initiate a tcp socket
let stream = TcpStream::connect(&self.sock_addr).unwrap();
// Initiate a tcp socket, first using std::net TCP connect for smart DNS resolution
let std_stream = std::net::TcpStream::connect(&self.endpoint).unwrap();
// Then wrap as mio TCP stream
let stream = TcpStream::from_stream(std_stream).unwrap();
// Serialize events to JSON
let serialized_events: Vec<serde_json::Value> = filtered_events.iter().map(|(txid, event)|
event.json_serialize(txid)
@@ -181,8 +177,9 @@ impl EventDispatcher {
}
pub fn register_observer(&mut self, conf: &EventObserverConfig) {
let event_observer = EventObserver::new(&conf.address, conf.port);
// let event_observer = EventObserver::new(&conf.address, conf.port);
let event_observer = EventObserver { endpoint: conf.endpoint.clone() };
let observer_index = self.registered_observers.len() as u16;
for event_key_type in conf.events_keys.iter() {

View File

@@ -1,4 +1,5 @@
use super::{Keychain, MemPool, MemPoolFS, Config, LeaderTenure, BurnchainState, EventDispatcher};
use super::config::{EventObserverConfig, EventKeyType};
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender, Receiver};
@@ -136,6 +137,17 @@ impl Node {
let mut event_dispatcher = EventDispatcher::new();
// check for observer config in env vars
match std::env::var("STACKS_EVENT_OBSERVER") {
Ok(val) => {
event_dispatcher.register_observer(&EventObserverConfig {
endpoint: val,
events_keys: vec![EventKeyType::AnyEvent],
});
},
_ => ()
}
for observer in &config.events_observers {
event_dispatcher.register_observer(observer);
}