feat: support a separate storage directory for observers.sqlite (#354)

* chore: observers db dir

* fix: control observers path only on db functions

* chore: test structure

* test: incorrect predicate

* fix: test predicate registration

* test: more

* fix: duplicate uuid test
This commit is contained in:
Rafael Cárdenas
2024-09-03 12:21:56 -06:00
committed by GitHub
parent 154afdf287
commit 7a65fdf107
11 changed files with 736 additions and 278 deletions

View File

@@ -1,2 +1,5 @@
[alias]
ordhook-install = "install --path components/ordhook-cli --locked --force"
[env]
RUST_TEST_THREADS = "1"

View File

@@ -579,7 +579,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
false,
)?;
let _ = initialize_observers_db(&config.expected_cache_path(), ctx);
let _ = initialize_observers_db(&config, ctx);
scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
@@ -631,7 +631,10 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
if row.operation == "transfer_receive" {
continue;
}
println!("BRC-20 {} {} {}", row.operation, row.tick, row.avail_balance);
println!(
"BRC-20 {} {} {}",
row.operation, row.tick, row.avail_balance
);
}
}
None => todo!(),

View File

@@ -65,6 +65,10 @@ impl ConfigFile {
let config = Config {
storage: StorageConfig {
working_dir: config_file.storage.working_dir.unwrap_or("ordhook".into()),
observers_working_dir: config_file
.storage
.observers_working_dir
.unwrap_or("observers".into()),
},
http_api: match config_file.http_api {
None => PredicatesApi::Off,
@@ -176,6 +180,7 @@ pub struct LogConfigFile {
#[derive(Deserialize, Debug, Clone)]
pub struct StorageConfigFile {
pub working_dir: Option<String>,
pub observers_working_dir: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]

View File

@@ -44,6 +44,7 @@ pub struct LogConfig {
#[derive(Clone, Debug)]
pub struct StorageConfig {
pub working_dir: String,
pub observers_working_dir: String,
}
#[derive(Clone, Debug)]
@@ -161,10 +162,17 @@ impl Config {
destination_path
}
pub fn expected_observers_cache_path(&self) -> PathBuf {
let mut destination_path = PathBuf::new();
destination_path.push(&self.storage.observers_working_dir);
destination_path
}
pub fn devnet_default() -> Config {
Config {
storage: StorageConfig {
working_dir: default_cache_path(),
observers_working_dir: default_observers_cache_path(),
},
http_api: PredicatesApi::Off,
snapshot: SnapshotConfig::Build,
@@ -199,6 +207,7 @@ impl Config {
Config {
storage: StorageConfig {
working_dir: default_cache_path(),
observers_working_dir: default_observers_cache_path(),
},
http_api: PredicatesApi::Off,
snapshot: SnapshotConfig::Build,
@@ -233,6 +242,7 @@ impl Config {
Config {
storage: StorageConfig {
working_dir: default_cache_path(),
observers_working_dir: default_observers_cache_path(),
},
http_api: PredicatesApi::Off,
snapshot: SnapshotConfig::Download(SnapshotConfigDownloadUrls {
@@ -272,3 +282,9 @@ pub fn default_cache_path() -> String {
cache_path.push("ordhook");
format!("{}", cache_path.display())
}
pub fn default_observers_cache_path() -> String {
let mut cache_path = std::env::current_dir().expect("unable to get current dir");
cache_path.push("observers");
format!("{}", cache_path.display())
}

View File

@@ -9,9 +9,8 @@ use crate::{
};
use chainhook_sdk::{
types::{
BitcoinBlockData, BitcoinTransactionData, BlockIdentifier, Brc20BalanceData,
Brc20Operation, Brc20TokenDeployData, Brc20TransferData, OrdinalInscriptionRevealData,
OrdinalOperation,
BitcoinBlockData, BitcoinTransactionData, Brc20BalanceData, Brc20Operation,
Brc20TokenDeployData, Brc20TransferData, OrdinalInscriptionRevealData, OrdinalOperation,
},
utils::Context,
};

View File

@@ -190,7 +190,9 @@ async fn validate_or_download_archive_file(
if should_download {
try_info!(ctx, "Downloading {remote_archive_url}");
match download_and_decompress_archive_file(remote_archive_url, file_name, &config, &ctx).await {
match download_and_decompress_archive_file(remote_archive_url, file_name, &config, &ctx)
.await
{
Ok(_) => {}
Err(e) => {
try_error!(ctx, "{e}");

View File

@@ -157,8 +157,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
Err(e) => return Err(format!("Scan aborted: {e}")),
}
{
let observers_db_conn =
open_readwrite_observers_db_conn_or_panic(&config.expected_cache_path(), &ctx);
let observers_db_conn = open_readwrite_observers_db_conn_or_panic(&config, &ctx);
update_observer_progress(
&predicate_spec.uuid,
current_block_height,

View File

@@ -1,50 +1,178 @@
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
sync::{mpsc::Sender, Arc, Mutex},
};
use chainhook_sdk::{
chainhooks::types::{ChainhookFullSpecification, ChainhookSpecification},
observer::ObserverCommand,
chainhooks::types::{
BitcoinChainhookSpecification, ChainhookFullSpecification, ChainhookSpecification,
},
observer::{ObserverCommand, ObserverEvent},
utils::Context,
};
use rocket::config::{self, Config, LogLevel};
use rocket::serde::json::{json, Json, Value as JsonValue};
use rocket::State;
use std::error::Error;
use rocket::{
config::{self, Config, LogLevel},
Ignite, Rocket, Shutdown,
};
use rocket::{
http::Status,
response::status,
serde::json::{json, Json, Value},
};
use rocket::{response::status::Custom, State};
use crate::try_info;
use crate::{
config::PredicatesApi,
service::observers::{
insert_entry_in_observers, open_readwrite_observers_db_conn, remove_entry_from_observers,
update_observer_progress, update_observer_streaming_enabled,
},
try_error, try_info,
};
use super::observers::{
find_all_observers, find_observer_with_uuid, open_readonly_observers_db_conn, ObserverReport,
};
pub async fn start_predicate_api_server(
port: u16,
observers_db_dir_path: PathBuf,
observer_commands_tx: Sender<ObserverCommand>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
let log_level = LogLevel::Off;
pub async fn start_observers_http_server(
config: &crate::Config,
observer_commands_tx: &std::sync::mpsc::Sender<ObserverCommand>,
observer_event_rx: crossbeam_channel::Receiver<ObserverEvent>,
bitcoin_scan_op_tx: crossbeam_channel::Sender<BitcoinChainhookSpecification>,
ctx: &Context,
) -> Result<Shutdown, String> {
// Build and start HTTP server.
let ignite = build_server(config, observer_commands_tx, ctx).await;
let shutdown = ignite.shutdown();
let _ = hiro_system_kit::thread_named("observers_api-server").spawn(move || {
let _ = hiro_system_kit::nestable_block_on(ignite.launch());
});
// Spawn predicate observer event tread.
let moved_config = config.clone();
let moved_ctx = ctx.clone();
let _ = hiro_system_kit::thread_named("observers_api-events").spawn(move || loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
try_error!(&moved_ctx, "Error: broken channel {}", e.to_string());
break;
}
};
match event {
ObserverEvent::PredicateRegistered(spec) => {
let observers_db_conn =
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
Ok(con) => con,
Err(e) => {
try_error!(
&moved_ctx,
"unable to register predicate: {}",
e.to_string()
);
continue;
}
};
let report = ObserverReport::default();
insert_entry_in_observers(&spec, &report, &observers_db_conn, &moved_ctx);
match spec {
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
_ => {}
}
}
ObserverEvent::PredicateEnabled(spec) => {
let observers_db_conn =
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
Ok(con) => con,
Err(e) => {
try_error!(&moved_ctx, "unable to enable observer: {}", e.to_string());
continue;
}
};
update_observer_streaming_enabled(
&spec.uuid(),
true,
&observers_db_conn,
&moved_ctx,
);
}
ObserverEvent::PredicateDeregistered(uuid) => {
let observers_db_conn =
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
Ok(con) => con,
Err(e) => {
try_error!(
&moved_ctx,
"unable to deregister observer: {}",
e.to_string()
);
continue;
}
};
remove_entry_from_observers(&uuid, &observers_db_conn, &moved_ctx);
}
ObserverEvent::BitcoinPredicateTriggered(data) => {
if let Some(ref tip) = data.apply.last() {
let observers_db_conn =
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
Ok(con) => con,
Err(e) => {
try_error!(
&moved_ctx,
"unable to update observer: {}",
e.to_string()
);
continue;
}
};
let last_block_height_update = tip.block.block_identifier.index;
update_observer_progress(
&data.chainhook.uuid,
last_block_height_update,
&observers_db_conn,
&moved_ctx,
)
}
}
ObserverEvent::Terminate => {
try_info!(&moved_ctx, "Terminating runloop");
break;
}
_ => {}
}
});
Ok(shutdown)
}
async fn build_server(
config: &crate::Config,
observer_command_tx: &std::sync::mpsc::Sender<ObserverCommand>,
ctx: &Context,
) -> Rocket<Ignite> {
let PredicatesApi::On(ref api_config) = config.http_api else {
unreachable!();
};
let moved_config = config.clone();
let moved_ctx = ctx.clone();
let moved_observer_commands_tx = observer_command_tx.clone();
let mut shutdown_config = config::Shutdown::default();
shutdown_config.ctrlc = false;
shutdown_config.grace = 1;
shutdown_config.mercy = 1;
let control_config = Config {
port,
port: api_config.http_port,
workers: 1,
address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
keep_alive: 5,
temp_dir: std::env::temp_dir().into(),
log_level,
log_level: LogLevel::Off,
cli_colors: false,
shutdown: shutdown_config,
..Config::default()
};
let routes = routes![
handle_ping,
handle_get_predicates,
@@ -52,27 +180,21 @@ pub async fn start_predicate_api_server(
handle_create_predicate,
handle_delete_bitcoin_predicate,
];
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
let ctx_cloned = ctx.clone();
let background_job_tx_mutex = Arc::new(Mutex::new(moved_observer_commands_tx));
let ignite = rocket::custom(control_config)
.manage(background_job_tx_mutex)
.manage(observers_db_dir_path)
.manage(ctx_cloned)
.manage(moved_config)
.manage(moved_ctx.clone())
.mount("/", routes)
.ignite()
.await?;
let _ = std::thread::spawn(move || {
let _ = hiro_system_kit::nestable_block_on(ignite.launch());
});
Ok(())
.await
.expect("Unable to build observers API");
ignite
}
#[get("/ping")]
fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
fn handle_ping(ctx: &State<Context>) -> Json<Value> {
try_info!(ctx, "Handling HTTP GET /ping");
Json(json!({
"status": 200,
@@ -82,147 +204,203 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
#[get("/v1/observers", format = "application/json")]
fn handle_get_predicates(
observers_db_dir_path: &State<PathBuf>,
config: &State<crate::Config>,
ctx: &State<Context>,
) -> Json<JsonValue> {
) -> Result<Json<Value>, Custom<Json<Value>>> {
try_info!(ctx, "Handling HTTP GET /v1/observers");
match open_readonly_observers_db_conn(observers_db_dir_path, ctx) {
match open_readonly_observers_db_conn(config, ctx) {
Ok(mut db_conn) => {
let observers = find_all_observers(&mut db_conn, &ctx);
let serialized_predicates = observers
.iter()
.map(|(p, s)| serialized_predicate_with_status(p, s))
.collect::<Vec<_>>();
Json(json!({
Ok(Json(json!({
"status": 200,
"result": serialized_predicates
}))
})))
}
Err(e) => Json(json!({
"status": 500,
"message": e,
})),
Err(e) => Err(Custom(
Status::InternalServerError,
Json(json!({
"status": 500,
"message": e,
})),
)),
}
}
#[post("/v1/observers", format = "application/json", data = "<predicate>")]
fn handle_create_predicate(
predicate: Json<ChainhookFullSpecification>,
observers_db_dir_path: &State<PathBuf>,
predicate: Json<Value>,
config: &State<crate::Config>,
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
) -> Result<Json<Value>, Custom<Json<Value>>> {
try_info!(ctx, "Handling HTTP POST /v1/observers");
let predicate = predicate.into_inner();
if let Err(e) = predicate.validate() {
return Json(json!({
"status": 422,
"error": e,
}));
}
let predicate_uuid = predicate.get_uuid().to_string();
if let Ok(mut predicates_db_conn) = open_readonly_observers_db_conn(observers_db_dir_path, ctx)
{
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
match find_observer_with_uuid(&key, &mut predicates_db_conn, &ctx) {
Some(_) => {
return Json(json!({
"status": 409,
"error": "Predicate uuid already in use",
}))
let predicate =
match serde_json::from_value::<ChainhookFullSpecification>(predicate.into_inner()) {
Ok(predicate) => predicate,
Err(_) => {
return Err(Custom(
Status::UnprocessableEntity,
Json(json!({
"status": 422,
"error": "Invalid predicate JSON",
})),
));
}
_ => {}
}
};
if let Err(e) = predicate.validate() {
return Err(Custom(
Status::UnprocessableEntity,
Json(json!({
"status": 422,
"error": e,
})),
));
}
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
let mut predicates_db_conn = match open_readonly_observers_db_conn(config, ctx) {
Ok(conn) => conn,
Err(err) => {
return Err(Custom(
Status::InternalServerError,
Json(json!({
"status": 500,
"error": err.to_string(),
})),
));
}
};
let predicate_uuid = predicate.get_uuid().to_string();
if find_observer_with_uuid(&predicate_uuid, &mut predicates_db_conn, &ctx).is_some() {
return Err(status::Custom(
Status::Conflict,
Json(json!({
"status": 409,
"error": "Predicate uuid already in use",
})),
));
}
match background_job_tx.inner().lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::RegisterPredicate(predicate));
}
_ => {}
Err(err) => {
return Err(Custom(
Status::InternalServerError,
Json(json!({
"status": 500,
"error": err.to_string(),
})),
));
}
};
Json(json!({
Ok(Json(json!({
"status": 200,
"result": predicate_uuid,
}))
})))
}
#[get("/v1/observers/<predicate_uuid>", format = "application/json")]
fn handle_get_predicate(
predicate_uuid: String,
observers_db_dir_path: &State<PathBuf>,
config: &State<crate::Config>,
ctx: &State<Context>,
) -> Json<JsonValue> {
) -> Result<Json<Value>, Custom<Json<Value>>> {
try_info!(ctx, "Handling HTTP GET /v1/observers/{}", predicate_uuid);
match open_readonly_observers_db_conn(observers_db_dir_path, ctx) {
match open_readonly_observers_db_conn(config, ctx) {
Ok(mut predicates_db_conn) => {
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
let entry = match find_observer_with_uuid(&key, &mut predicates_db_conn, &ctx) {
Some((ChainhookSpecification::Bitcoin(spec), report)) => json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": report,
"enabled": spec.enabled,
}),
_ => {
return Json(json!({
"status": 404,
}))
}
};
Json(json!({
let entry =
match find_observer_with_uuid(&predicate_uuid, &mut predicates_db_conn, &ctx) {
Some((ChainhookSpecification::Bitcoin(spec), report)) => json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": report,
"enabled": spec.enabled,
}),
_ => {
return Err(Custom(
Status::NotFound,
Json(json!({
"status": 404,
})),
))
}
};
Ok(Json(json!({
"status": 200,
"result": entry
}))
})))
}
Err(e) => Json(json!({
"status": 500,
"message": e,
})),
Err(e) => Err(Custom(
Status::InternalServerError,
Json(json!({
"status": 500,
"message": e,
})),
)),
}
}
#[delete("/v1/observers/<predicate_uuid>", format = "application/json")]
fn handle_delete_bitcoin_predicate(
predicate_uuid: String,
config: &State<crate::Config>,
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
ctx: &State<Context>,
) -> Json<JsonValue> {
) -> Result<Json<Value>, Custom<Json<Value>>> {
try_info!(ctx, "Handling HTTP DELETE /v1/observers/{}", predicate_uuid);
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
let mut predicates_db_conn = match open_readonly_observers_db_conn(config, ctx) {
Ok(conn) => conn,
Err(err) => {
return Err(Custom(
Status::InternalServerError,
Json(json!({
"status": 500,
"error": err.to_string(),
})),
));
}
};
if find_observer_with_uuid(&predicate_uuid, &mut predicates_db_conn, &ctx).is_none() {
return Err(status::Custom(
Status::NotFound,
Json(json!({
"status": 404,
"error": "Predicate not found",
})),
));
}
match background_job_tx.inner().lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::DeregisterBitcoinPredicate(predicate_uuid));
}
_ => {}
Err(err) => {
return Err(Custom(
Status::InternalServerError,
Json(json!({
"status": 500,
"error": err.to_string(),
})),
));
}
};
Json(json!({
Ok(Json(json!({
"status": 200,
"result": "Ok",
}))
"result": "Predicate deleted",
})))
}
fn serialized_predicate_with_status(
predicate: &ChainhookSpecification,
report: &ObserverReport,
) -> JsonValue {
) -> Value {
match (predicate, report) {
(ChainhookSpecification::Stacks(spec), report) => json!({
"chain": "stacks",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": report,
"enabled": spec.enabled,
}),
(ChainhookSpecification::Stacks(_), _) => json!({}),
(ChainhookSpecification::Bitcoin(spec), report) => json!({
"chain": "bitcoin",
"uuid": spec.uuid,
@@ -233,3 +411,377 @@ fn serialized_predicate_with_status(
}),
}
}
#[cfg(test)]
mod test {
use std::sync::mpsc::channel;
use chainhook_sdk::{
chainhooks::types::{
BitcoinChainhookSpecification, BitcoinPredicateType, ChainhookSpecification,
HookAction, HttpHook, InscriptionFeedData, OrdinalOperations,
},
observer::ObserverEvent,
types::BitcoinNetwork,
utils::Context,
};
use crossbeam_channel::{Receiver, Sender};
use reqwest::{Client, Response};
use rocket::{form::validate::Len, Shutdown};
use serde_json::{json, Value};
use crate::{
config::{Config, PredicatesApi, PredicatesApiConfig},
service::observers::{delete_observers_db, initialize_observers_db},
};
use super::start_observers_http_server;
async fn launch_server(observer_event_rx: Receiver<ObserverEvent>) -> Shutdown {
let mut config = Config::devnet_default();
config.http_api = PredicatesApi::On(PredicatesApiConfig {
http_port: 20456,
display_logs: true,
});
config.storage.observers_working_dir = "tmp".to_string();
let ctx = Context::empty();
delete_observers_db(&config);
let _ = initialize_observers_db(&config, &ctx);
let (bitcoin_scan_op_tx, _) = crossbeam_channel::unbounded();
let (observer_command_tx, _) = channel();
let shutdown = start_observers_http_server(
&config,
&observer_command_tx,
observer_event_rx,
bitcoin_scan_op_tx,
&ctx,
)
.await
.expect("start failed");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
shutdown
}
fn shutdown_server(observer_event_tx: Sender<ObserverEvent>, shutdown: Shutdown) {
let _ = observer_event_tx.send(ObserverEvent::Terminate);
shutdown.notify();
}
async fn register_predicate(
client: &Client,
observer_event_tx: &Sender<ObserverEvent>,
) -> Response {
let response = client
.post("http://localhost:20456/v1/observers")
.json(&json!({
"uuid": "00000001-0001-0001-0001-000000000001",
"name": "inscription_feed",
"version": 1,
"chain": "bitcoin",
"networks": {
"mainnet": {
"start_block": 767430,
"if_this": {
"scope": "ordinals_protocol",
"operation": "inscription_feed",
},
"then_that": {
"http_post": {
"url": "http://localhost:3700/payload",
"authorization_header": "Bearer test"
}
}
}
}
}))
.send()
.await
.unwrap();
if response.status().is_success() {
// Simulate predicate accepted by chainhook-sdk
let spec = ChainhookSpecification::Bitcoin(BitcoinChainhookSpecification {
uuid: "00000001-0001-0001-0001-000000000001".to_string(),
owner_uuid: None,
name: "inscription_feed".to_string(),
network: BitcoinNetwork::Mainnet,
version: 1,
blocks: None,
start_block: Some(767430),
end_block: None,
expire_after_occurrence: None,
predicate: BitcoinPredicateType::OrdinalsProtocol(
OrdinalOperations::InscriptionFeed(InscriptionFeedData {
meta_protocols: None,
}),
),
action: HookAction::HttpPost(HttpHook {
url: "http://localhost:3700/payload".to_string(),
authorization_header: "Bearer test".to_string(),
}),
include_proof: false,
include_inputs: false,
include_outputs: false,
include_witness: false,
enabled: true,
expired_at: None,
});
let _ = observer_event_tx.send(ObserverEvent::PredicateRegistered(spec.clone()));
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let _ = observer_event_tx.send(ObserverEvent::PredicateEnabled(spec));
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
response
}
async fn delete_predicate(
client: &Client,
observer_event_tx: &Sender<ObserverEvent>,
) -> Response {
let response = client
.delete("http://localhost:20456/v1/observers/00000001-0001-0001-0001-000000000001")
.header("content-type", "application/json")
.send()
.await
.unwrap();
if response.status().is_success() {
// Simulate predicate deregistered by chainhook-sdk
let _ = observer_event_tx.send(ObserverEvent::PredicateDeregistered(
"00000001-0001-0001-0001-000000000001".to_string(),
));
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
response
}
#[tokio::test]
async fn lists_empty_predicates() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let response = client
.get("http://localhost:20456/v1/observers")
.send()
.await
.unwrap();
assert!(response.status().is_success());
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 200);
assert_eq!(json["result"].as_array().len(), 0);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn rejects_arbitrary_json() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let response = client
.post("http://localhost:20456/v1/observers")
.json(&json!({
"id": 1,
}))
.send()
.await
.unwrap();
assert_eq!(response.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY);
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 422);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn rejects_invalid_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let response = client
.post("http://localhost:20456/v1/observers")
.json(&json!({
"uuid": "00000001-0001-0001-0001-000000000001",
"name": "inscription_feed",
"version": 1,
"chain": "bitcoin",
"networks": {
"mainnet": {
"start_block": 767430,
"end_block": 200, // Invalid
"if_this": {
"scope": "ordinals_protocol",
"operation": "inscription_feed",
},
"then_that": {
"http_post": {
"url": "http://localhost:3700/payload",
"authorization_header": "Bearer test"
}
}
}
}
}))
.send()
.await
.unwrap();
assert_eq!(response.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY);
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 422);
assert_eq!(
json["error"],
"Chainhook specification field `end_block` should be greater than `start_block`."
);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn accepts_valid_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let response = register_predicate(&client, &observer_event_tx).await;
assert_eq!(response.status(), reqwest::StatusCode::OK);
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 200);
assert_eq!(json["result"], "00000001-0001-0001-0001-000000000001");
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn lists_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let _ = register_predicate(&client, &observer_event_tx).await;
let response = client
.get("http://localhost:20456/v1/observers")
.send()
.await
.unwrap();
assert!(response.status().is_success());
let json2: Value = response.json().await.unwrap();
assert_eq!(json2["status"], 200);
let results = json2["result"].as_array().unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0]["uuid"], "00000001-0001-0001-0001-000000000001");
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn shows_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let _ = register_predicate(&client, &observer_event_tx).await;
let response = client
.get("http://localhost:20456/v1/observers/00000001-0001-0001-0001-000000000001")
.send()
.await
.unwrap();
assert!(response.status().is_success());
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 200);
assert_eq!(
json["result"]["uuid"],
"00000001-0001-0001-0001-000000000001"
);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn rejects_duplicate_predicate_uuid() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let _ = register_predicate(&client, &observer_event_tx).await;
let response = register_predicate(&client, &observer_event_tx).await;
assert_eq!(response.status(), reqwest::StatusCode::CONFLICT);
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 409);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn deletes_registered_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let _ = register_predicate(&client, &observer_event_tx).await;
let response = delete_predicate(&client, &observer_event_tx).await;
assert_eq!(response.status(), reqwest::StatusCode::OK);
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 200);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn unlists_deleted_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let _ = register_predicate(&client, &observer_event_tx).await;
let _ = delete_predicate(&client, &observer_event_tx).await;
let response = client
.get("http://localhost:20456/v1/observers")
.send()
.await
.unwrap();
assert!(response.status().is_success());
let json: Value = response.json().await.unwrap();
assert_eq!(json["result"].as_array().len(), 0);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn unshows_deleted_predicate() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let _ = register_predicate(&client, &observer_event_tx).await;
let _ = delete_predicate(&client, &observer_event_tx).await;
let response = client
.get("http://localhost:20456/v1/observers/00000001-0001-0001-0001-000000000001")
.send()
.await
.unwrap();
assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn rejects_non_existing_predicate_delete() {
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let shutdown = launch_server(observer_event_rx).await;
let client = Client::new();
let response = delete_predicate(&client, &observer_event_tx).await;
assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND);
let json: Value = response.json().await.unwrap();
assert_eq!(json["status"], 404);
shutdown_server(observer_event_tx, shutdown);
}
#[tokio::test]
async fn accepts_ping() {
//
}
}

View File

@@ -29,12 +29,7 @@ use crate::db::{
};
use crate::db::{find_missing_blocks, run_compaction, update_sequence_metadata_with_block};
use crate::scan::bitcoin::process_block_with_predicates;
use crate::service::http_api::start_predicate_api_server;
use crate::service::observers::{
create_and_consolidate_chainhook_config_with_predicates, insert_entry_in_observers,
open_readwrite_observers_db_conn, remove_entry_from_observers, update_observer_progress,
update_observer_streaming_enabled, ObserverReport,
};
use crate::service::observers::create_and_consolidate_chainhook_config_with_predicates;
use crate::service::runloops::start_bitcoin_scan_runloop;
use crate::{try_debug, try_error, try_info};
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload;
@@ -55,6 +50,7 @@ use crossbeam_channel::unbounded;
use crossbeam_channel::{select, Sender};
use dashmap::DashMap;
use fxhash::FxHasher;
use http_api::start_observers_http_server;
use rusqlite::Transaction;
use std::collections::{BTreeMap, HashMap};
@@ -250,7 +246,7 @@ impl Service {
&self,
observer_command_tx: &std::sync::mpsc::Sender<ObserverCommand>,
observer_event_rx: crossbeam_channel::Receiver<ObserverEvent>,
predicate_activity_relayer: Option<
_predicate_activity_relayer: Option<
crossbeam_channel::Sender<BitcoinChainhookOccurrencePayload>,
>,
) -> Result<(), String> {
@@ -269,143 +265,21 @@ impl Service {
})
.expect("unable to spawn thread");
if let PredicatesApi::On(ref api_config) = self.config.http_api {
info!(
self.ctx.expect_logger(),
"Listening on port {} for chainhook predicate registrations", api_config.http_port
);
let ctx = self.ctx.clone();
let api_config = api_config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let db_dir_path = self.config.expected_cache_path();
// Test and initialize a database connection
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future = start_predicate_api_server(
api_config.http_port,
db_dir_path,
moved_observer_command_tx,
ctx,
);
let _ = hiro_system_kit::nestable_block_on(future);
if let PredicatesApi::On(_) = self.config.http_api {
let moved_config = self.config.clone();
let moved_ctx = self.ctx.clone();
let moved_observer_commands_tx = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("HTTP Observers API").spawn(move || {
let _ = hiro_system_kit::nestable_block_on(start_observers_http_server(
&moved_config,
&moved_observer_commands_tx,
observer_event_rx,
bitcoin_scan_op_tx,
&moved_ctx,
));
});
}
loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
error!(
self.ctx.expect_logger(),
"Error: broken channel {}",
e.to_string()
);
break;
}
};
match event {
ObserverEvent::PredicateRegistered(spec) => {
// ?? That seems weird?
// If start block specified, use it.
// If no start block specified, depending on the nature the hook, we'd like to retrieve:
// - contract-id
let observers_db_conn = match open_readwrite_observers_db_conn(
&self.config.expected_cache_path(),
&self.ctx,
) {
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to register predicate: {}",
e.to_string()
);
continue;
}
};
let report = ObserverReport::default();
insert_entry_in_observers(&spec, &report, &observers_db_conn, &self.ctx);
match spec {
ChainhookSpecification::Stacks(_predicate_spec) => {}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send(predicate_spec);
}
}
}
ObserverEvent::PredicateEnabled(spec) => {
let observers_db_conn = match open_readwrite_observers_db_conn(
&self.config.expected_cache_path(),
&self.ctx,
) {
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to enable observer: {}",
e.to_string()
);
continue;
}
};
update_observer_streaming_enabled(
&spec.uuid(),
true,
&observers_db_conn,
&self.ctx,
);
}
ObserverEvent::PredicateDeregistered(uuid) => {
let observers_db_conn = match open_readwrite_observers_db_conn(
&self.config.expected_cache_path(),
&self.ctx,
) {
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to deregister observer: {}",
e.to_string()
);
continue;
}
};
remove_entry_from_observers(&uuid, &observers_db_conn, &self.ctx);
}
ObserverEvent::BitcoinPredicateTriggered(data) => {
if let Some(ref tip) = data.apply.last() {
let observers_db_conn = match open_readwrite_observers_db_conn(
&self.config.expected_cache_path(),
&self.ctx,
) {
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to update observer: {}",
e.to_string()
);
continue;
}
};
let last_block_height_update = tip.block.block_identifier.index;
update_observer_progress(
&data.chainhook.uuid,
last_block_height_update,
&observers_db_conn,
&self.ctx,
)
}
if let Some(ref tx) = predicate_activity_relayer {
let _ = tx.send(data);
}
}
ObserverEvent::Terminate => {
info!(self.ctx.expect_logger(), "Terminating runloop");
break;
}
_ => {}
}
}
Ok(())
}

