fix: the stackerdb event observer now sends chunks over the event observer, not just chunk metadata

This commit is contained in:
Jude Nelson
2023-08-25 14:40:32 -04:00
parent 1f3bd2e7b4
commit 9c4e540250
4 changed files with 29 additions and 30 deletions

View File

@@ -19,6 +19,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::mem;
use rand::prelude::*;
use rand::thread_rng;
@@ -1739,21 +1740,18 @@ impl Relayer {
/// Process HTTP-uploaded stackerdb chunks.
/// They're already stored by the RPC handler, so just forward events for them.
pub fn process_uploaded_stackerdb_chunks(
uploaded_chunks: &[StackerDBPushChunkData],
uploaded_chunks: Vec<StackerDBPushChunkData>,
event_observer: Option<&dyn StackerDBEventDispatcher>,
) {
if let Some(observer) = event_observer {
let mut all_events: HashMap<QualifiedContractIdentifier, Vec<SlotMetadata>> =
let mut all_events: HashMap<QualifiedContractIdentifier, Vec<StackerDBChunkData>> =
HashMap::new();
for chunk in uploaded_chunks {
for chunk in uploaded_chunks.into_iter() {
debug!("Got uploaded StackerDB chunk"; "stackerdb_contract_id" => &format!("{}", &chunk.contract_id), "slot_id" => chunk.chunk_data.slot_id, "slot_version" => chunk.chunk_data.slot_version);
if let Some(events) = all_events.get_mut(&chunk.contract_id) {
events.push(chunk.chunk_data.get_slot_metadata());
events.push(chunk.chunk_data);
} else {
all_events.insert(
chunk.contract_id.clone(),
vec![chunk.chunk_data.get_slot_metadata()],
);
all_events.insert(chunk.contract_id.clone(), vec![chunk.chunk_data]);
}
}
for (contract_id, new_chunks) in all_events.iter() {
@@ -1766,31 +1764,31 @@ impl Relayer {
pub fn process_stacker_db_chunks(
stackerdbs: &mut StackerDBs,
stackerdb_configs: &HashMap<QualifiedContractIdentifier, StackerDBConfig>,
sync_results: &[StackerDBSyncResult],
sync_results: Vec<StackerDBSyncResult>,
event_observer: Option<&dyn StackerDBEventDispatcher>,
) -> Result<(), Error> {
// sort stacker results by contract, so as to minimize the number of transactions.
let mut sync_results_map: HashMap<&QualifiedContractIdentifier, Vec<&StackerDBSyncResult>> =
let mut sync_results_map: HashMap<QualifiedContractIdentifier, Vec<StackerDBSyncResult>> =
HashMap::new();
for sync_result in sync_results {
let sc = &sync_result.contract_id;
if let Some(result_list) = sync_results_map.get_mut(sc) {
for sync_result in sync_results.into_iter() {
let sc = sync_result.contract_id.clone();
if let Some(result_list) = sync_results_map.get_mut(&sc) {
result_list.push(sync_result);
} else {
sync_results_map.insert(sc, vec![sync_result]);
}
}
let mut all_events: HashMap<QualifiedContractIdentifier, Vec<SlotMetadata>> =
let mut all_events: HashMap<QualifiedContractIdentifier, Vec<StackerDBChunkData>> =
HashMap::new();
for (sc, sync_results) in sync_results_map.iter() {
if let Some(config) = stackerdb_configs.get(sc) {
for (sc, sync_results) in sync_results_map.into_iter() {
if let Some(config) = stackerdb_configs.get(&sc) {
let tx = stackerdbs.tx_begin(config.clone())?;
for sync_result in sync_results {
for chunk in sync_result.chunks_to_store.iter() {
for sync_result in sync_results.into_iter() {
for chunk in sync_result.chunks_to_store.into_iter() {
let md = chunk.get_slot_metadata();
if let Err(e) = tx.try_replace_chunk(sc, &md, &chunk.data) {
if let Err(e) = tx.try_replace_chunk(&sc, &md, &chunk.data) {
warn!(
"Failed to store chunk for StackerDB";
"stackerdb_contract_id" => &format!("{}", &sync_result.contract_id),
@@ -1804,9 +1802,9 @@ impl Relayer {
}
if let Some(event_list) = all_events.get_mut(&sync_result.contract_id) {
event_list.push(md);
event_list.push(chunk);
} else {
all_events.insert(sync_result.contract_id.clone(), vec![md]);
all_events.insert(sync_result.contract_id.clone(), vec![chunk]);
}
}
}
@@ -1849,7 +1847,7 @@ impl Relayer {
Relayer::process_stacker_db_chunks(
stackerdbs,
stackerdb_configs,
&sync_results,
sync_results,
event_observer,
)
}
@@ -2004,7 +2002,7 @@ impl Relayer {
// push events for HTTP-uploaded stacker DB chunks
Relayer::process_uploaded_stackerdb_chunks(
&network_result.uploaded_stackerdb_chunks,
mem::replace(&mut network_result.uploaded_stackerdb_chunks, vec![]),
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
);
@@ -2012,7 +2010,7 @@ impl Relayer {
Relayer::process_stacker_db_chunks(
&mut self.stacker_dbs,
&network_result.stacker_db_configs,
&network_result.stacker_db_sync_results,
mem::replace(&mut network_result.stacker_db_sync_results, vec![]),
event_observer.map(|obs| obs.as_stackerdb_event_dispatcher()),
)?;

View File

@@ -297,7 +297,7 @@ pub trait StackerDBEventDispatcher {
fn new_stackerdb_chunks(
&self,
contract_id: &QualifiedContractIdentifier,
chunk_info: &[SlotMetadata],
chunk_info: &[StackerDBChunkData],
);
}

View File

@@ -39,7 +39,7 @@ use stacks::chainstate::stacks::TransactionPayload;
use stacks::net::stackerdb::StackerDBEventDispatcher;
use stacks::libstackerdb::SlotMetadata;
use stacks::libstackerdb::StackerDBChunkData;
#[derive(Debug, Clone)]
struct EventObserver {
@@ -94,7 +94,7 @@ pub struct MinedMicroblockEvent {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StackerDBChunksEvent {
pub contract_id: QualifiedContractIdentifier,
pub modified_slots: Vec<SlotMetadata>,
pub modified_slots: Vec<StackerDBChunkData>,
}
impl EventObserver {
@@ -478,7 +478,7 @@ impl StackerDBEventDispatcher for EventDispatcher {
fn new_stackerdb_chunks(
&self,
contract_id: &QualifiedContractIdentifier,
chunks: &[SlotMetadata],
chunks: &[StackerDBChunkData],
) {
self.process_new_stackerdb_chunks(contract_id, chunks);
}
@@ -915,7 +915,7 @@ impl EventDispatcher {
pub fn process_new_stackerdb_chunks(
&self,
contract_id: &QualifiedContractIdentifier,
new_chunks: &[SlotMetadata],
new_chunks: &[StackerDBChunkData],
) {
let interested_observers: Vec<_> = self
.registered_observers

View File

@@ -407,6 +407,7 @@ fn test_stackerdb_event_observer() {
let expected_data = format!("Hello chunks {}", &i);
let expected_hash = Sha512Trunc256Sum::from_data(expected_data.as_bytes());
assert_eq!(event.data_hash, expected_hash);
assert_eq!(event.data, expected_data.as_bytes().to_vec());
assert_eq!(event.data_hash(), expected_hash);
}
}