fix: enforce db reconnection in http endpoints

This commit is contained in:
Ludo Galabru
2023-06-08 05:16:26 -04:00
parent 3e7b0d03f9
commit bcd2a45a86
2 changed files with 82 additions and 76 deletions

View File

@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
sync::{mpsc::Sender, Arc, Mutex, RwLock},
sync::{mpsc::Sender, Arc, Mutex},
};
use chainhook_event_observer::{
@@ -20,10 +20,10 @@ use std::error::Error;
use crate::config::PredicatesApiConfig;
use super::{open_readwrite_predicates_db_conn_or_panic, PredicateStatus};
use super::{open_readwrite_predicates_db_conn, PredicateStatus};
pub async fn start_predicate_api_server(
api_config: &PredicatesApiConfig,
api_config: PredicatesApiConfig,
observer_commands_tx: Sender<ObserverCommand>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
@@ -56,16 +56,13 @@ pub async fn start_predicate_api_server(
];
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
let redis_con_rw_lock = Arc::new(RwLock::new(open_readwrite_predicates_db_conn_or_panic(
api_config, &ctx,
)));
let ctx_cloned = ctx.clone();
let ignite = rocket::custom(control_config)
.manage(background_job_tx_mutex)
.manage(api_config)
.manage(ctx_cloned)
.manage(redis_con_rw_lock)
.mount("/", routes)
.ignite()
.await?;
@@ -89,36 +86,37 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
#[openapi(tag = "Chainhooks")]
#[get("/v1/chainhooks", format = "application/json")]
fn handle_get_predicates(
predicate_db: &State<Arc<RwLock<Connection>>>,
api_config: &State<PredicatesApiConfig>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /v1/chainhooks"));
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) {
Ok(predicates) => predicates,
Err(e) => {
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
return Json(json!({
"status": 500,
"message": "unable to retrieve predicates",
}));
}
};
match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) {
Ok(predicates) => predicates,
Err(e) => {
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
return Json(json!({
"status": 500,
"message": "unable to retrieve predicates",
}));
}
};
let serialized_predicates = predicates
.iter()
.map(|(p, _)| p.into_serialized_json())
.collect::<Vec<_>>();
let serialized_predicates = predicates
.iter()
.map(|(p, _)| p.into_serialized_json())
.collect::<Vec<_>>();
Json(json!({
"status": 200,
"result": serialized_predicates
}))
} else {
Json(json!({
Json(json!({
"status": 200,
"result": serialized_predicates
}))
}
Err(e) => Json(json!({
"status": 500,
"message": "too many requests",
}))
"message": e,
})),
}
}
@@ -126,7 +124,7 @@ fn handle_get_predicates(
#[post("/v1/chainhooks", format = "application/json", data = "<predicate>")]
fn handle_create_predicate(
predicate: Json<ChainhookFullSpecification>,
predicate_db: &State<Arc<RwLock<Connection>>>,
api_config: &State<PredicatesApiConfig>,
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
@@ -141,7 +139,7 @@ fn handle_create_predicate(
let predicate_uuid = predicate.get_uuid().to_string();
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
match get_entry_from_predicates_db(
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
@@ -175,7 +173,7 @@ fn handle_create_predicate(
#[get("/v1/chainhooks/<predicate_uuid>", format = "application/json")]
fn handle_get_predicate(
predicate_uuid: String,
predicate_db: &State<Arc<RwLock<Connection>>>,
api_config: &State<PredicatesApiConfig>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| {
@@ -186,43 +184,44 @@ fn handle_get_predicate(
)
});
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
let entry = match get_entry_from_predicates_db(
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
&ctx,
) {
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
"chain": "stacks",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status,
"enabled": spec.enabled,
}),
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status,
"enabled": spec.enabled,
}),
_ => {
return Json(json!({
"status": 404,
}))
}
};
Json(json!({
"status": 200,
"result": entry
}))
} else {
Json(json!({
match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let entry = match get_entry_from_predicates_db(
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
&ctx,
) {
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
"chain": "stacks",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status,
"enabled": spec.enabled,
}),
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status,
"enabled": spec.enabled,
}),
_ => {
return Json(json!({
"status": 404,
}))
}
};
Json(json!({
"status": 200,
"result": entry
}))
}
Err(e) => Json(json!({
"status": 500,
"message": "too many requests",
}))
"message": e,
})),
}
}

View File

@@ -196,7 +196,7 @@ impl Service {
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future =
start_predicate_api_server(&api_config, moved_observer_command_tx, ctx);
start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
let _ = hiro_system_kit::nestable_block_on(future);
});
@@ -262,7 +262,7 @@ impl Service {
predicates_db_conn.del(chainhook_key);
}
}
ObserverEvent::BitcoinChainEvent((chain_update, report)) => {
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
}
ObserverEvent::StacksChainEvent((chain_event, report)) => {
@@ -403,14 +403,21 @@ pub fn retrieve_predicate_status(
}
}
pub fn open_readwrite_predicates_db_conn(
config: &PredicatesApiConfig,
) -> Result<Connection, String> {
let redis_uri = &config.database_uri;
let client = redis::Client::open(redis_uri.clone()).unwrap();
client
.get_connection()
.map_err(|e| format!("unable to connect to db: {}", e.to_string()))
}
pub fn open_readwrite_predicates_db_conn_or_panic(
config: &PredicatesApiConfig,
ctx: &Context,
) -> Connection {
// Test and initialize a database connection
let redis_uri = &config.database_uri;
let client = redis::Client::open(redis_uri.clone()).unwrap();
let redis_con = match client.get_connection() {
let redis_con = match open_readwrite_predicates_db_conn(config) {
Ok(con) => con,
Err(message) => {
error!(ctx.expect_logger(), "Redis: {}", message.to_string());