feat: upgrade service start implementation + documentation

This commit is contained in:
Ludo Galabru
2023-04-04 11:14:02 -04:00
parent 6516155055
commit 02db65e417
6 changed files with 142 additions and 79 deletions

View File

@@ -604,7 +604,24 @@ Tbe first time this command run, a chainstate archive will be downloaded, uncomp
The subsequent scans will use the cached chainstate if already present, speeding up iterations and the overall feedback loop.
---
## Running `chainhook` in production mode
## Run `chainhook` as a service for streaming new blocks
To be documented.
`chainhook` can be ran as a background service for streaming and processing new canonical blocks appended to the Bitcoin and Stacks blockchains.
When running chainhook as a service, `if_this` / `then_that` predicates can be registered by passing the path of the `json` file in the command line:
```bash
$ chainhook service start --predicate-path=./path/to/predicate-1.json --predicate-path=./path/to/predicate-2.json --config-path=./path/to/config.toml
```
Predicates can also be added dynamically. When the `--predicate-path` option is not passed or when the `--start-http-api` option is passed, `chainhook` will instantiate a REST API allowing developers to list, add and removes preducates at runtime:
```bash
$ chainhook service start --config-path=./path/to/config.toml
```
```bash
$ chainhook service start --predicate-path=./path/to/predicate-1.json --start-http-api --config-path=./path/to/config.toml
```
A comprehensive OpenAPI spcification explaining how to interact with the Chainhook REST API can be found [here](./docs/chainhook-openapi.json).

View File

