mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
feat: expose scanning status in GET endpoint
This commit is contained in:
@@ -7,7 +7,7 @@ working_dir = "cache"
|
||||
# dynamically predicates.
|
||||
# Disable by default.
|
||||
#
|
||||
# [http-api]
|
||||
# [http_api]
|
||||
# http_port = 20456
|
||||
# database_uri = "redis://localhost:6379/"
|
||||
|
||||
|
||||
@@ -263,8 +263,12 @@ impl Config {
|
||||
}
|
||||
|
||||
pub fn expected_api_database_uri(&self) -> &str {
|
||||
&self.expected_api_config().database_uri
|
||||
}
|
||||
|
||||
pub fn expected_api_config(&self) -> &PredicatesApiConfig {
|
||||
match self.http_api {
|
||||
PredicatesApi::On(ref config) => config.database_uri.as_str(),
|
||||
PredicatesApi::On(ref config) => config,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,11 @@ use std::collections::{HashMap, VecDeque};
|
||||
use crate::{
|
||||
archive::download_stacks_dataset_if_required,
|
||||
block::{Record, RecordKind},
|
||||
config::Config,
|
||||
config::{Config, PredicatesApi, PredicatesApiConfig},
|
||||
service::{
|
||||
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
|
||||
ScanningData,
|
||||
},
|
||||
storage::{
|
||||
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
|
||||
get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
|
||||
@@ -11,7 +15,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
use chainhook_event_observer::{
|
||||
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
|
||||
chainhooks::{stacks::evaluate_stacks_chainhook_on_blocks, types::ChainhookSpecification},
|
||||
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
|
||||
rocksdb::DB,
|
||||
utils::Context,
|
||||
@@ -140,9 +144,15 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
|
||||
},
|
||||
};
|
||||
|
||||
let proofs = HashMap::new();
|
||||
let mut predicates_db_conn = match config.http_api {
|
||||
PredicatesApi::On(ref api_config) => {
|
||||
Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx))
|
||||
}
|
||||
PredicatesApi::Off => None,
|
||||
};
|
||||
|
||||
let mut actions_triggered = 0;
|
||||
let proofs = HashMap::new();
|
||||
let mut occurrences_found = 0;
|
||||
let mut blocks_scanned = 0;
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
@@ -182,7 +192,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
|
||||
error!(ctx.expect_logger(), "unable to handle action {}", e);
|
||||
}
|
||||
Ok(action) => {
|
||||
actions_triggered += 1;
|
||||
occurrences_found += 1;
|
||||
let res = match action {
|
||||
StacksChainhookOccurrence::Http(request) => {
|
||||
send_request(request, 3, 1, &ctx).await
|
||||
@@ -202,6 +212,18 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
|
||||
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
|
||||
}
|
||||
|
||||
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
|
||||
if blocks_scanned % 5000 == 0 {
|
||||
let status = PredicateStatus::Scanning(ScanningData {
|
||||
start_block,
|
||||
end_block,
|
||||
cursor,
|
||||
occurrences_found,
|
||||
});
|
||||
update_predicate_status(&predicate_spec.key(), status, predicates_db_conn)
|
||||
}
|
||||
}
|
||||
|
||||
cursor += 1;
|
||||
// Update end_block, in case a new block was discovered during the scan
|
||||
if cursor == end_block {
|
||||
@@ -224,9 +246,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
|
||||
}
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
|
||||
"{blocks_scanned} blocks scanned, {occurrences_found} occurrences found"
|
||||
);
|
||||
|
||||
let status = PredicateStatus::Scanning(ScanningData {
|
||||
start_block,
|
||||
end_block,
|
||||
cursor,
|
||||
occurrences_found,
|
||||
});
|
||||
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
|
||||
update_predicate_status(&predicate_spec.key(), status, predicates_db_conn)
|
||||
}
|
||||
Ok(last_block_scanned)
|
||||
}
|
||||
|
||||
@@ -253,7 +283,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
|
||||
let proofs = HashMap::new();
|
||||
|
||||
let mut actions_triggered = 0;
|
||||
let mut occurrences_found = 0;
|
||||
let mut blocks_scanned = 0;
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
@@ -294,7 +324,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
error!(ctx.expect_logger(), "unable to handle action {}", e);
|
||||
}
|
||||
Ok(action) => {
|
||||
actions_triggered += 1;
|
||||
occurrences_found += 1;
|
||||
let res = match action {
|
||||
StacksChainhookOccurrence::Http(request) => {
|
||||
send_request(request, 3, 1, &ctx).await
|
||||
@@ -316,7 +346,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
}
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
|
||||
"{blocks_scanned} blocks scanned, {occurrences_found} occurrences found"
|
||||
);
|
||||
|
||||
Ok(last_block_scanned)
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::{mpsc::Sender, Arc, Mutex, RwLock},
|
||||
};
|
||||
|
||||
use chainhook_event_observer::{
|
||||
chainhooks::types::{ChainhookFullSpecification, ChainhookSpecification},
|
||||
observer::{ChainhookStore, ObserverCommand},
|
||||
observer::ObserverCommand,
|
||||
utils::Context,
|
||||
};
|
||||
use hiro_system_kit::slog;
|
||||
use redis::Commands;
|
||||
use redis::{Commands, Connection};
|
||||
use rocket::config::{self, Config, LogLevel};
|
||||
use rocket::serde::json::{json, Json, Value as JsonValue};
|
||||
use rocket::State;
|
||||
@@ -19,16 +20,14 @@ use std::error::Error;
|
||||
|
||||
use crate::config::PredicatesApiConfig;
|
||||
|
||||
use super::{open_readwrite_predicates_db_conn_or_panic, PredicateStatus};
|
||||
|
||||
pub async fn start_predicate_api_server(
|
||||
api_config: &PredicatesApiConfig,
|
||||
observer_commands_tx: Sender<ObserverCommand>,
|
||||
ctx: Context,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let log_level = if api_config.display_logs {
|
||||
LogLevel::Critical
|
||||
} else {
|
||||
LogLevel::Off
|
||||
};
|
||||
let log_level = LogLevel::Off;
|
||||
|
||||
let mut shutdown_config = config::Shutdown::default();
|
||||
shutdown_config.ctrlc = false;
|
||||
@@ -57,11 +56,16 @@ pub async fn start_predicate_api_server(
|
||||
];
|
||||
|
||||
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
|
||||
let redis_con_rw_lock = Arc::new(RwLock::new(open_readwrite_predicates_db_conn_or_panic(
|
||||
api_config, &ctx,
|
||||
)));
|
||||
|
||||
let ctx_cloned = ctx.clone();
|
||||
|
||||
let ignite = rocket::custom(control_config)
|
||||
.manage(background_job_tx_mutex)
|
||||
.manage(ctx_cloned)
|
||||
.manage(redis_con_rw_lock)
|
||||
.mount("/", routes)
|
||||
.ignite()
|
||||
.await?;
|
||||
@@ -85,7 +89,7 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
|
||||
#[openapi(tag = "Chainhooks")]
|
||||
#[get("/v1/chainhooks", format = "application/json")]
|
||||
fn handle_get_predicates(
|
||||
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
|
||||
predicate_db: &State<Arc<RwLock<Connection>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks"));
|
||||
@@ -137,7 +141,7 @@ fn handle_get_predicates(
|
||||
#[post("/v1/chainhooks", format = "application/json", data = "<predicate>")]
|
||||
fn handle_create_predicate(
|
||||
predicate: Json<ChainhookFullSpecification>,
|
||||
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
|
||||
predicate_db: &State<Arc<RwLock<Connection>>>,
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
@@ -150,15 +154,15 @@ fn handle_create_predicate(
|
||||
}));
|
||||
}
|
||||
|
||||
if let Ok(chainhook_store_reader) = chainhook_store.inner().read() {
|
||||
if let Some(_) = chainhook_store_reader
|
||||
.predicates
|
||||
.get_spec_with_uuid(predicate.get_uuid())
|
||||
{
|
||||
return Json(json!({
|
||||
"status": 409,
|
||||
"error": "uuid already in use",
|
||||
}));
|
||||
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
|
||||
match get_entry_from_predicates_db(&predicate.get_uuid(), &mut predicates_db_conn, &ctx) {
|
||||
Ok(Some(_)) => {
|
||||
return Json(json!({
|
||||
"status": 409,
|
||||
"error": "Predicate uuid already in use",
|
||||
}))
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,41 +184,31 @@ fn handle_create_predicate(
|
||||
#[get("/v1/chainhooks/<predicate_uuid>", format = "application/json")]
|
||||
fn handle_get_predicate(
|
||||
predicate_uuid: String,
|
||||
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
|
||||
predicate_db: &State<Arc<RwLock<Connection>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/<predicate_uuid>"));
|
||||
if let Ok(chainhook_store_reader) = chainhook_store.inner().read() {
|
||||
let predicate = match chainhook_store_reader
|
||||
.predicates
|
||||
.get_spec_with_uuid(&predicate_uuid)
|
||||
{
|
||||
Some(ChainhookSpecification::Stacks(spec)) => {
|
||||
json!({
|
||||
"chain": "stacks",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
})
|
||||
}
|
||||
Some(ChainhookSpecification::Bitcoin(spec)) => {
|
||||
json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
})
|
||||
}
|
||||
None => {
|
||||
return Json(json!({
|
||||
"status": 404,
|
||||
}))
|
||||
}
|
||||
};
|
||||
return Json(json!({
|
||||
"status": 200,
|
||||
"result": predicate
|
||||
}));
|
||||
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/{}", predicate_uuid));
|
||||
|
||||
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
|
||||
match get_entry_from_predicates_db(&predicate_uuid, &mut predicates_db_conn, &ctx) {
|
||||
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => Json(json!({
|
||||
"chain": "stacks",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status
|
||||
})),
|
||||
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => Json(json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status
|
||||
})),
|
||||
_ => Json(json!({
|
||||
"status": 404,
|
||||
})),
|
||||
}
|
||||
} else {
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
@@ -230,7 +224,7 @@ fn handle_delete_stacks_predicate(
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<predicate_uuid>"));
|
||||
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/{}", predicate_uuid));
|
||||
|
||||
let background_job_tx = background_job_tx.inner();
|
||||
match background_job_tx.lock() {
|
||||
@@ -253,7 +247,7 @@ fn handle_delete_bitcoin_predicate(
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<predicate_uuid>"));
|
||||
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/bitcoin/{}", predicate_uuid));
|
||||
|
||||
let background_job_tx = background_job_tx.inner();
|
||||
match background_job_tx.lock() {
|
||||
@@ -269,26 +263,58 @@ fn handle_delete_bitcoin_predicate(
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn load_predicates_from_redis(
|
||||
config: &crate::config::Config,
|
||||
pub fn get_entry_from_predicates_db(
|
||||
uuid: &str,
|
||||
predicate_db_conn: &mut Connection,
|
||||
_ctx: &Context,
|
||||
) -> Result<Option<(ChainhookSpecification, PredicateStatus)>, String> {
|
||||
let entry: HashMap<String, String> = predicate_db_conn
|
||||
.hgetall(ChainhookSpecification::either_stx_or_btc_key(uuid))
|
||||
.map_err(|e| {
|
||||
format!(
|
||||
"unable to load chainhook associated with key {}: {}",
|
||||
uuid,
|
||||
e.to_string()
|
||||
)
|
||||
})?;
|
||||
|
||||
let encoded_spec = match entry.get("specification") {
|
||||
None => return Ok(None),
|
||||
Some(payload) => payload,
|
||||
};
|
||||
|
||||
let spec = match ChainhookSpecification::deserialize_specification(&encoded_spec) {
|
||||
Err(e) => unimplemented!(),
|
||||
Ok(spec) => spec,
|
||||
};
|
||||
|
||||
let encoded_status = match entry.get("status") {
|
||||
None => unimplemented!(),
|
||||
Some(payload) => payload,
|
||||
};
|
||||
|
||||
let status = match serde_json::from_str(&encoded_status) {
|
||||
Err(e) => unimplemented!(),
|
||||
Ok(status) => status,
|
||||
};
|
||||
|
||||
Ok(Some((spec, status)))
|
||||
}
|
||||
|
||||
pub fn get_entries_from_predicates_db(
|
||||
predicate_db_conn: &mut Connection,
|
||||
ctx: &Context,
|
||||
) -> Result<Vec<ChainhookSpecification>, String> {
|
||||
let redis_uri = config.expected_api_database_uri();
|
||||
let client = redis::Client::open(redis_uri.clone())
|
||||
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
|
||||
let mut redis_con = client
|
||||
.get_connection()
|
||||
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
|
||||
let chainhooks_to_load: Vec<String> = redis_con
|
||||
.scan_match("chainhook:*:*:*")
|
||||
let chainhooks_to_load: Vec<String> = predicate_db_conn
|
||||
.scan_match(ChainhookSpecification::either_stx_or_btc_key("*"))
|
||||
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let mut predicates = vec![];
|
||||
for key in chainhooks_to_load.iter() {
|
||||
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
|
||||
Ok(spec) => match ChainhookSpecification::deserialize_specification(&spec, key) {
|
||||
let chainhook = match predicate_db_conn.hget::<_, _, String>(key, "specification") {
|
||||
Ok(spec) => match ChainhookSpecification::deserialize_specification(&spec) {
|
||||
Ok(spec) => spec,
|
||||
Err(e) => {
|
||||
error!(
|
||||
@@ -314,3 +340,16 @@ pub fn load_predicates_from_redis(
|
||||
}
|
||||
Ok(predicates)
|
||||
}
|
||||
|
||||
pub fn load_predicates_from_redis(
|
||||
config: &crate::config::Config,
|
||||
ctx: &Context,
|
||||
) -> Result<Vec<ChainhookSpecification>, String> {
|
||||
let redis_uri: &str = config.expected_api_database_uri();
|
||||
let client = redis::Client::open(redis_uri.clone())
|
||||
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
|
||||
let mut predicate_db_conn = client
|
||||
.get_connection()
|
||||
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
|
||||
get_entries_from_predicates_db(&mut predicate_db_conn, ctx)
|
||||
}
|
||||
|
||||
@@ -1,28 +1,21 @@
|
||||
mod http_api;
|
||||
mod runloops;
|
||||
|
||||
use crate::config::{Config, PredicatesApi};
|
||||
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
|
||||
use crate::scan::stacks::{
|
||||
consolidate_local_stacks_chainstate_using_csv,
|
||||
scan_stacks_chainstate_via_rocksdb_using_predicate,
|
||||
};
|
||||
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
|
||||
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
|
||||
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
|
||||
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
|
||||
use crate::storage::{
|
||||
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks,
|
||||
insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn,
|
||||
open_readwrite_stacks_db_conn,
|
||||
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn,
|
||||
};
|
||||
|
||||
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
|
||||
|
||||
use chainhook_event_observer::chainhooks::types::ChainhookSpecification;
|
||||
use chainhook_event_observer::observer::{start_event_observer, ObserverCommand, ObserverEvent};
|
||||
use chainhook_event_observer::observer::{start_event_observer, ObserverEvent};
|
||||
use chainhook_event_observer::utils::Context;
|
||||
use chainhook_types::{BitcoinBlockSignaling, StacksChainEvent};
|
||||
use redis::{Commands, Connection};
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
@@ -198,6 +191,8 @@ impl Service {
|
||||
let ctx = self.ctx.clone();
|
||||
let api_config = api_config.clone();
|
||||
let moved_observer_command_tx = observer_command_tx.clone();
|
||||
// Test and initialize a database connection
|
||||
let redis_con = open_readwrite_predicates_db_conn_or_panic(&api_config, &self.ctx);
|
||||
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future =
|
||||
@@ -205,16 +200,6 @@ impl Service {
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
});
|
||||
|
||||
// Test and initialize a database connection
|
||||
let redis_uri = self.config.expected_api_database_uri();
|
||||
let client = redis::Client::open(redis_uri.clone()).unwrap();
|
||||
let redis_con = match client.get_connection() {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
error!(self.ctx.expect_logger(), "Redis: {}", message.to_string());
|
||||
panic!();
|
||||
}
|
||||
};
|
||||
Some(redis_con)
|
||||
}
|
||||
PredicatesApi::Off => None,
|
||||
@@ -245,8 +230,7 @@ impl Service {
|
||||
&chainhook_key,
|
||||
&[
|
||||
("specification", json!(chainhook).to_string()),
|
||||
("last_evaluation", json!(0).to_string()),
|
||||
("last_trigger", json!(0).to_string()),
|
||||
("status", json!(PredicateStatus::Disabled).to_string()),
|
||||
],
|
||||
);
|
||||
if let Err(e) = res {
|
||||
@@ -342,19 +326,62 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum PredicateStatus {
|
||||
InitialScan(u64, u64, u64),
|
||||
Active(u64),
|
||||
Scanning(ScanningData),
|
||||
Streaming(StreamingData),
|
||||
Disabled,
|
||||
}
|
||||
|
||||
pub fn update_predicate(uuid: String, status: PredicateStatus, redis_con: &Connection) {
|
||||
// let res: Result<(), redis::RedisError> = redis_con.hset_multiple(
|
||||
// &chainhook_key,
|
||||
// &[
|
||||
// ("specification", json!(chainhook).to_string()),
|
||||
// ("last_evaluation", json!(0).to_string()),
|
||||
// ("last_trigger", json!(0).to_string()),
|
||||
// ],
|
||||
// );
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ScanningData {
|
||||
pub start_block: u64,
|
||||
pub cursor: u64,
|
||||
pub end_block: u64,
|
||||
pub occurrences_found: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StreamingData {
|
||||
pub last_occurence: u64,
|
||||
pub last_evaluation: u64,
|
||||
}
|
||||
|
||||
pub fn update_predicate_status(
|
||||
predicate_key: &str,
|
||||
status: PredicateStatus,
|
||||
predicates_db_conn: &mut Connection,
|
||||
) {
|
||||
let res: Result<(), redis::RedisError> =
|
||||
predicates_db_conn.hset_multiple(&predicate_key, &[("status", json!(status).to_string())]);
|
||||
}
|
||||
|
||||
pub fn retrieve_predicate_status(
|
||||
predicate_key: &str,
|
||||
predicates_db_conn: &mut Connection,
|
||||
) -> Option<PredicateStatus> {
|
||||
match predicates_db_conn.hget::<_, _, String>(predicate_key.to_string(), "status") {
|
||||
Ok(ref payload) => match serde_json::from_str(payload) {
|
||||
Ok(data) => Some(data),
|
||||
Err(_) => None,
|
||||
},
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn open_readwrite_predicates_db_conn_or_panic(
|
||||
config: &PredicatesApiConfig,
|
||||
ctx: &Context,
|
||||
) -> Connection {
|
||||
// Test and initialize a database connection
|
||||
let redis_uri = &config.database_uri;
|
||||
let client = redis::Client::open(redis_uri.clone()).unwrap();
|
||||
let redis_con = match client.get_connection() {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
error!(ctx.expect_logger(), "Redis: {}", message.to_string());
|
||||
panic!();
|
||||
}
|
||||
};
|
||||
redis_con
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use chainhook_event_observer::{
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use crate::{
|
||||
config::{Config, PredicatesApiConfig},
|
||||
config::Config,
|
||||
scan::{
|
||||
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
|
||||
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
|
||||
|
||||
@@ -191,25 +191,26 @@ impl ChainhookSpecification {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stacks_key_prefix() -> &'static str {
|
||||
"predicate:stx:"
|
||||
pub fn either_stx_or_btc_key(uuid: &str) -> String {
|
||||
format!("predicate:{}", uuid)
|
||||
}
|
||||
|
||||
pub fn bitcoin_key_prefix() -> &'static str {
|
||||
"predicate:btc:"
|
||||
pub fn stacks_key(uuid: &str) -> String {
|
||||
format!("predicate:{}", uuid)
|
||||
}
|
||||
|
||||
pub fn bitcoin_key(uuid: &str) -> String {
|
||||
format!("predicate:{}", uuid)
|
||||
}
|
||||
|
||||
pub fn key(&self) -> String {
|
||||
match &self {
|
||||
Self::Bitcoin(data) => format!("{}{}", Self::bitcoin_key_prefix(), data.uuid),
|
||||
Self::Stacks(data) => format!("{}{}", Self::stacks_key_prefix(), data.uuid),
|
||||
Self::Bitcoin(data) => Self::bitcoin_key(&data.uuid),
|
||||
Self::Stacks(data) => Self::stacks_key(&data.uuid),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize_specification(
|
||||
spec: &str,
|
||||
_key: &str,
|
||||
) -> Result<ChainhookSpecification, String> {
|
||||
pub fn deserialize_specification(spec: &str) -> Result<ChainhookSpecification, String> {
|
||||
let spec: ChainhookSpecification = serde_json::from_str(spec)
|
||||
.map_err(|e| format!("unable to deserialize Stacks chainhook {}", e.to_string()))?;
|
||||
Ok(spec)
|
||||
@@ -714,6 +715,10 @@ pub struct StacksChainhookSpecification {
|
||||
}
|
||||
|
||||
impl StacksChainhookSpecification {
|
||||
pub fn key(&self) -> String {
|
||||
ChainhookSpecification::stacks_key(&self.uuid)
|
||||
}
|
||||
|
||||
pub fn is_predicate_targeting_block_header(&self) -> bool {
|
||||
match &self.predicate {
|
||||
StacksPredicate::BlockHeight(_)
|
||||
|
||||
Reference in New Issue
Block a user