mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 16:39:27 +08:00
fix: address redis disconnects
This commit is contained in:
@@ -181,29 +181,20 @@ impl Service {
|
||||
}
|
||||
}
|
||||
// Enable HTTP Chainhook API, if required
|
||||
let mut predicates_db_conn = match self.config.http_api {
|
||||
PredicatesApi::On(ref api_config) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening for chainhook predicate registrations on port {}",
|
||||
api_config.http_port
|
||||
);
|
||||
let ctx = self.ctx.clone();
|
||||
let api_config = api_config.clone();
|
||||
let moved_observer_command_tx = observer_command_tx.clone();
|
||||
// Test and initialize a database connection
|
||||
let redis_con = open_readwrite_predicates_db_conn_or_panic(&api_config, &self.ctx);
|
||||
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future =
|
||||
start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
|
||||
Some(redis_con)
|
||||
}
|
||||
PredicatesApi::Off => None,
|
||||
};
|
||||
if let PredicatesApi::On(ref api_config) = self.config.http_api {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening for chainhook predicate registrations on port {}", api_config.http_port
|
||||
);
|
||||
let ctx = self.ctx.clone();
|
||||
let api_config = api_config.clone();
|
||||
let moved_observer_command_tx = observer_command_tx.clone();
|
||||
// Test and initialize a database connection
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future = start_predicate_api_server(api_config, moved_observer_command_tx, ctx);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
}
|
||||
|
||||
let mut stacks_event = 0;
|
||||
loop {
|
||||
@@ -218,54 +209,98 @@ impl Service {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match event {
|
||||
ObserverEvent::PredicateRegistered(chainhook) => {
|
||||
ObserverEvent::PredicateRegistered(spec) => {
|
||||
// If start block specified, use it.
|
||||
// I no start block specified, depending on the nature the hook, we'd like to retrieve:
|
||||
// - contract-id
|
||||
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
|
||||
let chainhook_key = chainhook.key();
|
||||
let res: Result<(), redis::RedisError> = predicates_db_conn.hset_multiple(
|
||||
&chainhook_key,
|
||||
&[
|
||||
("specification", json!(chainhook).to_string()),
|
||||
("status", json!(PredicateStatus::Disabled).to_string()),
|
||||
],
|
||||
if let PredicatesApi::On(ref config) = self.config.http_api {
|
||||
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
|
||||
{
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to register predicate: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
update_predicate_spec(
|
||||
&spec.key(),
|
||||
&spec,
|
||||
&mut predicates_db_conn,
|
||||
&self.ctx,
|
||||
);
|
||||
if let Err(e) = res {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to store chainhook {chainhook_key}: {}",
|
||||
e.to_string()
|
||||
);
|
||||
update_predicate_status(
|
||||
&spec.key(),
|
||||
PredicateStatus::Disabled,
|
||||
&mut predicates_db_conn,
|
||||
&self.ctx,
|
||||
);
|
||||
}
|
||||
match spec {
|
||||
ChainhookSpecification::Stacks(predicate_spec) => {
|
||||
let _ = stacks_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
match chainhook {
|
||||
ChainhookSpecification::Stacks(predicate_spec) => {
|
||||
let _ = stacks_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
let _ = bitcoin_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
let _ = bitcoin_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
}
|
||||
}
|
||||
ObserverEvent::PredicateEnabled(spec) => {
|
||||
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
|
||||
update_predicate_spec(&spec.key(), &spec, predicates_db_conn, &self.ctx);
|
||||
if let PredicatesApi::On(ref config) = self.config.http_api {
|
||||
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
|
||||
{
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to enable predicate: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
update_predicate_spec(
|
||||
&spec.key(),
|
||||
&spec,
|
||||
&mut predicates_db_conn,
|
||||
&self.ctx,
|
||||
);
|
||||
update_predicate_status(
|
||||
&spec.key(),
|
||||
PredicateStatus::InitialScanCompleted,
|
||||
predicates_db_conn,
|
||||
&mut predicates_db_conn,
|
||||
&self.ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
ObserverEvent::PredicateDeregistered(chainhook) => {
|
||||
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
|
||||
let chainhook_key = chainhook.key();
|
||||
let _: Result<(), redis::RedisError> =
|
||||
predicates_db_conn.del(chainhook_key);
|
||||
ObserverEvent::PredicateDeregistered(spec) => {
|
||||
if let PredicatesApi::On(ref config) = self.config.http_api {
|
||||
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
|
||||
{
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to deregister predicate: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let predicate_key = spec.key();
|
||||
let res: Result<(), redis::RedisError> =
|
||||
predicates_db_conn.del(predicate_key);
|
||||
if let Err(e) = res {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to delete predicate: {}",
|
||||
e.to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {
|
||||
|
||||
Reference in New Issue
Block a user