fix: disable steam scan when scanning past blocks

This commit is contained in:
Ludo Galabru
2023-04-13 22:20:03 -04:00
parent f18bc00f5a
commit e2949d213a
8 changed files with 164 additions and 70 deletions

View File

@@ -540,7 +540,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
};
scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&predicate_spec,
&config,
&ctx,
)
@@ -560,7 +560,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
};
scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&predicate_spec,
&mut config,
&ctx,
)

View File

@@ -27,7 +27,7 @@ use chainhook_types::{BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData};
use std::collections::{BTreeMap, HashMap};
pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec: BitcoinChainhookSpecification,
predicate_spec: &BitcoinChainhookSpecification,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
@@ -149,7 +149,7 @@ pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
// Only consider inscriptions in the interval specified
let local_traverals = match inscriptions_cache.remove(&cursor) {
Some(entry) => entry,
None => continue
None => continue,
};
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
traversals.insert(transaction_identifier, traversal_result);

View File

@@ -23,7 +23,7 @@ use chainhook_event_observer::{
use chainhook_types::BlockIdentifier;
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec: StacksChainhookSpecification,
predicate_spec: &StacksChainhookSpecification,
config: &mut Config,
ctx: &Context,
) -> Result<BlockIdentifier, String> {

View File

@@ -5,7 +5,9 @@ use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
use chainhook_event_observer::chainhooks::types::ChainhookSpecification;
use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverEvent};
use chainhook_event_observer::observer::{
start_event_observer, ApiKey, ObserverCommand, ObserverEvent,
};
use chainhook_event_observer::utils::Context;
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
use redis::{Commands, Connection};
@@ -115,10 +117,11 @@ impl Service {
let context_cloned = self.ctx.clone();
let event_observer_config_moved = event_observer_config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = std::thread::spawn(move || {
let future = start_event_observer(
event_observer_config_moved,
observer_command_tx,
observer_command_tx_moved,
observer_command_rx,
Some(observer_event_tx),
context_cloned,
@@ -131,14 +134,16 @@ impl Service {
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
let ctx = self.ctx.clone();
let config = self.config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = stacks_scan_op_rx.recv() {
while let Ok((predicate_spec, api_key)) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let mut moved_config = config.clone();
let observer_command_tx = observer_command_tx_moved.clone();
stacks_scan_pool.execute(move || {
let op = scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&predicate_spec,
&mut moved_config,
&moved_ctx,
);
@@ -156,6 +161,10 @@ impl Service {
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
api_key,
));
});
}
let res = stacks_scan_pool.join();
@@ -168,14 +177,16 @@ impl Service {
let bitcoin_scan_pool = ThreadPool::new(BITCOIN_SCAN_THREAD_POOL_SIZE);
let ctx = self.ctx.clone();
let config = self.config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
while let Ok((predicate_spec, api_key)) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = moved_observer_command_tx.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&predicate_spec,
&moved_config,
&moved_ctx,
);
@@ -189,6 +200,10 @@ impl Service {
);
}
};
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(predicate_spec),
api_key,
));
});
}
let res = bitcoin_scan_pool.join();
@@ -219,7 +234,7 @@ impl Service {
}
};
match event {
ObserverEvent::HookRegistered(chainhook) => {
ObserverEvent::HookRegistered(chainhook, api_key) => {
// If start block specified, use it.
// I no start block specified, depending on the nature the hook, we'd like to retrieve:
// - contract-id
@@ -243,10 +258,10 @@ impl Service {
}
match chainhook {
ChainhookSpecification::Stacks(predicate_spec) => {
let _ = stacks_scan_op_tx.send(predicate_spec);
let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send(predicate_spec);
let _ = bitcoin_scan_op_tx.send((predicate_spec, api_key));
}
}
}

View File

@@ -11,7 +11,7 @@ use schemars::JsonSchema;
use crate::observer::ApiKey;
#[derive(Clone, Debug, JsonSchema)]
#[derive(Clone, Debug)]
pub struct ChainhookConfig {
pub stacks_chainhooks: Vec<StacksChainhookSpecification>,
pub bitcoin_chainhooks: Vec<BitcoinChainhookSpecification>,
@@ -68,6 +68,30 @@ impl ChainhookConfig {
Ok(spec)
}
pub fn enable_specification(
&mut self,
predicate_spec: &ChainhookSpecification,
) {
match predicate_spec {
ChainhookSpecification::Stacks(spec_to_enable) => {
for spec in self.stacks_chainhooks.iter_mut() {
if spec.uuid.eq(&spec_to_enable.uuid) {
spec.enabled = true;
break;
}
}
}
ChainhookSpecification::Bitcoin(spec_to_enable) => {
for spec in self.bitcoin_chainhooks.iter_mut() {
if spec.uuid.eq(&spec_to_enable.uuid) {
spec.enabled = true;
break;
}
}
}
};
}
pub fn register_specification(&mut self, spec: ChainhookSpecification) -> Result<(), String> {
match spec {
ChainhookSpecification::Stacks(spec) => {
@@ -131,7 +155,7 @@ impl Serialize for ChainhookConfig {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ChainhookSpecification {
Bitcoin(BitcoinChainhookSpecification),
@@ -201,7 +225,7 @@ impl ChainhookSpecification {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct BitcoinChainhookSpecification {
pub uuid: String,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -221,6 +245,7 @@ pub struct BitcoinChainhookSpecification {
pub include_inputs: bool,
pub include_outputs: bool,
pub include_witness: bool,
pub enabled: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
@@ -291,6 +316,7 @@ impl BitcoinChainhookFullSpecification {
include_inputs: spec.include_inputs.unwrap_or(false),
include_outputs: spec.include_outputs.unwrap_or(false),
include_witness: spec.include_witness.unwrap_or(false),
enabled: false,
})
}
}
@@ -349,6 +375,7 @@ impl StacksChainhookFullSpecification {
expire_after_occurrence: spec.expire_after_occurrence,
predicate: spec.predicate,
action: spec.action,
enabled: false,
})
}
}
@@ -645,7 +672,7 @@ pub enum BlockIdentifierHashRule {
BuildsOff(String),
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, JsonSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StacksChainhookSpecification {
pub uuid: String,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -666,6 +693,7 @@ pub struct StacksChainhookSpecification {
#[serde(rename = "predicate")]
pub predicate: StacksPredicate,
pub action: HookAction,
pub enabled: bool,
}
impl StacksChainhookSpecification {

View File

@@ -198,7 +198,11 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
Some(traversal) => traversal,
None => {
ctx.try_log(|logger| {
slog::info!(logger, "Unable to retrieve cached inscription data for inscription {}", new_tx.transaction_identifier.hash);
slog::info!(
logger,
"Unable to retrieve cached inscription data for inscription {}",
new_tx.transaction_identifier.hash
);
});
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
continue;

View File

@@ -176,9 +176,10 @@ pub enum ObserverCommand {
PropagateBitcoinChainEvent(BlockchainEvent),
PropagateStacksChainEvent(StacksChainEvent),
PropagateStacksMempoolEvent(StacksChainMempoolEvent),
RegisterHook(ChainhookFullSpecification, ApiKey),
DeregisterBitcoinHook(String, ApiKey),
DeregisterStacksHook(String, ApiKey),
RegisterPredicate(ChainhookFullSpecification, ApiKey),
EnablePredicate(ChainhookSpecification, ApiKey),
DeregisterBitcoinPredicate(String, ApiKey),
DeregisterStacksPredicate(String, ApiKey),
NotifyBitcoinTransactionProxied,
Terminate,
}
@@ -203,7 +204,7 @@ pub enum ObserverEvent {
BitcoinChainEvent(BitcoinChainEvent),
StacksChainEvent(StacksChainEvent),
NotifyBitcoinTransactionProxied,
HookRegistered(ChainhookSpecification),
HookRegistered(ChainhookSpecification, ApiKey),
HookDeregistered(ChainhookSpecification),
BitcoinChainhookTriggered(BitcoinChainhookOccurrencePayload),
StacksChainhookTriggered(StacksChainhookOccurrencePayload),
@@ -895,6 +896,7 @@ pub async fn start_observer_commands_handler(
.values()
.map(|v| &v.bitcoin_chainhooks)
.flatten()
.filter(|p| p.enabled)
.collect::<Vec<_>>();
ctx.try_log(|logger| {
slog::info!(
@@ -1053,6 +1055,7 @@ pub async fn start_observer_commands_handler(
.values()
.map(|v| &v.stacks_chainhooks)
.flatten()
.filter(|p| p.enabled)
.collect();
// process hooks
@@ -1181,13 +1184,13 @@ pub async fn start_observer_commands_handler(
let _ = tx.send(ObserverEvent::NotifyBitcoinTransactionProxied);
}
}
ObserverCommand::RegisterHook(hook, api_key) => match chainhook_store.write() {
ObserverCommand::RegisterPredicate(hook, api_key) => match chainhook_store.write() {
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "unable to obtain lock {:?}", e));
continue;
}
Ok(mut chainhook_store_writer) => {
ctx.try_log(|logger| slog::info!(logger, "Handling RegisterHook command"));
ctx.try_log(|logger| slog::info!(logger, "Handling RegisterPredicate command"));
let hook_formation = match chainhook_store_writer.entries.get_mut(&api_key) {
Some(hook_formation) => hook_formation,
None => {
@@ -1227,45 +1230,71 @@ pub async fn start_observer_commands_handler(
)
});
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::HookRegistered(spec));
let _ = tx.send(ObserverEvent::HookRegistered(spec, api_key));
} else {
hook_formation.enable_specification(&spec);
}
}
},
ObserverCommand::DeregisterStacksHook(hook_uuid, api_key) => {
match chainhook_store.write() {
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "unable to obtain lock {:?}", e));
continue;
}
Ok(mut chainhook_store_writer) => {
ctx.try_log(|logger| {
slog::info!(logger, "Handling DeregisterStacksHook command")
});
let hook_formation = match chainhook_store_writer.entries.get_mut(&api_key)
{
Some(hook_formation) => hook_formation,
None => {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to retrieve chainhooks associated with {:?}",
api_key
)
});
continue;
}
};
chainhooks_lookup.remove(&hook_uuid);
let hook = hook_formation.deregister_stacks_hook(hook_uuid);
if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) {
let _ = tx.send(ObserverEvent::HookDeregistered(
ChainhookSpecification::Stacks(hook),
));
ObserverCommand::EnablePredicate(predicate_spec, api_key) => match chainhook_store
.write()
{
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "unable to obtain lock {:?}", e));
continue;
}
Ok(mut chainhook_store_writer) => {
ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate"));
let hook_formation = match chainhook_store_writer.entries.get_mut(&api_key) {
Some(hook_formation) => hook_formation,
None => {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to retrieve chainhooks associated with {:?}",
api_key
)
});
continue;
}
};
hook_formation.enable_specification(&predicate_spec);
}
},
ObserverCommand::DeregisterStacksPredicate(hook_uuid, api_key) => match chainhook_store
.write()
{
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "unable to obtain lock {:?}", e));
continue;
}
Ok(mut chainhook_store_writer) => {
ctx.try_log(|logger| {
slog::info!(logger, "Handling DeregisterStacksPredicate command")
});
let hook_formation = match chainhook_store_writer.entries.get_mut(&api_key) {
Some(hook_formation) => hook_formation,
None => {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to retrieve chainhooks associated with {:?}",
api_key
)
});
continue;
}
};
chainhooks_lookup.remove(&hook_uuid);
let hook = hook_formation.deregister_stacks_hook(hook_uuid);
if let (Some(tx), Some(hook)) = (&observer_events_tx, hook) {
let _ = tx.send(ObserverEvent::HookDeregistered(
ChainhookSpecification::Stacks(hook),
));
}
}
}
ObserverCommand::DeregisterBitcoinHook(hook_uuid, api_key) => {
},
ObserverCommand::DeregisterBitcoinPredicate(hook_uuid, api_key) => {
match chainhook_store.write() {
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "unable to obtain lock {:?}", e));
@@ -1273,7 +1302,7 @@ pub async fn start_observer_commands_handler(
}
Ok(mut chainhook_store_writer) => {
ctx.try_log(|logger| {
slog::info!(logger, "Handling DeregisterBitcoinHook command")
slog::info!(logger, "Handling DeregisterBitcoinPredicate command")
});
let hook_formation = match chainhook_store_writer.entries.get_mut(&api_key)
{
@@ -1837,7 +1866,7 @@ pub fn handle_create_hook(
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::RegisterHook(hook, api_key));
let _ = tx.send(ObserverCommand::RegisterPredicate(hook, api_key));
}
_ => {}
};
@@ -1861,7 +1890,9 @@ pub fn handle_delete_stacks_hook(
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::DeregisterStacksHook(hook_uuid, api_key));
let _ = tx.send(ObserverCommand::DeregisterStacksPredicate(
hook_uuid, api_key,
));
}
_ => {}
};
@@ -1885,7 +1916,9 @@ pub fn handle_delete_bitcoin_hook(
let background_job_tx = background_job_tx.inner();
match background_job_tx.lock() {
Ok(tx) => {
let _ = tx.send(ObserverCommand::DeregisterBitcoinHook(hook_uuid, api_key));
let _ = tx.send(ObserverCommand::DeregisterBitcoinPredicate(
hook_uuid, api_key,
));
}
_ => {}
};
@@ -1899,6 +1932,12 @@ pub fn handle_delete_bitcoin_hook(
#[derive(Clone, Debug, PartialEq, Eq, Hash, OpenApiFromRequest)]
pub struct ApiKey(pub Option<String>);
impl ApiKey {
pub fn none() -> ApiKey {
ApiKey(None)
}
}
#[derive(Debug)]
pub enum ApiKeyError {
Missing,

View File

@@ -126,7 +126,7 @@ fn generate_and_register_new_stacks_chainhook(
) -> StacksChainhookSpecification {
let contract_identifier = format!("{}.{}", accounts::deployer_stx_address(), contract_name);
let chainhook = stacks_chainhook_contract_call(id, &contract_identifier, None, method);
let _ = observer_commands_tx.send(ObserverCommand::RegisterHook(
let _ = observer_commands_tx.send(ObserverCommand::RegisterPredicate(
ChainhookFullSpecification::Stacks(chainhook.clone()),
ApiKey(None),
));
@@ -134,7 +134,7 @@ fn generate_and_register_new_stacks_chainhook(
.into_selected_network_specification(&StacksNetwork::Devnet)
.unwrap();
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::HookRegistered(registered_chainhook)) => {
Ok(ObserverEvent::HookRegistered(registered_chainhook, ApiKey(None))) => {
assert_eq!(
ChainhookSpecification::Stacks(chainhook.clone()),
registered_chainhook
@@ -143,6 +143,10 @@ fn generate_and_register_new_stacks_chainhook(
}
_ => false,
});
let _ = observer_commands_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(chainhook.clone()),
ApiKey(None),
));
chainhook
}
@@ -154,7 +158,7 @@ fn generate_and_register_new_bitcoin_chainhook(
expire_after_occurrence: Option<u64>,
) -> BitcoinChainhookSpecification {
let chainhook = bitcoin_chainhook_p2pkh(id, &p2pkh_address, expire_after_occurrence);
let _ = observer_commands_tx.send(ObserverCommand::RegisterHook(
let _ = observer_commands_tx.send(ObserverCommand::RegisterPredicate(
ChainhookFullSpecification::Bitcoin(chainhook.clone()),
ApiKey(None),
));
@@ -162,7 +166,7 @@ fn generate_and_register_new_bitcoin_chainhook(
.into_selected_network_specification(&BitcoinNetwork::Regtest)
.unwrap();
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::HookRegistered(registered_chainhook)) => {
Ok(ObserverEvent::HookRegistered(registered_chainhook, ApiKey(None))) => {
assert_eq!(
ChainhookSpecification::Bitcoin(chainhook.clone()),
registered_chainhook
@@ -171,6 +175,10 @@ fn generate_and_register_new_bitcoin_chainhook(
}
_ => false,
});
let _ = observer_commands_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(chainhook.clone()),
ApiKey(None),
));
chainhook
}
@@ -331,7 +339,7 @@ fn test_stacks_chainhook_register_deregister() {
});
// Deregister the hook
let _ = observer_commands_tx.send(ObserverCommand::DeregisterStacksHook(
let _ = observer_commands_tx.send(ObserverCommand::DeregisterStacksPredicate(
chainhook.uuid.clone(),
ApiKey(None),
));
@@ -434,7 +442,7 @@ fn test_stacks_chainhook_auto_deregister() {
let contract_identifier = format!("{}.{}", accounts::deployer_stx_address(), "counter");
let chainhook = stacks_chainhook_contract_call(0, &contract_identifier, Some(1), "increment");
let _ = observer_commands_tx.send(ObserverCommand::RegisterHook(
let _ = observer_commands_tx.send(ObserverCommand::RegisterPredicate(
ChainhookFullSpecification::Stacks(chainhook.clone()),
ApiKey(None),
));
@@ -442,7 +450,7 @@ fn test_stacks_chainhook_auto_deregister() {
.into_selected_network_specification(&StacksNetwork::Devnet)
.unwrap();
assert!(match observer_events_rx.recv() {
Ok(ObserverEvent::HookRegistered(registered_chainhook)) => {
Ok(ObserverEvent::HookRegistered(registered_chainhook, ApiKey(None))) => {
assert_eq!(
ChainhookSpecification::Stacks(chainhook.clone()),
registered_chainhook
@@ -735,7 +743,7 @@ fn test_bitcoin_chainhook_register_deregister() {
});
// Deregister the hook
let _ = observer_commands_tx.send(ObserverCommand::DeregisterBitcoinHook(
let _ = observer_commands_tx.send(ObserverCommand::DeregisterBitcoinPredicate(
chainhook.uuid.clone(),
ApiKey(None),
));