View File

@@ -73,32 +73,32 @@ pub fn insert_entry_in_observers(
}
}
pub fn get_default_observers_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
fn get_default_observers_db_file_path(config: &Config) -> PathBuf {
let mut destination_path = config.expected_observers_cache_path().clone();
destination_path.push("observers.sqlite");
destination_path
}
pub fn open_readonly_observers_db_conn(
base_dir: &PathBuf,
config: &Config,
ctx: &Context,
) -> Result<Connection, String> {
let db_path = get_default_observers_db_file_path(&base_dir);
let db_path = get_default_observers_db_file_path(config);
let conn = open_existing_readonly_db(&db_path, ctx);
Ok(conn)
}
pub fn open_readwrite_observers_db_conn(
base_dir: &PathBuf,
config: &Config,
ctx: &Context,
) -> Result<Connection, String> {
let db_path = get_default_observers_db_file_path(&base_dir);
let db_path = get_default_observers_db_file_path(config);
let conn = create_or_open_readwrite_db(Some(&db_path), ctx);
Ok(conn)
}
pub fn open_readwrite_observers_db_conn_or_panic(base_dir: &PathBuf, ctx: &Context) -> Connection {
let conn = match open_readwrite_observers_db_conn(base_dir, ctx) {
pub fn open_readwrite_observers_db_conn_or_panic(config: &Config, ctx: &Context) -> Connection {
let conn = match open_readwrite_observers_db_conn(config, ctx) {
Ok(con) => con,
Err(message) => {
error!(ctx.expect_logger(), "Storage: {}", message.to_string());
@@ -108,8 +108,8 @@ pub fn open_readwrite_observers_db_conn_or_panic(base_dir: &PathBuf, ctx: &Conte
conn
}
pub fn initialize_observers_db(base_dir: &PathBuf, ctx: &Context) -> Connection {
let db_path = get_default_observers_db_file_path(&base_dir);
pub fn initialize_observers_db(config: &Config, ctx: &Context) -> Connection {
let db_path = get_default_observers_db_file_path(config);
let conn = create_or_open_readwrite_db(Some(&db_path), ctx);
// TODO: introduce initial output
if let Err(e) = conn.execute(
@@ -126,6 +126,12 @@ pub fn initialize_observers_db(base_dir: &PathBuf, ctx: &Context) -> Connection
conn
}
#[cfg(test)]
pub fn delete_observers_db(config: &Config) {
let path = get_default_observers_db_file_path(config);
let _ = std::fs::remove_file(path);
}
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct ObserverReport {
pub streaming_enabled: bool,
@@ -263,7 +269,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
));
}
let observers_db_conn = initialize_observers_db(&config.expected_cache_path(), ctx);
let observers_db_conn = initialize_observers_db(config, ctx);
let mut observers_to_catchup = vec![];
let mut observers_to_clean_up = vec![];

View File

@@ -10,8 +10,8 @@ use threadpool::ThreadPool;
use crate::{
config::Config,
scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
service::{
observers::open_readwrite_observers_db_conn_or_panic, update_observer_streaming_enabled,
service::observers::{
open_readwrite_observers_db_conn_or_panic, update_observer_streaming_enabled,
},
try_error,
};
@@ -28,7 +28,6 @@ pub fn start_bitcoin_scan_runloop(
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
let db_base_dir = config.expected_cache_path();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
@@ -47,7 +46,7 @@ pub fn start_bitcoin_scan_runloop(
// Update predicate
let mut observers_db_conn =
open_readwrite_observers_db_conn_or_panic(&db_base_dir, &moved_ctx);
open_readwrite_observers_db_conn_or_panic(&moved_config, &moved_ctx);
update_observer_streaming_enabled(
&predicate_spec.uuid,
false,