@@ -166,6 +166,12 @@ struct StartCommand {
conflicts_with = "devnet"
)]
pub config_path: Option<String>,
/// Specify relative path of the chainhooks (yaml format) to evaluate
#[clap(long = "predicate-path")]
pub predicates_paths: Vec<String>,
/// Start REST API for managing predicates
#[clap(long = "start-http-api")]
pub start_http_api: bool,
}
#[derive(Subcommand, PartialEq, Clone, Debug)]
@@ -321,10 +327,19 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
match opts.command {
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let config =
let mut config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
// We disable the API if a predicate was passed, and the --enable-
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
config.chainhooks.enable_http_api = false;
}
let mut service = Service::new(config, ctx);
return service.run().await;
let predicates = cmd
.predicates_paths
.iter()
.map(|p| load_predicate_from_path(p))
.collect::<Result<Vec<ChainhookFullSpecification>, _>>()?;
return service.run(predicates).await;
}
},
Command::Config(subcmd) => match subcmd {
@@ -455,17 +470,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
PredicatesCommand::Scan(cmd) => {
let mut config =
Config::default(false, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let file = std::fs::File::open(&cmd.predicate_path)
.map_err(|e| format!("unable to read file {}\n{:?}", cmd.predicate_path, e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
file_reader
.read_to_end(&mut file_buffer)
.map_err(|e| format!("unable to read file {}\n{:?}", cmd.predicate_path, e))?;
let predicate: ChainhookFullSpecification = serde_json::from_slice(&file_buffer)
.map_err(|e| {
format!("unable to parse json file {}\n{:?}", cmd.predicate_path, e)
})?;
let predicate = load_predicate_from_path(&cmd.predicate_path)?;
match predicate {
ChainhookFullSpecification::Bitcoin(predicate) => {
scan_bitcoin_chain_with_predicate(predicate, &config, &ctx).await?;
@@ -654,3 +659,18 @@ pub fn install_ctrlc_handler(terminate_tx: Sender<DigestingCommand>, ctx: Contex
})
.expect("Error setting Ctrl-C handler");
}
pub fn load_predicate_from_path(
predicate_path: &str,
) -> Result<ChainhookFullSpecification, String> {
let file = std::fs::File::open(&predicate_path)
.map_err(|e| format!("unable to read file {}\n{:?}", predicate_path, e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
file_reader
.read_to_end(&mut file_buffer)
.map_err(|e| format!("unable to read file {}\n{:?}", predicate_path, e))?;
let predicate: ChainhookFullSpecification = serde_json::from_slice(&file_buffer)
.map_err(|e| format!("unable to parse json file {}\n{:?}", predicate_path, e))?;
Ok(predicate)
}

View File

@@ -68,6 +68,7 @@ pub struct TsvUrlConfig {
pub struct ChainhooksConfig {
pub max_stacks_registrations: u16,
pub max_bitcoin_registrations: u16,
pub enable_http_api: bool,
}
impl Config {
@@ -97,6 +98,7 @@ impl Config {
chainhook_config: None,
ingestion_port: DEFAULT_INGESTION_PORT,
control_port: DEFAULT_CONTROL_PORT,
control_api_enabled: self.chainhooks.enable_http_api,
bitcoind_rpc_username: self.network.bitcoind_rpc_username.clone(),
bitcoind_rpc_password: self.network.bitcoind_rpc_password.clone(),
bitcoind_rpc_url: self.network.bitcoind_rpc_url.clone(),
@@ -149,6 +151,7 @@ impl Config {
.chainhooks
.max_bitcoin_registrations
.unwrap_or(100),
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: config_file.network.stacks_node_rpc_url.to_string(),
@@ -273,6 +276,7 @@ impl Config {
chainhooks: ChainhooksConfig {
max_stacks_registrations: 50,
max_bitcoin_registrations: 50,
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: "http://0.0.0.0:20443".into(),
@@ -302,6 +306,7 @@ impl Config {
chainhooks: ChainhooksConfig {
max_stacks_registrations: 10,
max_bitcoin_registrations: 10,
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: "http://0.0.0.0:20443".into(),
@@ -331,6 +336,7 @@ impl Config {
chainhooks: ChainhooksConfig {
max_stacks_registrations: 10,
max_bitcoin_registrations: 10,
enable_http_api: true,
},
network: IndexerConfig {
stacks_node_rpc_url: "http://0.0.0.0:20443".into(),

View File

@@ -39,70 +39,39 @@ impl Service {
Self { config, ctx }
}
pub async fn run(&mut self) -> Result<(), String> {
pub async fn run(
&mut self,
mut predicates: Vec<ChainhookFullSpecification>,
) -> Result<(), String> {
let mut chainhook_config = ChainhookConfig::new();
{
let redis_config = self.config.expected_redis_config();
let client = redis::Client::open(redis_config.uri.clone()).unwrap();
let mut redis_con = match client.get_connection() {
Ok(con) => con,
Err(message) => {
if predicates.is_empty() {
let mut registered_predicates = load_predicates_from_redis(&self.config, &self.ctx)?;
predicates.append(&mut registered_predicates);
}
for predicate in predicates.into_iter() {
match chainhook_config.register_hook(
(
&self.config.network.bitcoin_network,
&self.config.network.stacks_network,
),
predicate,
&ApiKey(None),
) {
Ok(spec) => {
info!(
self.ctx.expect_logger(),
"Predicate {} retrieved from storage and loaded",
spec.uuid(),
);
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Unable to connect to redis server: {}",
message.to_string()
"Failed loading predicate from storage: {}",
e.to_string()
);
std::thread::sleep(std::time::Duration::from_secs(1));
std::process::exit(1);
}
};
let chainhooks_to_load: Vec<String> = redis_con
.scan_match("chainhook:*:*:*")
.expect("unable to retrieve prunable entries")
.into_iter()
.collect();
for key in chainhooks_to_load.iter() {
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
Ok(spec) => {
ChainhookFullSpecification::deserialize_specification(&spec, key).unwrap()
// todo
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to load chainhook associated with key {}: {}",
key,
e.to_string()
);
continue;
}
};
match chainhook_config.register_hook(
(
&self.config.network.bitcoin_network,
&self.config.network.stacks_network,
),
chainhook,
&ApiKey(None),
) {
Ok(spec) => {
info!(
self.ctx.expect_logger(),
"Predicate {} retrieved from storage and loaded",
spec.uuid(),
);
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Failed loading predicate from storage: {}",
e.to_string()
);
}
}
}
}
@@ -133,11 +102,13 @@ impl Service {
}
}
info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}",
event_observer_config.control_port
);
if self.config.chainhooks.enable_http_api {
info!(
self.ctx.expect_logger(),
"Listening for chainhook predicate registrations on port {}",
event_observer_config.control_port
);
}
// let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &self.ctx)
// {
@@ -618,3 +589,50 @@ fn update_storage_with_confirmed_stacks_blocks(
redis_con.set(&format!("stx:tip"), block.block_identifier.index);
}
}
fn load_predicates_from_redis(
config: &Config,
ctx: &Context,
) -> Result<Vec<ChainhookFullSpecification>, String> {
let redis_config = config.expected_redis_config();
let client = redis::Client::open(redis_config.uri.clone()).unwrap();
let mut redis_con = match client.get_connection() {
Ok(con) => con,
Err(message) => {
error!(
ctx.expect_logger(),
"Unable to connect to redis server: {}",
message.to_string()
);
std::thread::sleep(std::time::Duration::from_secs(1));
std::process::exit(1);
}
};
let chainhooks_to_load: Vec<String> = redis_con
.scan_match("chainhook:*:*:*")
.expect("unable to retrieve prunable entries")
.into_iter()
.collect();
let mut predicates = vec![];
for key in chainhooks_to_load.iter() {
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
Ok(spec) => {
ChainhookFullSpecification::deserialize_specification(&spec, key).unwrap()
// todo
}
Err(e) => {
error!(
ctx.expect_logger(),
"unable to load chainhook associated with key {}: {}",
key,
e.to_string()
);
continue;
}
};
predicates.push(chainhook);
}
Ok(predicates)
}

View File

@@ -131,6 +131,7 @@ pub struct EventObserverConfig {
pub event_handlers: Vec<EventHandler>,
pub ingestion_port: u16,
pub control_port: u16,
pub control_api_enabled: bool,
pub bitcoind_rpc_username: String,
pub bitcoind_rpc_password: String,
pub bitcoind_rpc_url: String,

View File

@@ -33,6 +33,7 @@ fn generate_test_config() -> (EventObserverConfig, ChainhookStore) {
event_handlers: vec![],
ingestion_port: 0,
control_port: 0,
control_api_enabled: false,
bitcoind_rpc_username: "user".into(),
bitcoind_rpc_password: "user".into(),
bitcoind_rpc_url: "http://localhost:18443".into(),