mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-13 08:40:17 +08:00
feat: support a separate storage directory for observers.sqlite (#354)
* chore: observers db dir * fix: control observers path only on db functions * chore: test structure * test: incorrect predicate * fix: test predicate registration * test: more * fix: duplicate uuid test
This commit is contained in:
@@ -1,2 +1,5 @@
|
||||
[alias]
|
||||
ordhook-install = "install --path components/ordhook-cli --locked --force"
|
||||
|
||||
[env]
|
||||
RUST_TEST_THREADS = "1"
|
||||
|
||||
@@ -579,7 +579,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
false,
|
||||
)?;
|
||||
|
||||
let _ = initialize_observers_db(&config.expected_cache_path(), ctx);
|
||||
let _ = initialize_observers_db(&config, ctx);
|
||||
|
||||
scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
&predicate_spec,
|
||||
@@ -631,7 +631,10 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
if row.operation == "transfer_receive" {
|
||||
continue;
|
||||
}
|
||||
println!("BRC-20 {} {} {}", row.operation, row.tick, row.avail_balance);
|
||||
println!(
|
||||
"BRC-20 {} {} {}",
|
||||
row.operation, row.tick, row.avail_balance
|
||||
);
|
||||
}
|
||||
}
|
||||
None => todo!(),
|
||||
|
||||
@@ -65,6 +65,10 @@ impl ConfigFile {
|
||||
let config = Config {
|
||||
storage: StorageConfig {
|
||||
working_dir: config_file.storage.working_dir.unwrap_or("ordhook".into()),
|
||||
observers_working_dir: config_file
|
||||
.storage
|
||||
.observers_working_dir
|
||||
.unwrap_or("observers".into()),
|
||||
},
|
||||
http_api: match config_file.http_api {
|
||||
None => PredicatesApi::Off,
|
||||
@@ -176,6 +180,7 @@ pub struct LogConfigFile {
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct StorageConfigFile {
|
||||
pub working_dir: Option<String>,
|
||||
pub observers_working_dir: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
|
||||
@@ -44,6 +44,7 @@ pub struct LogConfig {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct StorageConfig {
|
||||
pub working_dir: String,
|
||||
pub observers_working_dir: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -161,10 +162,17 @@ impl Config {
|
||||
destination_path
|
||||
}
|
||||
|
||||
pub fn expected_observers_cache_path(&self) -> PathBuf {
|
||||
let mut destination_path = PathBuf::new();
|
||||
destination_path.push(&self.storage.observers_working_dir);
|
||||
destination_path
|
||||
}
|
||||
|
||||
pub fn devnet_default() -> Config {
|
||||
Config {
|
||||
storage: StorageConfig {
|
||||
working_dir: default_cache_path(),
|
||||
observers_working_dir: default_observers_cache_path(),
|
||||
},
|
||||
http_api: PredicatesApi::Off,
|
||||
snapshot: SnapshotConfig::Build,
|
||||
@@ -199,6 +207,7 @@ impl Config {
|
||||
Config {
|
||||
storage: StorageConfig {
|
||||
working_dir: default_cache_path(),
|
||||
observers_working_dir: default_observers_cache_path(),
|
||||
},
|
||||
http_api: PredicatesApi::Off,
|
||||
snapshot: SnapshotConfig::Build,
|
||||
@@ -233,6 +242,7 @@ impl Config {
|
||||
Config {
|
||||
storage: StorageConfig {
|
||||
working_dir: default_cache_path(),
|
||||
observers_working_dir: default_observers_cache_path(),
|
||||
},
|
||||
http_api: PredicatesApi::Off,
|
||||
snapshot: SnapshotConfig::Download(SnapshotConfigDownloadUrls {
|
||||
@@ -272,3 +282,9 @@ pub fn default_cache_path() -> String {
|
||||
cache_path.push("ordhook");
|
||||
format!("{}", cache_path.display())
|
||||
}
|
||||
|
||||
pub fn default_observers_cache_path() -> String {
|
||||
let mut cache_path = std::env::current_dir().expect("unable to get current dir");
|
||||
cache_path.push("observers");
|
||||
format!("{}", cache_path.display())
|
||||
}
|
||||
|
||||
@@ -9,9 +9,8 @@ use crate::{
|
||||
};
|
||||
use chainhook_sdk::{
|
||||
types::{
|
||||
BitcoinBlockData, BitcoinTransactionData, BlockIdentifier, Brc20BalanceData,
|
||||
Brc20Operation, Brc20TokenDeployData, Brc20TransferData, OrdinalInscriptionRevealData,
|
||||
OrdinalOperation,
|
||||
BitcoinBlockData, BitcoinTransactionData, Brc20BalanceData, Brc20Operation,
|
||||
Brc20TokenDeployData, Brc20TransferData, OrdinalInscriptionRevealData, OrdinalOperation,
|
||||
},
|
||||
utils::Context,
|
||||
};
|
||||
|
||||
@@ -190,7 +190,9 @@ async fn validate_or_download_archive_file(
|
||||
|
||||
if should_download {
|
||||
try_info!(ctx, "Downloading {remote_archive_url}");
|
||||
match download_and_decompress_archive_file(remote_archive_url, file_name, &config, &ctx).await {
|
||||
match download_and_decompress_archive_file(remote_archive_url, file_name, &config, &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
try_error!(ctx, "{e}");
|
||||
|
||||
@@ -157,8 +157,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
Err(e) => return Err(format!("Scan aborted: {e}")),
|
||||
}
|
||||
{
|
||||
let observers_db_conn =
|
||||
open_readwrite_observers_db_conn_or_panic(&config.expected_cache_path(), &ctx);
|
||||
let observers_db_conn = open_readwrite_observers_db_conn_or_panic(&config, &ctx);
|
||||
update_observer_progress(
|
||||
&predicate_spec.uuid,
|
||||
current_block_height,
|
||||
|
||||
@@ -1,50 +1,178 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
path::PathBuf,
|
||||
sync::{mpsc::Sender, Arc, Mutex},
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
chainhooks::types::{ChainhookFullSpecification, ChainhookSpecification},
|
||||
observer::ObserverCommand,
|
||||
chainhooks::types::{
|
||||
BitcoinChainhookSpecification, ChainhookFullSpecification, ChainhookSpecification,
|
||||
},
|
||||
observer::{ObserverCommand, ObserverEvent},
|
||||
utils::Context,
|
||||
};
|
||||
use rocket::config::{self, Config, LogLevel};
|
||||
use rocket::serde::json::{json, Json, Value as JsonValue};
|
||||
use rocket::State;
|
||||
use std::error::Error;
|
||||
use rocket::{
|
||||
config::{self, Config, LogLevel},
|
||||
Ignite, Rocket, Shutdown,
|
||||
};
|
||||
use rocket::{
|
||||
http::Status,
|
||||
response::status,
|
||||
serde::json::{json, Json, Value},
|
||||
};
|
||||
use rocket::{response::status::Custom, State};
|
||||
|
||||
use crate::try_info;
|
||||
use crate::{
|
||||
config::PredicatesApi,
|
||||
service::observers::{
|
||||
insert_entry_in_observers, open_readwrite_observers_db_conn, remove_entry_from_observers,
|
||||
update_observer_progress, update_observer_streaming_enabled,
|
||||
},
|
||||
try_error, try_info,
|
||||
};
|
||||
|
||||
use super::observers::{
|
||||
find_all_observers, find_observer_with_uuid, open_readonly_observers_db_conn, ObserverReport,
|
||||
};
|
||||
|
||||
pub async fn start_predicate_api_server(
|
||||
port: u16,
|
||||
observers_db_dir_path: PathBuf,
|
||||
observer_commands_tx: Sender<ObserverCommand>,
|
||||
ctx: Context,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
let log_level = LogLevel::Off;
|
||||
pub async fn start_observers_http_server(
|
||||
config: &crate::Config,
|
||||
observer_commands_tx: &std::sync::mpsc::Sender<ObserverCommand>,
|
||||
observer_event_rx: crossbeam_channel::Receiver<ObserverEvent>,
|
||||
bitcoin_scan_op_tx: crossbeam_channel::Sender<BitcoinChainhookSpecification>,
|
||||
ctx: &Context,
|
||||
) -> Result<Shutdown, String> {
|
||||
// Build and start HTTP server.
|
||||
let ignite = build_server(config, observer_commands_tx, ctx).await;
|
||||
let shutdown = ignite.shutdown();
|
||||
let _ = hiro_system_kit::thread_named("observers_api-server").spawn(move || {
|
||||
let _ = hiro_system_kit::nestable_block_on(ignite.launch());
|
||||
});
|
||||
|
||||
// Spawn predicate observer event tread.
|
||||
let moved_config = config.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let _ = hiro_system_kit::thread_named("observers_api-events").spawn(move || loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) => {
|
||||
try_error!(&moved_ctx, "Error: broken channel {}", e.to_string());
|
||||
break;
|
||||
}
|
||||
};
|
||||
match event {
|
||||
ObserverEvent::PredicateRegistered(spec) => {
|
||||
let observers_db_conn =
|
||||
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
try_error!(
|
||||
&moved_ctx,
|
||||
"unable to register predicate: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let report = ObserverReport::default();
|
||||
insert_entry_in_observers(&spec, &report, &observers_db_conn, &moved_ctx);
|
||||
match spec {
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
let _ = bitcoin_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
ObserverEvent::PredicateEnabled(spec) => {
|
||||
let observers_db_conn =
|
||||
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
try_error!(&moved_ctx, "unable to enable observer: {}", e.to_string());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
update_observer_streaming_enabled(
|
||||
&spec.uuid(),
|
||||
true,
|
||||
&observers_db_conn,
|
||||
&moved_ctx,
|
||||
);
|
||||
}
|
||||
ObserverEvent::PredicateDeregistered(uuid) => {
|
||||
let observers_db_conn =
|
||||
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
try_error!(
|
||||
&moved_ctx,
|
||||
"unable to deregister observer: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
remove_entry_from_observers(&uuid, &observers_db_conn, &moved_ctx);
|
||||
}
|
||||
ObserverEvent::BitcoinPredicateTriggered(data) => {
|
||||
if let Some(ref tip) = data.apply.last() {
|
||||
let observers_db_conn =
|
||||
match open_readwrite_observers_db_conn(&moved_config, &moved_ctx) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
try_error!(
|
||||
&moved_ctx,
|
||||
"unable to update observer: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let last_block_height_update = tip.block.block_identifier.index;
|
||||
update_observer_progress(
|
||||
&data.chainhook.uuid,
|
||||
last_block_height_update,
|
||||
&observers_db_conn,
|
||||
&moved_ctx,
|
||||
)
|
||||
}
|
||||
}
|
||||
ObserverEvent::Terminate => {
|
||||
try_info!(&moved_ctx, "Terminating runloop");
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(shutdown)
|
||||
}
|
||||
|
||||
async fn build_server(
|
||||
config: &crate::Config,
|
||||
observer_command_tx: &std::sync::mpsc::Sender<ObserverCommand>,
|
||||
ctx: &Context,
|
||||
) -> Rocket<Ignite> {
|
||||
let PredicatesApi::On(ref api_config) = config.http_api else {
|
||||
unreachable!();
|
||||
};
|
||||
let moved_config = config.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let moved_observer_commands_tx = observer_command_tx.clone();
|
||||
let mut shutdown_config = config::Shutdown::default();
|
||||
shutdown_config.ctrlc = false;
|
||||
shutdown_config.grace = 1;
|
||||
shutdown_config.mercy = 1;
|
||||
|
||||
let control_config = Config {
|
||||
port,
|
||||
port: api_config.http_port,
|
||||
workers: 1,
|
||||
address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
keep_alive: 5,
|
||||
temp_dir: std::env::temp_dir().into(),
|
||||
log_level,
|
||||
log_level: LogLevel::Off,
|
||||
cli_colors: false,
|
||||
shutdown: shutdown_config,
|
||||
..Config::default()
|
||||
};
|
||||
|
||||
let routes = routes![
|
||||
handle_ping,
|
||||
handle_get_predicates,
|
||||
@@ -52,27 +180,21 @@ pub async fn start_predicate_api_server(
|
||||
handle_create_predicate,
|
||||
handle_delete_bitcoin_predicate,
|
||||
];
|
||||
|
||||
let background_job_tx_mutex = Arc::new(Mutex::new(observer_commands_tx.clone()));
|
||||
|
||||
let ctx_cloned = ctx.clone();
|
||||
let background_job_tx_mutex = Arc::new(Mutex::new(moved_observer_commands_tx));
|
||||
|
||||
let ignite = rocket::custom(control_config)
|
||||
.manage(background_job_tx_mutex)
|
||||
.manage(observers_db_dir_path)
|
||||
.manage(ctx_cloned)
|
||||
.manage(moved_config)
|
||||
.manage(moved_ctx.clone())
|
||||
.mount("/", routes)
|
||||
.ignite()
|
||||
.await?;
|
||||
|
||||
let _ = std::thread::spawn(move || {
|
||||
let _ = hiro_system_kit::nestable_block_on(ignite.launch());
|
||||
});
|
||||
Ok(())
|
||||
.await
|
||||
.expect("Unable to build observers API");
|
||||
ignite
|
||||
}
|
||||
|
||||
#[get("/ping")]
|
||||
fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
|
||||
fn handle_ping(ctx: &State<Context>) -> Json<Value> {
|
||||
try_info!(ctx, "Handling HTTP GET /ping");
|
||||
Json(json!({
|
||||
"status": 200,
|
||||
@@ -82,147 +204,203 @@ fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
|
||||
|
||||
#[get("/v1/observers", format = "application/json")]
|
||||
fn handle_get_predicates(
|
||||
observers_db_dir_path: &State<PathBuf>,
|
||||
config: &State<crate::Config>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
) -> Result<Json<Value>, Custom<Json<Value>>> {
|
||||
try_info!(ctx, "Handling HTTP GET /v1/observers");
|
||||
match open_readonly_observers_db_conn(observers_db_dir_path, ctx) {
|
||||
match open_readonly_observers_db_conn(config, ctx) {
|
||||
Ok(mut db_conn) => {
|
||||
let observers = find_all_observers(&mut db_conn, &ctx);
|
||||
let serialized_predicates = observers
|
||||
.iter()
|
||||
.map(|(p, s)| serialized_predicate_with_status(p, s))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Json(json!({
|
||||
Ok(Json(json!({
|
||||
"status": 200,
|
||||
"result": serialized_predicates
|
||||
}))
|
||||
})))
|
||||
}
|
||||
Err(e) => Json(json!({
|
||||
"status": 500,
|
||||
"message": e,
|
||||
})),
|
||||
Err(e) => Err(Custom(
|
||||
Status::InternalServerError,
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
"message": e,
|
||||
})),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[post("/v1/observers", format = "application/json", data = "<predicate>")]
|
||||
fn handle_create_predicate(
|
||||
predicate: Json<ChainhookFullSpecification>,
|
||||
observers_db_dir_path: &State<PathBuf>,
|
||||
predicate: Json<Value>,
|
||||
config: &State<crate::Config>,
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
) -> Result<Json<Value>, Custom<Json<Value>>> {
|
||||
try_info!(ctx, "Handling HTTP POST /v1/observers");
|
||||
let predicate = predicate.into_inner();
|
||||
if let Err(e) = predicate.validate() {
|
||||
return Json(json!({
|
||||
"status": 422,
|
||||
"error": e,
|
||||
}));
|
||||
}
|
||||
|
||||
let predicate_uuid = predicate.get_uuid().to_string();
|
||||
|
||||
if let Ok(mut predicates_db_conn) = open_readonly_observers_db_conn(observers_db_dir_path, ctx)
|
||||
{
|
||||
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
|
||||
match find_observer_with_uuid(&key, &mut predicates_db_conn, &ctx) {
|
||||
Some(_) => {
|
||||
return Json(json!({
|
||||
"status": 409,
|
||||
"error": "Predicate uuid already in use",
|
||||
}))
|
||||
let predicate =
|
||||
match serde_json::from_value::<ChainhookFullSpecification>(predicate.into_inner()) {
|
||||
Ok(predicate) => predicate,
|
||||
Err(_) => {
|
||||
return Err(Custom(
|
||||
Status::UnprocessableEntity,
|
||||
Json(json!({
|
||||
"status": 422,
|
||||
"error": "Invalid predicate JSON",
|
||||
})),
|
||||
));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
if let Err(e) = predicate.validate() {
|
||||
return Err(Custom(
|
||||
Status::UnprocessableEntity,
|
||||
Json(json!({
|
||||
"status": 422,
|
||||
"error": e,
|
||||
})),
|
||||
));
|
||||
}
|
||||
|
||||
let background_job_tx = background_job_tx.inner();
|
||||
match background_job_tx.lock() {
|
||||
let mut predicates_db_conn = match open_readonly_observers_db_conn(config, ctx) {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
return Err(Custom(
|
||||
Status::InternalServerError,
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
"error": err.to_string(),
|
||||
})),
|
||||
));
|
||||
}
|
||||
};
|
||||
let predicate_uuid = predicate.get_uuid().to_string();
|
||||
if find_observer_with_uuid(&predicate_uuid, &mut predicates_db_conn, &ctx).is_some() {
|
||||
return Err(status::Custom(
|
||||
Status::Conflict,
|
||||
Json(json!({
|
||||
"status": 409,
|
||||
"error": "Predicate uuid already in use",
|
||||
})),
|
||||
));
|
||||
}
|
||||
match background_job_tx.inner().lock() {
|
||||
Ok(tx) => {
|
||||
let _ = tx.send(ObserverCommand::RegisterPredicate(predicate));
|
||||
}
|
||||
_ => {}
|
||||
Err(err) => {
|
||||
return Err(Custom(
|
||||
Status::InternalServerError,
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
"error": err.to_string(),
|
||||
})),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
Json(json!({
|
||||
Ok(Json(json!({
|
||||
"status": 200,
|
||||
"result": predicate_uuid,
|
||||
}))
|
||||
})))
|
||||
}
|
||||
|
||||
#[get("/v1/observers/<predicate_uuid>", format = "application/json")]
|
||||
fn handle_get_predicate(
|
||||
predicate_uuid: String,
|
||||
observers_db_dir_path: &State<PathBuf>,
|
||||
config: &State<crate::Config>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
) -> Result<Json<Value>, Custom<Json<Value>>> {
|
||||
try_info!(ctx, "Handling HTTP GET /v1/observers/{}", predicate_uuid);
|
||||
match open_readonly_observers_db_conn(observers_db_dir_path, ctx) {
|
||||
match open_readonly_observers_db_conn(config, ctx) {
|
||||
Ok(mut predicates_db_conn) => {
|
||||
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
|
||||
let entry = match find_observer_with_uuid(&key, &mut predicates_db_conn, &ctx) {
|
||||
Some((ChainhookSpecification::Bitcoin(spec), report)) => json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": report,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
_ => {
|
||||
return Json(json!({
|
||||
"status": 404,
|
||||
}))
|
||||
}
|
||||
};
|
||||
Json(json!({
|
||||
let entry =
|
||||
match find_observer_with_uuid(&predicate_uuid, &mut predicates_db_conn, &ctx) {
|
||||
Some((ChainhookSpecification::Bitcoin(spec), report)) => json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": report,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
_ => {
|
||||
return Err(Custom(
|
||||
Status::NotFound,
|
||||
Json(json!({
|
||||
"status": 404,
|
||||
})),
|
||||
))
|
||||
}
|
||||
};
|
||||
Ok(Json(json!({
|
||||
"status": 200,
|
||||
"result": entry
|
||||
}))
|
||||
})))
|
||||
}
|
||||
Err(e) => Json(json!({
|
||||
"status": 500,
|
||||
"message": e,
|
||||
})),
|
||||
Err(e) => Err(Custom(
|
||||
Status::InternalServerError,
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
"message": e,
|
||||
})),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[delete("/v1/observers/<predicate_uuid>", format = "application/json")]
|
||||
fn handle_delete_bitcoin_predicate(
|
||||
predicate_uuid: String,
|
||||
config: &State<crate::Config>,
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
) -> Result<Json<Value>, Custom<Json<Value>>> {
|
||||
try_info!(ctx, "Handling HTTP DELETE /v1/observers/{}", predicate_uuid);
|
||||
let background_job_tx = background_job_tx.inner();
|
||||
match background_job_tx.lock() {
|
||||
let mut predicates_db_conn = match open_readonly_observers_db_conn(config, ctx) {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
return Err(Custom(
|
||||
Status::InternalServerError,
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
"error": err.to_string(),
|
||||
})),
|
||||
));
|
||||
}
|
||||
};
|
||||
if find_observer_with_uuid(&predicate_uuid, &mut predicates_db_conn, &ctx).is_none() {
|
||||
return Err(status::Custom(
|
||||
Status::NotFound,
|
||||
Json(json!({
|
||||
"status": 404,
|
||||
"error": "Predicate not found",
|
||||
})),
|
||||
));
|
||||
}
|
||||
match background_job_tx.inner().lock() {
|
||||
Ok(tx) => {
|
||||
let _ = tx.send(ObserverCommand::DeregisterBitcoinPredicate(predicate_uuid));
|
||||
}
|
||||
_ => {}
|
||||
Err(err) => {
|
||||
return Err(Custom(
|
||||
Status::InternalServerError,
|
||||
Json(json!({
|
||||
"status": 500,
|
||||
"error": err.to_string(),
|
||||
})),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
Json(json!({
|
||||
Ok(Json(json!({
|
||||
"status": 200,
|
||||
"result": "Ok",
|
||||
}))
|
||||
"result": "Predicate deleted",
|
||||
})))
|
||||
}
|
||||
|
||||
fn serialized_predicate_with_status(
|
||||
predicate: &ChainhookSpecification,
|
||||
report: &ObserverReport,
|
||||
) -> JsonValue {
|
||||
) -> Value {
|
||||
match (predicate, report) {
|
||||
(ChainhookSpecification::Stacks(spec), report) => json!({
|
||||
"chain": "stacks",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": report,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
(ChainhookSpecification::Stacks(_), _) => json!({}),
|
||||
(ChainhookSpecification::Bitcoin(spec), report) => json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
@@ -233,3 +411,377 @@ fn serialized_predicate_with_status(
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
use chainhook_sdk::{
|
||||
chainhooks::types::{
|
||||
BitcoinChainhookSpecification, BitcoinPredicateType, ChainhookSpecification,
|
||||
HookAction, HttpHook, InscriptionFeedData, OrdinalOperations,
|
||||
},
|
||||
observer::ObserverEvent,
|
||||
types::BitcoinNetwork,
|
||||
utils::Context,
|
||||
};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use reqwest::{Client, Response};
|
||||
use rocket::{form::validate::Len, Shutdown};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use crate::{
|
||||
config::{Config, PredicatesApi, PredicatesApiConfig},
|
||||
service::observers::{delete_observers_db, initialize_observers_db},
|
||||
};
|
||||
|
||||
use super::start_observers_http_server;
|
||||
|
||||
async fn launch_server(observer_event_rx: Receiver<ObserverEvent>) -> Shutdown {
|
||||
let mut config = Config::devnet_default();
|
||||
config.http_api = PredicatesApi::On(PredicatesApiConfig {
|
||||
http_port: 20456,
|
||||
display_logs: true,
|
||||
});
|
||||
config.storage.observers_working_dir = "tmp".to_string();
|
||||
let ctx = Context::empty();
|
||||
delete_observers_db(&config);
|
||||
let _ = initialize_observers_db(&config, &ctx);
|
||||
let (bitcoin_scan_op_tx, _) = crossbeam_channel::unbounded();
|
||||
let (observer_command_tx, _) = channel();
|
||||
let shutdown = start_observers_http_server(
|
||||
&config,
|
||||
&observer_command_tx,
|
||||
observer_event_rx,
|
||||
bitcoin_scan_op_tx,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.expect("start failed");
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
shutdown
|
||||
}
|
||||
|
||||
fn shutdown_server(observer_event_tx: Sender<ObserverEvent>, shutdown: Shutdown) {
|
||||
let _ = observer_event_tx.send(ObserverEvent::Terminate);
|
||||
shutdown.notify();
|
||||
}
|
||||
|
||||
async fn register_predicate(
|
||||
client: &Client,
|
||||
observer_event_tx: &Sender<ObserverEvent>,
|
||||
) -> Response {
|
||||
let response = client
|
||||
.post("http://localhost:20456/v1/observers")
|
||||
.json(&json!({
|
||||
"uuid": "00000001-0001-0001-0001-000000000001",
|
||||
"name": "inscription_feed",
|
||||
"version": 1,
|
||||
"chain": "bitcoin",
|
||||
"networks": {
|
||||
"mainnet": {
|
||||
"start_block": 767430,
|
||||
"if_this": {
|
||||
"scope": "ordinals_protocol",
|
||||
"operation": "inscription_feed",
|
||||
},
|
||||
"then_that": {
|
||||
"http_post": {
|
||||
"url": "http://localhost:3700/payload",
|
||||
"authorization_header": "Bearer test"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
if response.status().is_success() {
|
||||
// Simulate predicate accepted by chainhook-sdk
|
||||
let spec = ChainhookSpecification::Bitcoin(BitcoinChainhookSpecification {
|
||||
uuid: "00000001-0001-0001-0001-000000000001".to_string(),
|
||||
owner_uuid: None,
|
||||
name: "inscription_feed".to_string(),
|
||||
network: BitcoinNetwork::Mainnet,
|
||||
version: 1,
|
||||
blocks: None,
|
||||
start_block: Some(767430),
|
||||
end_block: None,
|
||||
expire_after_occurrence: None,
|
||||
predicate: BitcoinPredicateType::OrdinalsProtocol(
|
||||
OrdinalOperations::InscriptionFeed(InscriptionFeedData {
|
||||
meta_protocols: None,
|
||||
}),
|
||||
),
|
||||
action: HookAction::HttpPost(HttpHook {
|
||||
url: "http://localhost:3700/payload".to_string(),
|
||||
authorization_header: "Bearer test".to_string(),
|
||||
}),
|
||||
include_proof: false,
|
||||
include_inputs: false,
|
||||
include_outputs: false,
|
||||
include_witness: false,
|
||||
enabled: true,
|
||||
expired_at: None,
|
||||
});
|
||||
let _ = observer_event_tx.send(ObserverEvent::PredicateRegistered(spec.clone()));
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
let _ = observer_event_tx.send(ObserverEvent::PredicateEnabled(spec));
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
}
|
||||
response
|
||||
}
|
||||
|
||||
async fn delete_predicate(
|
||||
client: &Client,
|
||||
observer_event_tx: &Sender<ObserverEvent>,
|
||||
) -> Response {
|
||||
let response = client
|
||||
.delete("http://localhost:20456/v1/observers/00000001-0001-0001-0001-000000000001")
|
||||
.header("content-type", "application/json")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
if response.status().is_success() {
|
||||
// Simulate predicate deregistered by chainhook-sdk
|
||||
let _ = observer_event_tx.send(ObserverEvent::PredicateDeregistered(
|
||||
"00000001-0001-0001-0001-000000000001".to_string(),
|
||||
));
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
}
|
||||
response
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lists_empty_predicates() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let response = client
|
||||
.get("http://localhost:20456/v1/observers")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(response.status().is_success());
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 200);
|
||||
assert_eq!(json["result"].as_array().len(), 0);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_arbitrary_json() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let response = client
|
||||
.post("http://localhost:20456/v1/observers")
|
||||
.json(&json!({
|
||||
"id": 1,
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY);
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 422);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_invalid_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let response = client
|
||||
.post("http://localhost:20456/v1/observers")
|
||||
.json(&json!({
|
||||
"uuid": "00000001-0001-0001-0001-000000000001",
|
||||
"name": "inscription_feed",
|
||||
"version": 1,
|
||||
"chain": "bitcoin",
|
||||
"networks": {
|
||||
"mainnet": {
|
||||
"start_block": 767430,
|
||||
"end_block": 200, // Invalid
|
||||
"if_this": {
|
||||
"scope": "ordinals_protocol",
|
||||
"operation": "inscription_feed",
|
||||
},
|
||||
"then_that": {
|
||||
"http_post": {
|
||||
"url": "http://localhost:3700/payload",
|
||||
"authorization_header": "Bearer test"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY);
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 422);
|
||||
assert_eq!(
|
||||
json["error"],
|
||||
"Chainhook specification field `end_block` should be greater than `start_block`."
|
||||
);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn accepts_valid_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let response = register_predicate(&client, &observer_event_tx).await;
|
||||
assert_eq!(response.status(), reqwest::StatusCode::OK);
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 200);
|
||||
assert_eq!(json["result"], "00000001-0001-0001-0001-000000000001");
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lists_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let _ = register_predicate(&client, &observer_event_tx).await;
|
||||
let response = client
|
||||
.get("http://localhost:20456/v1/observers")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(response.status().is_success());
|
||||
let json2: Value = response.json().await.unwrap();
|
||||
assert_eq!(json2["status"], 200);
|
||||
let results = json2["result"].as_array().unwrap();
|
||||
assert_eq!(results.len(), 1);
|
||||
assert_eq!(results[0]["uuid"], "00000001-0001-0001-0001-000000000001");
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shows_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let _ = register_predicate(&client, &observer_event_tx).await;
|
||||
let response = client
|
||||
.get("http://localhost:20456/v1/observers/00000001-0001-0001-0001-000000000001")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(response.status().is_success());
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 200);
|
||||
assert_eq!(
|
||||
json["result"]["uuid"],
|
||||
"00000001-0001-0001-0001-000000000001"
|
||||
);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_duplicate_predicate_uuid() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let _ = register_predicate(&client, &observer_event_tx).await;
|
||||
let response = register_predicate(&client, &observer_event_tx).await;
|
||||
assert_eq!(response.status(), reqwest::StatusCode::CONFLICT);
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 409);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deletes_registered_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let _ = register_predicate(&client, &observer_event_tx).await;
|
||||
let response = delete_predicate(&client, &observer_event_tx).await;
|
||||
assert_eq!(response.status(), reqwest::StatusCode::OK);
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 200);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unlists_deleted_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let _ = register_predicate(&client, &observer_event_tx).await;
|
||||
let _ = delete_predicate(&client, &observer_event_tx).await;
|
||||
let response = client
|
||||
.get("http://localhost:20456/v1/observers")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(response.status().is_success());
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["result"].as_array().len(), 0);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unshows_deleted_predicate() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let _ = register_predicate(&client, &observer_event_tx).await;
|
||||
let _ = delete_predicate(&client, &observer_event_tx).await;
|
||||
let response = client
|
||||
.get("http://localhost:20456/v1/observers/00000001-0001-0001-0001-000000000001")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_non_existing_predicate_delete() {
|
||||
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
|
||||
let shutdown = launch_server(observer_event_rx).await;
|
||||
|
||||
let client = Client::new();
|
||||
let response = delete_predicate(&client, &observer_event_tx).await;
|
||||
assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND);
|
||||
let json: Value = response.json().await.unwrap();
|
||||
assert_eq!(json["status"], 404);
|
||||
|
||||
shutdown_server(observer_event_tx, shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn accepts_ping() {
|
||||
//
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,12 +29,7 @@ use crate::db::{
|
||||
};
|
||||
use crate::db::{find_missing_blocks, run_compaction, update_sequence_metadata_with_block};
|
||||
use crate::scan::bitcoin::process_block_with_predicates;
|
||||
use crate::service::http_api::start_predicate_api_server;
|
||||
use crate::service::observers::{
|
||||
create_and_consolidate_chainhook_config_with_predicates, insert_entry_in_observers,
|
||||
open_readwrite_observers_db_conn, remove_entry_from_observers, update_observer_progress,
|
||||
update_observer_streaming_enabled, ObserverReport,
|
||||
};
|
||||
use crate::service::observers::create_and_consolidate_chainhook_config_with_predicates;
|
||||
use crate::service::runloops::start_bitcoin_scan_runloop;
|
||||
use crate::{try_debug, try_error, try_info};
|
||||
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload;
|
||||
@@ -55,6 +50,7 @@ use crossbeam_channel::unbounded;
|
||||
use crossbeam_channel::{select, Sender};
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
use http_api::start_observers_http_server;
|
||||
use rusqlite::Transaction;
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
@@ -250,7 +246,7 @@ impl Service {
|
||||
&self,
|
||||
observer_command_tx: &std::sync::mpsc::Sender<ObserverCommand>,
|
||||
observer_event_rx: crossbeam_channel::Receiver<ObserverEvent>,
|
||||
predicate_activity_relayer: Option<
|
||||
_predicate_activity_relayer: Option<
|
||||
crossbeam_channel::Sender<BitcoinChainhookOccurrencePayload>,
|
||||
>,
|
||||
) -> Result<(), String> {
|
||||
@@ -269,143 +265,21 @@ impl Service {
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
if let PredicatesApi::On(ref api_config) = self.config.http_api {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening on port {} for chainhook predicate registrations", api_config.http_port
|
||||
);
|
||||
let ctx = self.ctx.clone();
|
||||
let api_config = api_config.clone();
|
||||
let moved_observer_command_tx = observer_command_tx.clone();
|
||||
let db_dir_path = self.config.expected_cache_path();
|
||||
// Test and initialize a database connection
|
||||
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
|
||||
let future = start_predicate_api_server(
|
||||
api_config.http_port,
|
||||
db_dir_path,
|
||||
moved_observer_command_tx,
|
||||
ctx,
|
||||
);
|
||||
let _ = hiro_system_kit::nestable_block_on(future);
|
||||
if let PredicatesApi::On(_) = self.config.http_api {
|
||||
let moved_config = self.config.clone();
|
||||
let moved_ctx = self.ctx.clone();
|
||||
let moved_observer_commands_tx = observer_command_tx.clone();
|
||||
let _ = hiro_system_kit::thread_named("HTTP Observers API").spawn(move || {
|
||||
let _ = hiro_system_kit::nestable_block_on(start_observers_http_server(
|
||||
&moved_config,
|
||||
&moved_observer_commands_tx,
|
||||
observer_event_rx,
|
||||
bitcoin_scan_op_tx,
|
||||
&moved_ctx,
|
||||
));
|
||||
});
|
||||
}
|
||||
|
||||
loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"Error: broken channel {}",
|
||||
e.to_string()
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
match event {
|
||||
ObserverEvent::PredicateRegistered(spec) => {
|
||||
// ?? That seems weird?
|
||||
// If start block specified, use it.
|
||||
// If no start block specified, depending on the nature the hook, we'd like to retrieve:
|
||||
// - contract-id
|
||||
let observers_db_conn = match open_readwrite_observers_db_conn(
|
||||
&self.config.expected_cache_path(),
|
||||
&self.ctx,
|
||||
) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to register predicate: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let report = ObserverReport::default();
|
||||
insert_entry_in_observers(&spec, &report, &observers_db_conn, &self.ctx);
|
||||
match spec {
|
||||
ChainhookSpecification::Stacks(_predicate_spec) => {}
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
let _ = bitcoin_scan_op_tx.send(predicate_spec);
|
||||
}
|
||||
}
|
||||
}
|
||||
ObserverEvent::PredicateEnabled(spec) => {
|
||||
let observers_db_conn = match open_readwrite_observers_db_conn(
|
||||
&self.config.expected_cache_path(),
|
||||
&self.ctx,
|
||||
) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to enable observer: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
update_observer_streaming_enabled(
|
||||
&spec.uuid(),
|
||||
true,
|
||||
&observers_db_conn,
|
||||
&self.ctx,
|
||||
);
|
||||
}
|
||||
ObserverEvent::PredicateDeregistered(uuid) => {
|
||||
let observers_db_conn = match open_readwrite_observers_db_conn(
|
||||
&self.config.expected_cache_path(),
|
||||
&self.ctx,
|
||||
) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to deregister observer: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
remove_entry_from_observers(&uuid, &observers_db_conn, &self.ctx);
|
||||
}
|
||||
ObserverEvent::BitcoinPredicateTriggered(data) => {
|
||||
if let Some(ref tip) = data.apply.last() {
|
||||
let observers_db_conn = match open_readwrite_observers_db_conn(
|
||||
&self.config.expected_cache_path(),
|
||||
&self.ctx,
|
||||
) {
|
||||
Ok(con) => con,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to update observer: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let last_block_height_update = tip.block.block_identifier.index;
|
||||
update_observer_progress(
|
||||
&data.chainhook.uuid,
|
||||
last_block_height_update,
|
||||
&observers_db_conn,
|
||||
&self.ctx,
|
||||
)
|
||||
}
|
||||
if let Some(ref tx) = predicate_activity_relayer {
|
||||
let _ = tx.send(data);
|
||||
}
|
||||
}
|
||||
ObserverEvent::Terminate => {
|
||||
info!(self.ctx.expect_logger(), "Terminating runloop");
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -73,32 +73,32 @@ pub fn insert_entry_in_observers(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_default_observers_db_file_path(base_dir: &PathBuf) -> PathBuf {
|
||||
let mut destination_path = base_dir.clone();
|
||||
fn get_default_observers_db_file_path(config: &Config) -> PathBuf {
|
||||
let mut destination_path = config.expected_observers_cache_path().clone();
|
||||
destination_path.push("observers.sqlite");
|
||||
destination_path
|
||||
}
|
||||
|
||||
pub fn open_readonly_observers_db_conn(
|
||||
base_dir: &PathBuf,
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<Connection, String> {
|
||||
let db_path = get_default_observers_db_file_path(&base_dir);
|
||||
let db_path = get_default_observers_db_file_path(config);
|
||||
let conn = open_existing_readonly_db(&db_path, ctx);
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
pub fn open_readwrite_observers_db_conn(
|
||||
base_dir: &PathBuf,
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<Connection, String> {
|
||||
let db_path = get_default_observers_db_file_path(&base_dir);
|
||||
let db_path = get_default_observers_db_file_path(config);
|
||||
let conn = create_or_open_readwrite_db(Some(&db_path), ctx);
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
pub fn open_readwrite_observers_db_conn_or_panic(base_dir: &PathBuf, ctx: &Context) -> Connection {
|
||||
let conn = match open_readwrite_observers_db_conn(base_dir, ctx) {
|
||||
pub fn open_readwrite_observers_db_conn_or_panic(config: &Config, ctx: &Context) -> Connection {
|
||||
let conn = match open_readwrite_observers_db_conn(config, ctx) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
error!(ctx.expect_logger(), "Storage: {}", message.to_string());
|
||||
@@ -108,8 +108,8 @@ pub fn open_readwrite_observers_db_conn_or_panic(base_dir: &PathBuf, ctx: &Conte
|
||||
conn
|
||||
}
|
||||
|
||||
pub fn initialize_observers_db(base_dir: &PathBuf, ctx: &Context) -> Connection {
|
||||
let db_path = get_default_observers_db_file_path(&base_dir);
|
||||
pub fn initialize_observers_db(config: &Config, ctx: &Context) -> Connection {
|
||||
let db_path = get_default_observers_db_file_path(config);
|
||||
let conn = create_or_open_readwrite_db(Some(&db_path), ctx);
|
||||
// TODO: introduce initial output
|
||||
if let Err(e) = conn.execute(
|
||||
@@ -126,6 +126,12 @@ pub fn initialize_observers_db(base_dir: &PathBuf, ctx: &Context) -> Connection
|
||||
conn
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn delete_observers_db(config: &Config) {
|
||||
let path = get_default_observers_db_file_path(config);
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Serialize, Deserialize)]
|
||||
pub struct ObserverReport {
|
||||
pub streaming_enabled: bool,
|
||||
@@ -263,7 +269,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
));
|
||||
}
|
||||
|
||||
let observers_db_conn = initialize_observers_db(&config.expected_cache_path(), ctx);
|
||||
let observers_db_conn = initialize_observers_db(config, ctx);
|
||||
|
||||
let mut observers_to_catchup = vec![];
|
||||
let mut observers_to_clean_up = vec![];
|
||||
|
||||
@@ -10,8 +10,8 @@ use threadpool::ThreadPool;
|
||||
use crate::{
|
||||
config::Config,
|
||||
scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
|
||||
service::{
|
||||
observers::open_readwrite_observers_db_conn_or_panic, update_observer_streaming_enabled,
|
||||
service::observers::{
|
||||
open_readwrite_observers_db_conn_or_panic, update_observer_streaming_enabled,
|
||||
},
|
||||
try_error,
|
||||
};
|
||||
@@ -28,7 +28,6 @@ pub fn start_bitcoin_scan_runloop(
|
||||
let moved_ctx = ctx.clone();
|
||||
let moved_config = config.clone();
|
||||
let observer_command_tx = observer_command_tx.clone();
|
||||
let db_base_dir = config.expected_cache_path();
|
||||
bitcoin_scan_pool.execute(move || {
|
||||
let op = scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
&predicate_spec,
|
||||
@@ -47,7 +46,7 @@ pub fn start_bitcoin_scan_runloop(
|
||||
|
||||
// Update predicate
|
||||
let mut observers_db_conn =
|
||||
open_readwrite_observers_db_conn_or_panic(&db_base_dir, &moved_ctx);
|
||||
open_readwrite_observers_db_conn_or_panic(&moved_config, &moved_ctx);
|
||||
update_observer_streaming_enabled(
|
||||
&predicate_spec.uuid,
|
||||
false,
|
||||
|
||||
Reference in New Issue
Block a user