fix: address redis disconnects

This commit is contained in:
Ludo Galabru
2023-06-14 20:51:37 -04:00
parent ae913be968
commit a6b4a5fb38

View File

@@ -181,29 +181,20 @@ impl Service {
}
}
// Enable HTTP Chainhook API, if required
let mut predicates_db_conn = match self.config.http_api {
PredicatesApi::On(ref api_config) => {
info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}",
api_config.http_port
);
let ctx = self.ctx.clone();
let api_config = api_config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
// Test and initialize a database connection
let redis_con = open_readwrite_predicates_db_conn_or_panic(&api_config, &self.ctx);
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future =
start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
let _ = hiro_system_kit::nestable_block_on(future);
});
Some(redis_con)
}
PredicatesApi::Off => None,
};
if let PredicatesApi::On(ref api_config) = self.config.http_api {
info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}", api_config.http_port
);
let ctx = self.ctx.clone();
let api_config = api_config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
// Test and initialize a database connection
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future = start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
let _ = hiro_system_kit::nestable_block_on(future);
});
}
let mut stacks_event = 0;
loop {
@@ -218,54 +209,98 @@ impl Service {
break;
}
};
match event {
ObserverEvent::PredicateRegistered(chainhook) => {
ObserverEvent::PredicateRegistered(spec) => {
// If start block specified, use it.
// I no start block specified, depending on the nature the hook, we'd like to retrieve:
// - contract-id
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
let chainhook_key = chainhook.key();
let res: Result<(), redis::RedisError> = predicates_db_conn.hset_multiple(
&chainhook_key,
&[
("specification", json!(chainhook).to_string()),
("status", json!(PredicateStatus::Disabled).to_string()),
],
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
{
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to register predicate: {}",
e.to_string()
);
continue;
}
};
update_predicate_spec(
&spec.key(),
&spec,
&mut predicates_db_conn,
&self.ctx,
);
if let Err(e) = res {
error!(
self.ctx.expect_logger(),
"unable to store chainhook {chainhook_key}: {}",
e.to_string()
);
update_predicate_status(
&spec.key(),
PredicateStatus::Disabled,
&mut predicates_db_conn,
&self.ctx,
);
}
match spec {
ChainhookSpecification::Stacks(predicate_spec) => {
let _ = stacks_scan_op_tx.send(predicate_spec);
}
match chainhook {
ChainhookSpecification::Stacks(predicate_spec) => {
let _ = stacks_scan_op_tx.send(predicate_spec);
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
}
}
ObserverEvent::PredicateEnabled(spec) => {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
update_predicate_spec(&spec.key(), &spec, predicates_db_conn, &self.ctx);
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
{
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to enable predicate: {}",
e.to_string()
);
continue;
}
};
update_predicate_spec(
&spec.key(),
&spec,
&mut predicates_db_conn,
&self.ctx,
);
update_predicate_status(
&spec.key(),
PredicateStatus::InitialScanCompleted,
predicates_db_conn,
&mut predicates_db_conn,
&self.ctx,
);
}
}
ObserverEvent::PredicateDeregistered(chainhook) => {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
let chainhook_key = chainhook.key();
let _: Result<(), redis::RedisError> =
predicates_db_conn.del(chainhook_key);
ObserverEvent::PredicateDeregistered(spec) => {
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
{
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to deregister predicate: {}",
e.to_string()
);
continue;
}
};
let predicate_key = spec.key();
let res: Result<(), redis::RedisError> =
predicates_db_conn.del(predicate_key);
if let Err(e) = res {
error!(
self.ctx.expect_logger(),
"unable to delete predicate: {}",
e.to_string()
);
}
}
}
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {