feat: push stackerdb events to observers with the "stackerdb" or "any" concern

This commit is contained in:
Jude Nelson
2023-08-22 17:45:32 -04:00
parent 6640032ef0
commit d5ea39eeb0

View File

@@ -20,7 +20,8 @@ use stacks::chainstate::stacks::{
};
use stacks::chainstate::stacks::{StacksBlock, StacksMicroblock};
use stacks::codec::StacksMessageCodec;
use stacks::core::mempool::{MemPoolDropReason, MemPoolEventDispatcher};
use stacks::core::mempool::MemPoolDropReason;
use stacks::core::mempool::MemPoolEventDispatcher;
use stacks::net::atlas::{Attachment, AttachmentInstance};
use stacks::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId};
use stacks::util::hash::bytes_to_hex;
@@ -36,6 +37,10 @@ use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;
use stacks::chainstate::stacks::miner::TransactionEvent;
use stacks::chainstate::stacks::TransactionPayload;
use stacks::net::stackerdb::StackerDBEventDispatcher;
use stacks::libstackerdb::SlotMetadata;
#[derive(Debug, Clone)]
struct EventObserver {
endpoint: String,
@@ -60,6 +65,7 @@ pub const PATH_MEMPOOL_TX_SUBMIT: &str = "new_mempool_tx";
pub const PATH_MEMPOOL_TX_DROP: &str = "drop_mempool_tx";
pub const PATH_MINED_BLOCK: &str = "mined_block";
pub const PATH_MINED_MICROBLOCK: &str = "mined_microblock";
pub const PATH_STACKERDB_CHUNKS: &str = "stackerdb_chunks";
pub const PATH_BURN_BLOCK_SUBMIT: &str = "new_burn_block";
pub const PATH_BLOCK_PROCESSED: &str = "new_block";
pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new";
@@ -84,6 +90,13 @@ pub struct MinedMicroblockEvent {
pub anchor_block: BlockHeaderHash,
}
/// Event structure for newly-arrived StackerDB data
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StackerDBChunksEvent {
pub contract_id: QualifiedContractIdentifier,
pub modified_slots: Vec<SlotMetadata>,
}
impl EventObserver {
pub fn send_payload(&self, payload: &serde_json::Value, path: &str) {
let body = match serde_json::to_vec(&payload) {
@@ -336,6 +349,10 @@ impl EventObserver {
self.send_payload(payload, PATH_MINED_MICROBLOCK);
}
fn send_stackerdb_chunks(&self, payload: &serde_json::Value) {
self.send_payload(payload, PATH_STACKERDB_CHUNKS);
}
fn send_new_burn_block(&self, payload: &serde_json::Value) {
self.send_payload(payload, PATH_BURN_BLOCK_SUBMIT);
}
@@ -411,6 +428,7 @@ pub struct EventDispatcher {
any_event_observers_lookup: HashSet<u16>,
miner_observers_lookup: HashSet<u16>,
mined_microblocks_observers_lookup: HashSet<u16>,
stackerdb_observers_lookup: HashSet<u16>,
}
impl MemPoolEventDispatcher for EventDispatcher {
@@ -455,6 +473,17 @@ impl MemPoolEventDispatcher for EventDispatcher {
}
}
impl StackerDBEventDispatcher for EventDispatcher {
/// Relay new StackerDB chunks
fn new_stackerdb_chunks(
&self,
contract_id: &QualifiedContractIdentifier,
chunks: &[SlotMetadata],
) {
self.process_new_stackerdb_chunks(contract_id, chunks);
}
}
impl BlockEventDispatcher for EventDispatcher {
fn announce_block(
&self,
@@ -520,6 +549,7 @@ impl EventDispatcher {
microblock_observers_lookup: HashSet::new(),
miner_observers_lookup: HashSet::new(),
mined_microblocks_observers_lookup: HashSet::new(),
stackerdb_observers_lookup: HashSet::new(),
}
}
@@ -880,6 +910,37 @@ impl EventDispatcher {
}
}
/// Forward newly-accepted StackerDB chunk metadata to downstream `stackerdb` observers.
/// Infallible.
pub fn process_new_stackerdb_chunks(
&self,
contract_id: &QualifiedContractIdentifier,
new_chunks: &[SlotMetadata],
) {
let interested_observers: Vec<_> = self
.registered_observers
.iter()
.enumerate()
.filter(|(obs_id, _observer)| {
self.stackerdb_observers_lookup.contains(&(*obs_id as u16))
|| self.any_event_observers_lookup.contains(&(*obs_id as u16))
})
.collect();
if interested_observers.len() < 1 {
return;
}
let payload = serde_json::to_value(StackerDBChunksEvent {
contract_id: contract_id.clone(),
modified_slots: new_chunks.to_vec(),
})
.expect("FATAL: failed to serialize StackerDBChunksEvent to JSON");
for (_, observer) in interested_observers.iter() {
observer.send_stackerdb_chunks(&payload);
}
}
pub fn process_dropped_mempool_txs(&self, txs: Vec<Txid>, reason: MemPoolDropReason) {
// lazily assemble payload only if we have observers
let interested_observers: Vec<_> = self
@@ -999,6 +1060,9 @@ impl EventDispatcher {
self.mined_microblocks_observers_lookup
.insert(observer_index);
}
EventKeyType::StackerDBChunks => {
self.stackerdb_observers_lookup.insert(observer_index);
}
}
}