feat: introduce evaluation reports

This commit is contained in:
Ludo Galabru
2023-06-05 17:43:27 -04:00
parent f4bc2e3550
commit 54ad874ee5
6 changed files with 177 additions and 71 deletions

View File

@@ -208,11 +208,12 @@ pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
confirmed_blocks: vec![],
});
let hits = evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);
info!(
ctx.expect_logger(),
@@ -222,7 +223,9 @@ pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
inscriptions_revealed.join(", ")
);
match execute_predicates_action(hits, &event_observer_config, &ctx).await {
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
@@ -276,13 +279,16 @@ pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
confirmed_blocks: vec![],
});
let hits = evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);
match execute_predicates_action(hits, &event_observer_config, &ctx).await {
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}

View File

@@ -310,10 +310,10 @@ impl Service {
let chainhook_key = chainhook.key();
let _: Result<(), redis::RedisError> = redis_con.del(chainhook_key);
}
ObserverEvent::BitcoinChainEvent(_chain_update) => {
ObserverEvent::BitcoinChainEvent((chain_update, report)) => {
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
}
ObserverEvent::StacksChainEvent(chain_event) => {
ObserverEvent::StacksChainEvent((chain_event, report)) => {
let stacks_db_conn_rw = match open_readwrite_stacks_db_conn(
&self.config.expected_cache_path(),
&self.ctx,

View File

@@ -7,14 +7,14 @@ use crate::utils::Context;
use bitcoincore_rpc::bitcoin::util::address::Payload;
use bitcoincore_rpc::bitcoin::Address;
use chainhook_types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinTransactionData, OrdinalOperation,
BitcoinBlockData, BitcoinChainEvent, BitcoinTransactionData, BlockIdentifier, OrdinalOperation,
StacksBaseChainOperation, TransactionIdentifier,
};
use clarity_repl::clarity::util::hash::to_hex;
use reqwest::{Client, Method};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use reqwest::RequestBuilder;
@@ -57,8 +57,12 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>(
chain_event: &'a BitcoinChainEvent,
active_chainhooks: Vec<&'a BitcoinChainhookSpecification>,
ctx: &Context,
) -> Vec<BitcoinTriggerChainhook<'a>> {
let mut triggered_chainhooks = vec![];
) -> (
Vec<BitcoinTriggerChainhook<'a>>,
BTreeMap<&'a str, &'a BlockIdentifier>,
) {
let mut evaluated_predicates = BTreeMap::new();
let mut triggered_predicates = vec![];
match chain_event {
BitcoinChainEvent::ChainUpdatedWithBlocks(event) => {
for chainhook in active_chainhooks.iter() {
@@ -66,6 +70,7 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>(
let rollback = vec![];
for block in event.new_blocks.iter() {
evaluated_predicates.insert(chainhook.uuid.as_str(), &block.block_identifier);
let mut hits = vec![];
for tx in block.transactions.iter() {
if chainhook.predicate.evaluate_transaction_predicate(&tx, ctx) {
@@ -78,7 +83,7 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>(
}
if !apply.is_empty() {
triggered_chainhooks.push(BitcoinTriggerChainhook {
triggered_predicates.push(BitcoinTriggerChainhook {
chainhook,
apply,
rollback,
@@ -91,17 +96,6 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>(
let mut apply = vec![];
let mut rollback = vec![];
for block in event.blocks_to_apply.iter() {
let mut hits = vec![];
for tx in block.transactions.iter() {
if chainhook.predicate.evaluate_transaction_predicate(&tx, ctx) {
hits.push(tx);
}
}
if hits.len() > 0 {
apply.push((hits, block));
}
}
for block in event.blocks_to_rollback.iter() {
let mut hits = vec![];
for tx in block.transactions.iter() {
@@ -113,8 +107,20 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>(
rollback.push((hits, block));
}
}
for block in event.blocks_to_apply.iter() {
evaluated_predicates.insert(chainhook.uuid.as_str(), &block.block_identifier);
let mut hits = vec![];
for tx in block.transactions.iter() {
if chainhook.predicate.evaluate_transaction_predicate(&tx, ctx) {
hits.push(tx);
}
}
if hits.len() > 0 {
apply.push((hits, block));
}
}
if !apply.is_empty() || !rollback.is_empty() {
triggered_chainhooks.push(BitcoinTriggerChainhook {
triggered_predicates.push(BitcoinTriggerChainhook {
chainhook,
apply,
rollback,
@@ -123,7 +129,7 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>(
}
}
}
triggered_chainhooks
(triggered_predicates, evaluated_predicates)
}
pub fn serialize_bitcoin_payload_to_json<'a>(

View File

@@ -14,7 +14,7 @@ use clarity_repl::clarity::vm::types::{CharType, SequenceData, Value as ClarityV
use hiro_system_kit::slog;
use reqwest::{Client, Method};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::io::Cursor;
use reqwest::RequestBuilder;
@@ -64,14 +64,23 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
chain_event: &'a StacksChainEvent,
active_chainhooks: Vec<&'a StacksChainhookSpecification>,
ctx: &Context,
) -> Vec<StacksTriggerChainhook<'a>> {
let mut triggered_chainhooks = vec![];
) -> (
Vec<StacksTriggerChainhook<'a>>,
BTreeMap<&'a str, &'a BlockIdentifier>,
) {
let mut triggered_predicates = vec![];
let mut evaluated_predicates = BTreeMap::new();
match chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(update) => {
for chainhook in active_chainhooks.iter() {
let mut apply = vec![];
let mut rollback = vec![];
for block_update in update.new_blocks.iter() {
evaluated_predicates.insert(
chainhook.uuid.as_str(),
&block_update.block.block_identifier,
);
for parents_microblock_to_apply in
block_update.parent_microblocks_to_apply.iter()
{
@@ -97,7 +106,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
));
}
if !apply.is_empty() || !rollback.is_empty() {
triggered_chainhooks.push(StacksTriggerChainhook {
triggered_predicates.push(StacksTriggerChainhook {
chainhook,
apply,
rollback,
@@ -111,6 +120,10 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
let rollback = vec![];
for microblock_to_apply in update.new_microblocks.iter() {
evaluated_predicates.insert(
chainhook.uuid.as_str(),
&microblock_to_apply.metadata.anchor_block_identifier,
);
apply.append(&mut evaluate_stacks_chainhook_on_blocks(
vec![microblock_to_apply],
chainhook,
@@ -118,7 +131,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
));
}
if !apply.is_empty() || !rollback.is_empty() {
triggered_chainhooks.push(StacksTriggerChainhook {
triggered_predicates.push(StacksTriggerChainhook {
chainhook,
apply,
rollback,
@@ -132,6 +145,10 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
let mut rollback = vec![];
for microblock_to_apply in update.microblocks_to_apply.iter() {
evaluated_predicates.insert(
chainhook.uuid.as_str(),
&microblock_to_apply.metadata.anchor_block_identifier,
);
apply.append(&mut evaluate_stacks_chainhook_on_blocks(
vec![microblock_to_apply],
chainhook,
@@ -146,7 +163,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
));
}
if !apply.is_empty() || !rollback.is_empty() {
triggered_chainhooks.push(StacksTriggerChainhook {
triggered_predicates.push(StacksTriggerChainhook {
chainhook,
apply,
rollback,
@@ -160,6 +177,10 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
let mut rollback = vec![];
for block_update in update.blocks_to_apply.iter() {
evaluated_predicates.insert(
chainhook.uuid.as_str(),
&block_update.block.block_identifier,
);
for parents_microblock_to_apply in
block_update.parent_microblocks_to_apply.iter()
{
@@ -192,7 +213,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
));
}
if !apply.is_empty() || !rollback.is_empty() {
triggered_chainhooks.push(StacksTriggerChainhook {
triggered_predicates.push(StacksTriggerChainhook {
chainhook,
apply,
rollback,
@@ -201,7 +222,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
}
}
}
triggered_chainhooks
(triggered_predicates, evaluated_predicates)
}
pub fn evaluate_stacks_chainhook_on_blocks<'a>(

View File

@@ -558,15 +558,18 @@ pub fn handle_create_predicate(
}
#[openapi(tag = "Chainhooks")]
#[get("/v1/chainhooks/<uuid>", format = "application/json")]
#[get("/v1/chainhooks/<predicate_uuid>", format = "application/json")]
pub fn handle_get_predicate(
uuid: String,
predicate_uuid: String,
chainhook_store: &State<Arc<RwLock<ChainhookStore>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks"));
ctx.try_log(|logger| slog::info!(logger, "GET /v1/chainhooks/<predicate_uuid>"));
if let Ok(chainhook_store_reader) = chainhook_store.inner().read() {
let predicate = match chainhook_store_reader.predicates.get_spec_with_uuid(&uuid) {
let predicate = match chainhook_store_reader
.predicates
.get_spec_with_uuid(&predicate_uuid)
{
Some(ChainhookSpecification::Stacks(spec)) => {
json!({
"chain": "stacks",
@@ -602,18 +605,18 @@ pub fn handle_get_predicate(
}
#[openapi(tag = "Chainhooks")]
#[delete("/v1/chainhooks/stacks/<uuid>", format = "application/json")]
#[delete("/v1/chainhooks/stacks/<predicate_uuid>", format = "application/json")]
pub fn handle_delete_stacks_predicate(
uuid: String,
predicate_uuid: String,
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<hook_uuid>"));
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<predicate_uuid>"));
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::DeregisterStacksPredicate(uuid));
let _ = tx.send(ObserverCommand::DeregisterStacksPredicate(predicate_uuid));
}
_ => {}
};
@@ -625,18 +628,18 @@ pub fn handle_delete_stacks_predicate(
}
#[openapi(tag = "Chainhooks")]
#[delete("/v1/chainhooks/bitcoin/<hook_uuid>", format = "application/json")]
#[delete("/v1/chainhooks/bitcoin/<predicate_uuid>", format = "application/json")]
pub fn handle_delete_bitcoin_predicate(
hook_uuid: String,
predicate_uuid: String,
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<hook_uuid>"));
ctx.try_log(|logger| slog::info!(logger, "DELETE /v1/chainhooks/stacks/<predicate_uuid>"));
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::DeregisterBitcoinPredicate(hook_uuid));
let _ = tx.send(ObserverCommand::DeregisterBitcoinPredicate(predicate_uuid));
}
_ => {}
};

View File

@@ -41,7 +41,7 @@ use rocket::config::{self, Config, LogLevel};
use rocket::data::{Limits, ToByteUnit};
use rocket::serde::Deserialize;
use rocket::Shutdown;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr};
use std::path::PathBuf;
@@ -197,13 +197,56 @@ pub struct MempoolAdmissionData {
pub tx_description: String,
}
#[derive(Clone, Debug)]
pub struct PredicateEvaluationReport {
pub predicates_evaluated: BTreeMap<String, BTreeSet<BlockIdentifier>>,
pub predicates_triggered: BTreeMap<String, BTreeSet<BlockIdentifier>>,
}
impl PredicateEvaluationReport {
pub fn new() -> PredicateEvaluationReport {
PredicateEvaluationReport {
predicates_evaluated: BTreeMap::new(),
predicates_triggered: BTreeMap::new(),
}
}
pub fn track_evaluation(&mut self, uuid: &str, block_identifier: &BlockIdentifier) {
self.predicates_evaluated
.entry(uuid.to_string())
.and_modify(|e| {
e.insert(block_identifier.clone());
})
.or_insert_with(|| {
let mut set = BTreeSet::new();
set.insert(block_identifier.clone());
set
});
}
pub fn track_trigger(&mut self, uuid: &str, blocks: &Vec<&BlockIdentifier>) {
for block_id in blocks.into_iter() {
self.predicates_triggered
.entry(uuid.to_string())
.and_modify(|e| {
e.insert((*block_id).clone());
})
.or_insert_with(|| {
let mut set = BTreeSet::new();
set.insert((*block_id).clone());
set
});
}
}
}
#[derive(Clone, Debug)]
pub enum ObserverEvent {
Error(String),
Fatal(String),
Info(String),
BitcoinChainEvent(BitcoinChainEvent),
StacksChainEvent(StacksChainEvent),
BitcoinChainEvent((BitcoinChainEvent, PredicateEvaluationReport)),
StacksChainEvent((StacksChainEvent, PredicateEvaluationReport)),
NotifyBitcoinTransactionProxied,
HookRegistered(ChainhookSpecification),
HookDeregistered(ChainhookSpecification),
@@ -807,7 +850,7 @@ pub async fn start_observer_commands_handler(
match bitcoin_block_store.get(&header.block_identifier) {
Some(block) => {
#[cfg(feature = "ordinals")]
if let Some(ref hord_config) = config.hord_config {
if let Some(ref _hord_config) = config.hord_config {
if let Err(e) = revert_hord_db_with_augmented_bitcoin_block(
block,
&blocks_db,
@@ -904,6 +947,7 @@ pub async fn start_observer_commands_handler(
// process hooks
let mut hooks_ids_to_deregister = vec![];
let mut requests = vec![];
let mut report = PredicateEvaluationReport::new();
if config.hooks_enabled {
match chainhook_store.read() {
@@ -928,23 +972,35 @@ pub async fn start_observer_commands_handler(
)
});
let chainhooks_candidates = evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
bitcoin_chainhooks,
&ctx,
);
let (predicates_triggered, predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
bitcoin_chainhooks,
&ctx,
);
for (uuid, block_identifier) in predicates_evaluated.into_iter() {
report.track_evaluation(uuid, block_identifier);
}
for entry in predicates_triggered.iter() {
let blocks_ids = entry
.apply
.iter()
.map(|e| &e.1.block_identifier)
.collect::<Vec<&BlockIdentifier>>();
report.track_trigger(&entry.chainhook.uuid, &blocks_ids);
}
ctx.try_log(|logger| {
slog::info!(
logger,
"{} bitcoin chainhooks positive evaluations",
chainhooks_candidates.len()
predicates_triggered.len()
)
});
let mut chainhooks_to_trigger = vec![];
for trigger in chainhooks_candidates.into_iter() {
for trigger in predicates_triggered.into_iter() {
let mut total_occurrences: u64 = *chainhooks_occurrences_tracker
.get(&trigger.chainhook.uuid)
.unwrap_or(&0);
@@ -1058,7 +1114,7 @@ pub async fn start_observer_commands_handler(
}
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::BitcoinChainEvent(chain_event));
let _ = tx.send(ObserverEvent::BitcoinChainEvent((chain_event, report)));
}
}
ObserverCommand::PropagateStacksChainEvent(chain_event) => {
@@ -1070,6 +1126,8 @@ pub async fn start_observer_commands_handler(
}
let mut hooks_ids_to_deregister = vec![];
let mut requests = vec![];
let mut report = PredicateEvaluationReport::new();
if config.hooks_enabled {
match chainhook_store.read() {
Err(e) => {
@@ -1087,15 +1145,27 @@ pub async fn start_observer_commands_handler(
.collect();
// process hooks
let chainhooks_candidates = evaluate_stacks_chainhooks_on_chain_event(
&chain_event,
stacks_chainhooks,
&ctx,
);
let (predicates_triggered, predicates_evaluated) =
evaluate_stacks_chainhooks_on_chain_event(
&chain_event,
stacks_chainhooks,
&ctx,
);
for (uuid, block_identifier) in predicates_evaluated.into_iter() {
report.track_evaluation(uuid, block_identifier);
}
for entry in predicates_triggered.iter() {
let blocks_ids = entry
.apply
.iter()
.map(|e| e.1.get_identifier())
.collect::<Vec<&BlockIdentifier>>();
report.track_trigger(&entry.chainhook.uuid, &blocks_ids);
}
let mut chainhooks_to_trigger = vec![];
for trigger in chainhooks_candidates.into_iter() {
for trigger in predicates_triggered.into_iter() {
let mut total_occurrences: u64 = *chainhooks_occurrences_tracker
.get(&trigger.chainhook.uuid)
.unwrap_or(&0);
@@ -1184,7 +1254,7 @@ pub async fn start_observer_commands_handler(
}
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::StacksChainEvent(chain_event));
let _ = tx.send(ObserverEvent::StacksChainEvent((chain_event, report)));
}
}
ObserverCommand::PropagateStacksMempoolEvent(mempool_event) => {