diff --git a/testnet/stacks-node/src/event_dispatcher.rs b/testnet/stacks-node/src/event_dispatcher.rs index 0c49b7f2f..406cfc142 100644 --- a/testnet/stacks-node/src/event_dispatcher.rs +++ b/testnet/stacks-node/src/event_dispatcher.rs @@ -20,7 +20,8 @@ use stacks::chainstate::stacks::{ }; use stacks::chainstate::stacks::{StacksBlock, StacksMicroblock}; use stacks::codec::StacksMessageCodec; -use stacks::core::mempool::{MemPoolDropReason, MemPoolEventDispatcher}; +use stacks::core::mempool::MemPoolDropReason; +use stacks::core::mempool::MemPoolEventDispatcher; use stacks::net::atlas::{Attachment, AttachmentInstance}; use stacks::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId}; use stacks::util::hash::bytes_to_hex; @@ -36,6 +37,10 @@ use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState; use stacks::chainstate::stacks::miner::TransactionEvent; use stacks::chainstate::stacks::TransactionPayload; +use stacks::net::stackerdb::StackerDBEventDispatcher; + +use stacks::libstackerdb::SlotMetadata; + #[derive(Debug, Clone)] struct EventObserver { endpoint: String, @@ -60,6 +65,7 @@ pub const PATH_MEMPOOL_TX_SUBMIT: &str = "new_mempool_tx"; pub const PATH_MEMPOOL_TX_DROP: &str = "drop_mempool_tx"; pub const PATH_MINED_BLOCK: &str = "mined_block"; pub const PATH_MINED_MICROBLOCK: &str = "mined_microblock"; +pub const PATH_STACKERDB_CHUNKS: &str = "stackerdb_chunks"; pub const PATH_BURN_BLOCK_SUBMIT: &str = "new_burn_block"; pub const PATH_BLOCK_PROCESSED: &str = "new_block"; pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new"; @@ -84,6 +90,13 @@ pub struct MinedMicroblockEvent { pub anchor_block: BlockHeaderHash, } +/// Event structure for newly-arrived StackerDB data +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StackerDBChunksEvent { + pub contract_id: QualifiedContractIdentifier, + pub modified_slots: Vec, +} + impl EventObserver { pub fn send_payload(&self, payload: &serde_json::Value, path: &str) { let body = match serde_json::to_vec(&payload) { @@ -336,6 +349,10 @@ impl EventObserver { self.send_payload(payload, PATH_MINED_MICROBLOCK); } + fn send_stackerdb_chunks(&self, payload: &serde_json::Value) { + self.send_payload(payload, PATH_STACKERDB_CHUNKS); + } + fn send_new_burn_block(&self, payload: &serde_json::Value) { self.send_payload(payload, PATH_BURN_BLOCK_SUBMIT); } @@ -411,6 +428,7 @@ pub struct EventDispatcher { any_event_observers_lookup: HashSet, miner_observers_lookup: HashSet, mined_microblocks_observers_lookup: HashSet, + stackerdb_observers_lookup: HashSet, } impl MemPoolEventDispatcher for EventDispatcher { @@ -455,6 +473,17 @@ impl MemPoolEventDispatcher for EventDispatcher { } } +impl StackerDBEventDispatcher for EventDispatcher { + /// Relay new StackerDB chunks + fn new_stackerdb_chunks( + &self, + contract_id: &QualifiedContractIdentifier, + chunks: &[SlotMetadata], + ) { + self.process_new_stackerdb_chunks(contract_id, chunks); + } +} + impl BlockEventDispatcher for EventDispatcher { fn announce_block( &self, @@ -520,6 +549,7 @@ impl EventDispatcher { microblock_observers_lookup: HashSet::new(), miner_observers_lookup: HashSet::new(), mined_microblocks_observers_lookup: HashSet::new(), + stackerdb_observers_lookup: HashSet::new(), } } @@ -880,6 +910,37 @@ impl EventDispatcher { } } + /// Forward newly-accepted StackerDB chunk metadata to downstream `stackerdb` observers. + /// Infallible. + pub fn process_new_stackerdb_chunks( + &self, + contract_id: &QualifiedContractIdentifier, + new_chunks: &[SlotMetadata], + ) { + let interested_observers: Vec<_> = self + .registered_observers + .iter() + .enumerate() + .filter(|(obs_id, _observer)| { + self.stackerdb_observers_lookup.contains(&(*obs_id as u16)) + || self.any_event_observers_lookup.contains(&(*obs_id as u16)) + }) + .collect(); + if interested_observers.len() < 1 { + return; + } + + let payload = serde_json::to_value(StackerDBChunksEvent { + contract_id: contract_id.clone(), + modified_slots: new_chunks.to_vec(), + }) + .expect("FATAL: failed to serialize StackerDBChunksEvent to JSON"); + + for (_, observer) in interested_observers.iter() { + observer.send_stackerdb_chunks(&payload); + } + } + pub fn process_dropped_mempool_txs(&self, txs: Vec, reason: MemPoolDropReason) { // lazily assemble payload only if we have observers let interested_observers: Vec<_> = self @@ -999,6 +1060,9 @@ impl EventDispatcher { self.mined_microblocks_observers_lookup .insert(observer_index); } + EventKeyType::StackerDBChunks => { + self.stackerdb_observers_lookup.insert(observer_index); + } } }