mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 16:39:27 +08:00
fix: enforce db reconnection in http endpoints
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::{mpsc::Sender, Arc, Mutex, RwLock},
|
||||
sync::{mpsc::Sender, Arc, Mutex},
|
||||
};
|
||||
|
||||
use chainhook_event_observer::{
|
||||
@@ -20,10 +20,10 @@ use std::error::Error;
|
||||
|
||||
use crate::config::PredicatesApiConfig;
|
||||
|
||||
use super::{open_readwrite_predicates_db_conn_or_panic, PredicateStatus};
|
||||
use super::{open_readwrite_predicates_db_conn, PredicateStatus};
|
||||
|
||||
pub async fn start_predicate_api_server(
|
||||
api_config: &PredicatesApiConfig,
|
||||
api_config: PredicatesApiConfig,
|
||||
observer_commands_tx: Sender<ObserverCommand>,
|
||||
ctx: Context,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
@@ -56,16 +56,13 @@ pub async fn start_predicate_api_server(
|
||||
];
|
||||
|
||||
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
|
||||
let redis_con_rw_lock = Arc::new(RwLock::new(open_readwrite_predicates_db_conn_or_panic(
|
||||
api_config, &ctx,
|
||||
)));
|
||||
|
||||
let ctx_cloned = ctx.clone();
|
||||
|
||||
let ignite = rocket::custom(control_config)
|
||||
.manage(background_job_tx_mutex)
|
||||
.manage(api_config)
|
||||
.manage(ctx_cloned)
|
||||
.manage(redis_con_rw_lock)
|
||||
.mount("/", routes)
|
||||
.ignite()
|
||||
.await?;
|
||||
@@ -89,36 +86,37 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
|
||||
#[openapi(tag = "Chainhooks")]
|
||||
#[get("/v1/chainhooks", format = "application/json")]
|
||||
fn handle_get_predicates(
|
||||
predicate_db: &State<Arc<RwLock<Connection>>>,
|
||||
api_config: &State<PredicatesApiConfig>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /v1/chainhooks"));
|
||||
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
|
||||
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) {
|
||||
Ok(predicates) => predicates,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
|
||||
return Json(json!({
|
||||
"status": 500,
|
||||
"message": "unable to retrieve predicates",
|
||||
}));
|
||||
}
|
||||
};
|
||||
match open_readwrite_predicates_db_conn(api_config) {
|
||||
Ok(mut predicates_db_conn) => {
|
||||
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, &ctx) {
|
||||
Ok(predicates) => predicates,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
|
||||
return Json(json!({
|
||||
"status": 500,
|
||||
"message": "unable to retrieve predicates",
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
let serialized_predicates = predicates
|
||||
.iter()
|
||||
.map(|(p, _)| p.into_serialized_json())
|
||||
.collect::<Vec<_>>();
|
||||
let serialized_predicates = predicates
|
||||
.iter()
|
||||
.map(|(p, _)| p.into_serialized_json())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Json(json!({
|
||||
"status": 200,
|
||||
"result": serialized_predicates
|
||||
}))
|
||||
} else {
|
||||
Json(json!({
|
||||
Json(json!({
|
||||
"status": 200,
|
||||
"result": serialized_predicates
|
||||
}))
|
||||
}
|
||||
Err(e) => Json(json!({
|
||||
"status": 500,
|
||||
"message": "too many requests",
|
||||
}))
|
||||
"message": e,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,7 +124,7 @@ fn handle_get_predicates(
|
||||
#[post("/v1/chainhooks", format = "application/json", data = "<predicate>")]
|
||||
fn handle_create_predicate(
|
||||
predicate: Json<ChainhookFullSpecification>,
|
||||
predicate_db: &State<Arc<RwLock<Connection>>>,
|
||||
api_config: &State<PredicatesApiConfig>,
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
@@ -141,7 +139,7 @@ fn handle_create_predicate(
|
||||
|
||||
let predicate_uuid = predicate.get_uuid().to_string();
|
||||
|
||||
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
|
||||
if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
|
||||
match get_entry_from_predicates_db(
|
||||
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
|
||||
&mut predicates_db_conn,
|
||||
@@ -175,7 +173,7 @@ fn handle_create_predicate(
|
||||
#[get("/v1/chainhooks/<predicate_uuid>", format = "application/json")]
|
||||
fn handle_get_predicate(
|
||||
predicate_uuid: String,
|
||||
predicate_db: &State<Arc<RwLock<Connection>>>,
|
||||
api_config: &State<PredicatesApiConfig>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -186,43 +184,44 @@ fn handle_get_predicate(
|
||||
)
|
||||
});
|
||||
|
||||
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
|
||||
let entry = match get_entry_from_predicates_db(
|
||||
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
|
||||
&mut predicates_db_conn,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
|
||||
"chain": "stacks",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
_ => {
|
||||
return Json(json!({
|
||||
"status": 404,
|
||||
}))
|
||||
}
|
||||
};
|
||||
Json(json!({
|
||||
"status": 200,
|
||||
"result": entry
|
||||
}))
|
||||
} else {
|
||||
Json(json!({
|
||||
match open_readwrite_predicates_db_conn(api_config) {
|
||||
Ok(mut predicates_db_conn) => {
|
||||
let entry = match get_entry_from_predicates_db(
|
||||
&ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid),
|
||||
&mut predicates_db_conn,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
|
||||
"chain": "stacks",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
_ => {
|
||||
return Json(json!({
|
||||
"status": 404,
|
||||
}))
|
||||
}
|
||||
};
|
||||
Json(json!({
|
||||
"status": 200,
|
||||
"result": entry
|
||||
}))
|
||||
}
|
||||
Err(e) => Json(json!({
|
||||
"status": 500,
|
||||
"message": "too many requests",
|
||||
}))
|
||||
"message": e,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ impl Service {
|
||||
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future =
|
||||
start_predicate_api_server(&api_config, moved_observer_command_tx, ctx);
|
||||
start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
|
||||
@@ -262,7 +262,7 @@ impl Service {
|
||||
predicates_db_conn.del(chainhook_key);
|
||||
}
|
||||
}
|
||||
ObserverEvent::BitcoinChainEvent((chain_update, report)) => {
|
||||
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {
|
||||
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
|
||||
}
|
||||
ObserverEvent::StacksChainEvent((chain_event, report)) => {
|
||||
@@ -403,14 +403,21 @@ pub fn retrieve_predicate_status(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn open_readwrite_predicates_db_conn(
|
||||
config: &PredicatesApiConfig,
|
||||
) -> Result<Connection, String> {
|
||||
let redis_uri = &config.database_uri;
|
||||
let client = redis::Client::open(redis_uri.clone()).unwrap();
|
||||
client
|
||||
.get_connection()
|
||||
.map_err(|e| format!("unable to connect to db: {}", e.to_string()))
|
||||
}
|
||||
|
||||
pub fn open_readwrite_predicates_db_conn_or_panic(
|
||||
config: &PredicatesApiConfig,
|
||||
ctx: &Context,
|
||||
) -> Connection {
|
||||
// Test and initialize a database connection
|
||||
let redis_uri = &config.database_uri;
|
||||
let client = redis::Client::open(redis_uri.clone()).unwrap();
|
||||
let redis_con = match client.get_connection() {
|
||||
let redis_con = match open_readwrite_predicates_db_conn(config) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
error!(ctx.expect_logger(), "Redis: {}", message.to_string());
|
||||
|
||||
Reference in New Issue
Block a user