feat: expose scanning status in GET endpoint

This commit is contained in:
Ludo Galabru
2023-06-06 01:47:26 -04:00
parent 1eabce25c3
commit 156c463cc0
7 changed files with 227 additions and 122 deletions

View File

@@ -7,7 +7,7 @@ working_dir = "cache"
# dynamically predicates.
# Disable by default.
#
# [http-api]
# [http_api]
# http_port = 20456
# database_uri = "redis://localhost:6379/"

View File

@@ -263,8 +263,12 @@ impl Config {
}
pub fn expected_api_database_uri(&self) -> &str {
&self.expected_api_config().database_uri
}
pub fn expected_api_config(&self) -> &PredicatesApiConfig {
match self.http_api {
PredicatesApi::On(ref config) => config.database_uri.as_str(),
PredicatesApi::On(ref config) => config,
_ => unreachable!(),
}
}

View File

@@ -3,7 +3,11 @@ use std::collections::{HashMap, VecDeque};
use crate::{
archive::download_stacks_dataset_if_required,
block::{Record, RecordKind},
config::Config,
config::{Config, PredicatesApi, PredicatesApiConfig},
service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
ScanningData,
},
storage::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
@@ -11,7 +15,7 @@ use crate::{
},
};
use chainhook_event_observer::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
chainhooks::{stacks::evaluate_stacks_chainhook_on_blocks, types::ChainhookSpecification},
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
rocksdb::DB,
utils::Context,
@@ -140,9 +144,15 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
},
};
let proofs = HashMap::new();
let mut predicates_db_conn = match config.http_api {
PredicatesApi::On(ref api_config) => {
Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx))
}
PredicatesApi::Off => None,
};
let mut actions_triggered = 0;
let proofs = HashMap::new();
let mut occurrences_found = 0;
let mut blocks_scanned = 0;
info!(
ctx.expect_logger(),
@@ -182,7 +192,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
error!(ctx.expect_logger(), "unable to handle action {}", e);
}
Ok(action) => {
actions_triggered += 1;
occurrences_found += 1;
let res = match action {
StacksChainhookOccurrence::Http(request) => {
send_request(request, 3, 1, &ctx).await
@@ -202,6 +212,18 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if blocks_scanned % 5000 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
update_predicate_status(&predicate_spec.key(), status, predicates_db_conn)
}
}
cursor += 1;
// Update end_block, in case a new block was discovered during the scan
if cursor == end_block {
@@ -224,9 +246,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
"{blocks_scanned} blocks scanned, {occurrences_found} occurrences found"
);
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
update_predicate_status(&predicate_spec.key(), status, predicates_db_conn)
}
Ok(last_block_scanned)
}
@@ -253,7 +283,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
let proofs = HashMap::new();
let mut actions_triggered = 0;
let mut occurrences_found = 0;
let mut blocks_scanned = 0;
info!(
ctx.expect_logger(),
@@ -294,7 +324,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
error!(ctx.expect_logger(), "unable to handle action {}", e);
}
Ok(action) => {
actions_triggered += 1;
occurrences_found += 1;
let res = match action {
StacksChainhookOccurrence::Http(request) => {
send_request(request, 3, 1, &ctx).await
@@ -316,7 +346,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
"{blocks_scanned} blocks scanned, {occurrences_found} occurrences found"
);
Ok(last_block_scanned)

View File

@@ -1,15 +1,16 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
sync::{mpsc::Sender, Arc, Mutex, RwLock},
};
use chainhook_event_observer::{
chainhooks::types::{ChainhookFullSpecification, ChainhookSpecification},
observer::{ChainhookStore, ObserverCommand},
observer::ObserverCommand,
utils::Context,
};
use hiro_system_kit::slog;
use redis::Commands;
use redis::{Commands, Connection};
use rocket::config::{self, Config, LogLevel};
use rocket::serde::json::{json, Json, Value as JsonValue};
use rocket::State;
@@ -19,16 +20,14 @@ use std::error::Error;
use crate::config::PredicatesApiConfig;
use super::{open_readwrite_predicates_db_conn_or_panic, PredicateStatus};
pub async fn start_predicate_api_server(
api_config: &PredicatesApiConfig,
observer_commands_tx: Sender<ObserverCommand>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
let log_level = if api_config.display_logs {
LogLevel::Critical
} else {
LogLevel::Off
};
let log_level = LogLevel::Off;
let mut shutdown_config = config::Shutdown::default();
shutdown_config.ctrlc = false;
@@ -57,11 +56,16 @@ pub async fn start_predicate_api_server(
];
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
let redis_con_rw_lock = Arc::new(RwLock::new(open_readwrite_predicates_db_conn_or_panic(
api_config, &ctx,
)));
let ctx_cloned = ctx.clone();
let ignite = rocket::custom(control_config)
.manage(background_job_tx_mutex)
.manage(ctx_cloned)
.manage(redis_con_rw_lock)
.mount("/", routes)
.ignite()
.await?;
@@ -85,7 +89,7 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
#[openapi(tag = "Chainhooks")]
#[get("/v1/chainhooks", format = "application/json")]
fn handle_get_predicates(
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
predicate_db: &State<Arc<RwLock<Connection>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks"));
@@ -137,7 +141,7 @@ fn handle_get_predicates(
#[post("/v1/chainhooks", format = "application/json", data = "<predicate>")]
fn handle_create_predicate(
predicate: Json<ChainhookFullSpecification>,
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
predicate_db: &State<Arc<RwLock<Connection>>>,
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
@@ -150,15 +154,15 @@ fn handle_create_predicate(
}));
}
if let Ok(chainhook_store_reader) = chainhook_store.inner().read() {
if let Some(_) = chainhook_store_reader
.predicates
.get_spec_with_uuid(predicate.get_uuid())
{
return Json(json!({
"status": 409,
"error": "uuid already in use",
}));
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
match get_entry_from_predicates_db(&predicate.get_uuid(), &mut predicates_db_conn, &ctx) {
Ok(Some(_)) => {
return Json(json!({
"status": 409,
"error": "Predicate uuid already in use",
}))
}
_ => {}
}
}
@@ -180,41 +184,31 @@ fn handle_create_predicate(
#[get("/v1/chainhooks/<predicate_uuid>", format = "application/json")]
fn handle_get_predicate(
predicate_uuid: String,
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
predicate_db: &State<Arc<RwLock<Connection>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/<predicate_uuid>"));
if let Ok(chainhook_store_reader) = chainhook_store.inner().read() {
let predicate = match chainhook_store_reader
.predicates
.get_spec_with_uuid(&predicate_uuid)
{
Some(ChainhookSpecification::Stacks(spec)) => {
json!({
"chain": "stacks",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
})
}
Some(ChainhookSpecification::Bitcoin(spec)) => {
json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
})
}
None => {
return Json(json!({
"status": 404,
}))
}
};
return Json(json!({
"status": 200,
"result": predicate
}));
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/{}", predicate_uuid));
if let Ok(mut predicates_db_conn) = predicate_db.inner().write() {
match get_entry_from_predicates_db(&predicate_uuid, &mut predicates_db_conn, &ctx) {
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => Json(json!({
"chain": "stacks",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status
})),
Ok(Some((ChainhookSpecification::Bitcoin(spec), status))) => Json(json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status
})),
_ => Json(json!({
"status": 404,
})),
}
} else {
Json(json!({
"status": 500,
@@ -230,7 +224,7 @@ fn handle_delete_stacks_predicate(
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<predicate_uuid>"));
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/{}", predicate_uuid));
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
@@ -253,7 +247,7 @@ fn handle_delete_bitcoin_predicate(
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<predicate_uuid>"));
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/bitcoin/{}", predicate_uuid));
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
@@ -269,26 +263,58 @@ fn handle_delete_bitcoin_predicate(
}))
}
pub fn load_predicates_from_redis(
config: &crate::config::Config,
pub fn get_entry_from_predicates_db(
uuid: &str,
predicate_db_conn: &mut Connection,
_ctx: &Context,
) -> Result<Option<(ChainhookSpecification, PredicateStatus)>, String> {
let entry: HashMap<String, String> = predicate_db_conn
.hgetall(ChainhookSpecification::either_stx_or_btc_key(uuid))
.map_err(|e| {
format!(
"unable to load chainhook associated with key {}: {}",
uuid,
e.to_string()
)
})?;
let encoded_spec = match entry.get("specification") {
None => return Ok(None),
Some(payload) => payload,
};
let spec = match ChainhookSpecification::deserialize_specification(&encoded_spec) {
Err(e) => unimplemented!(),
Ok(spec) => spec,
};
let encoded_status = match entry.get("status") {
None => unimplemented!(),
Some(payload) => payload,
};
let status = match serde_json::from_str(&encoded_status) {
Err(e) => unimplemented!(),
Ok(status) => status,
};
Ok(Some((spec, status)))
}
pub fn get_entries_from_predicates_db(
predicate_db_conn: &mut Connection,
ctx: &Context,
) -> Result<Vec<ChainhookSpecification>, String> {
let redis_uri = config.expected_api_database_uri();
let client = redis::Client::open(redis_uri.clone())
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
let mut redis_con = client
.get_connection()
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
let chainhooks_to_load: Vec<String> = redis_con
.scan_match("chainhook:*:*:*")
let chainhooks_to_load: Vec<String> = predicate_db_conn
.scan_match(ChainhookSpecification::either_stx_or_btc_key("*"))
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?
.into_iter()
.collect();
let mut predicates = vec![];
for key in chainhooks_to_load.iter() {
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
Ok(spec) => match ChainhookSpecification::deserialize_specification(&spec, key) {
let chainhook = match predicate_db_conn.hget::<_, _, String>(key, "specification") {
Ok(spec) => match ChainhookSpecification::deserialize_specification(&spec) {
Ok(spec) => spec,
Err(e) => {
error!(
@@ -314,3 +340,16 @@ pub fn load_predicates_from_redis(
}
Ok(predicates)
}
pub fn load_predicates_from_redis(
config: &crate::config::Config,
ctx: &Context,
) -> Result<Vec<ChainhookSpecification>, String> {
let redis_uri: &str = config.expected_api_database_uri();
let client = redis::Client::open(redis_uri.clone())
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
let mut predicate_db_conn = client
.get_connection()
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
get_entries_from_predicates_db(&mut predicate_db_conn, ctx)
}

View File

@@ -1,28 +1,21 @@
mod http_api;
mod runloops;
use crate::config::{Config, PredicatesApi};
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv,
scan_stacks_chainstate_via_rocksdb_using_predicate,
};
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::{
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks,
insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn,
open_readwrite_stacks_db_conn,
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn,
};
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
use chainhook_event_observer::chainhooks::types::ChainhookSpecification;
use chainhook_event_observer::observer::{start_event_observer, ObserverCommand, ObserverEvent};
use chainhook_event_observer::observer::{start_event_observer, ObserverEvent};
use chainhook_event_observer::utils::Context;
use chainhook_types::{BitcoinBlockSignaling, StacksChainEvent};
use redis::{Commands, Connection};
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
@@ -198,6 +191,8 @@ impl Service {
let ctx = self.ctx.clone();
let api_config = api_config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
// Test and initialize a database connection
let redis_con = open_readwrite_predicates_db_conn_or_panic(&api_config, &self.ctx);
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future =
@@ -205,16 +200,6 @@ impl Service {
let _ = hiro_system_kit::nestable_block_on(future);
});
// Test and initialize a database connection
let redis_uri = self.config.expected_api_database_uri();
let client = redis::Client::open(redis_uri.clone()).unwrap();
let redis_con = match client.get_connection() {
Ok(con) => con,
Err(message) => {
error!(self.ctx.expect_logger(), "Redis: {}", message.to_string());
panic!();
}
};
Some(redis_con)
}
PredicatesApi::Off => None,
@@ -245,8 +230,7 @@ impl Service {
&chainhook_key,
&[
("specification", json!(chainhook).to_string()),
("last_evaluation", json!(0).to_string()),
("last_trigger", json!(0).to_string()),
("status", json!(PredicateStatus::Disabled).to_string()),
],
);
if let Err(e) = res {
@@ -342,19 +326,62 @@ impl Service {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PredicateStatus {
InitialScan(u64, u64, u64),
Active(u64),
Scanning(ScanningData),
Streaming(StreamingData),
Disabled,
}
pub fn update_predicate(uuid: String, status: PredicateStatus, redis_con: &Connection) {
// let res: Result<(), redis::RedisError> = redis_con.hset_multiple(
// &chainhook_key,
// &[
// ("specification", json!(chainhook).to_string()),
// ("last_evaluation", json!(0).to_string()),
// ("last_trigger", json!(0).to_string()),
// ],
// );
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanningData {
pub start_block: u64,
pub cursor: u64,
pub end_block: u64,
pub occurrences_found: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingData {
pub last_occurence: u64,
pub last_evaluation: u64,
}
pub fn update_predicate_status(
predicate_key: &str,
status: PredicateStatus,
predicates_db_conn: &mut Connection,
) {
let res: Result<(), redis::RedisError> =
predicates_db_conn.hset_multiple(&predicate_key, &[("status", json!(status).to_string())]);
}
pub fn retrieve_predicate_status(
predicate_key: &str,
predicates_db_conn: &mut Connection,
) -> Option<PredicateStatus> {
match predicates_db_conn.hget::<_, _, String>(predicate_key.to_string(), "status") {
Ok(ref payload) => match serde_json::from_str(payload) {
Ok(data) => Some(data),
Err(_) => None,
},
Err(_) => None,
}
}
pub fn open_readwrite_predicates_db_conn_or_panic(
config: &PredicatesApiConfig,
ctx: &Context,
) -> Connection {
// Test and initialize a database connection
let redis_uri = &config.database_uri;
let client = redis::Client::open(redis_uri.clone()).unwrap();
let redis_con = match client.get_connection() {
Ok(con) => con,
Err(message) => {
error!(ctx.expect_logger(), "Redis: {}", message.to_string());
panic!();
}
};
redis_con
}

View File

@@ -10,7 +10,7 @@ use chainhook_event_observer::{
use threadpool::ThreadPool;
use crate::{
config::{Config, PredicatesApiConfig},
config::Config,
scan::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,

View File

@@ -191,25 +191,26 @@ impl ChainhookSpecification {
}
}
pub fn stacks_key_prefix() -> &'static str {
"predicate:stx:"
pub fn either_stx_or_btc_key(uuid: &str) -> String {
format!("predicate:{}", uuid)
}
pub fn bitcoin_key_prefix() -> &'static str {
"predicate:btc:"
pub fn stacks_key(uuid: &str) -> String {
format!("predicate:{}", uuid)
}
pub fn bitcoin_key(uuid: &str) -> String {
format!("predicate:{}", uuid)
}
pub fn key(&self) -> String {
match &self {
Self::Bitcoin(data) => format!("{}{}", Self::bitcoin_key_prefix(), data.uuid),
Self::Stacks(data) => format!("{}{}", Self::stacks_key_prefix(), data.uuid),
Self::Bitcoin(data) => Self::bitcoin_key(&data.uuid),
Self::Stacks(data) => Self::stacks_key(&data.uuid),
}
}
pub fn deserialize_specification(
spec: &str,
_key: &str,
) -> Result<ChainhookSpecification, String> {
pub fn deserialize_specification(spec: &str) -> Result<ChainhookSpecification, String> {
let spec: ChainhookSpecification = serde_json::from_str(spec)
.map_err(|e| format!("unable to deserialize Stacks chainhook {}", e.to_string()))?;
Ok(spec)
@@ -714,6 +715,10 @@ pub struct StacksChainhookSpecification {
}
impl StacksChainhookSpecification {
pub fn key(&self) -> String {
ChainhookSpecification::stacks_key(&self.uuid)
}
pub fn is_predicate_targeting_block_header(&self) -> bool {
match &self.predicate {
StacksPredicate::BlockHeight(_)