fix: block streaming

This commit is contained in:
Ludo Galabru
2023-08-03 05:35:46 +02:00
parent c31611fb28
commit dcdfd1655c
3 changed files with 10 additions and 9 deletions

View File

@@ -177,9 +177,9 @@ pub fn process_blocks(
hord_config: &HordConfig,
post_processor: &Option<Sender<BitcoinBlockData>>,
ctx: &Context,
) {
) -> Vec<BitcoinBlockData> {
let mut cache_l1 = HashMap::new();
let mut updated_blocks = vec![];
for _cursor in 0..next_blocks.len() {
let mut block = next_blocks.remove(0);
@@ -195,9 +195,11 @@ pub fn process_blocks(
);
if let Some(post_processor_tx) = post_processor {
let _ = post_processor_tx.send(block);
let _ = post_processor_tx.send(block.clone());
}
updated_blocks.push(block);
}
updated_blocks
}
pub fn process_block(

View File

@@ -134,7 +134,7 @@ fn handle_create_predicate(
if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
let key: String = format!(
"hord::{}",
"{}",
ChainhookSpecification::bitcoin_key(&predicate_uuid)
);
match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
@@ -173,7 +173,7 @@ fn handle_get_predicate(
match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let key: String = format!(
"hord::{}",
"{}",
ChainhookSpecification::bitcoin_key(&predicate_uuid)
);
let entry = match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
@@ -272,7 +272,7 @@ pub fn get_entries_from_predicates_db(
predicate_db_conn: &mut Connection,
ctx: &Context,
) -> Result<Vec<(ChainhookSpecification, PredicateStatus)>, String> {
let key: String = format!("hord::{}", ChainhookSpecification::bitcoin_key("*"));
let key: String = format!("{}", ChainhookSpecification::bitcoin_key("*"));
let chainhooks_to_load: Vec<String> = predicate_db_conn
.scan_match(key)
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?

View File

@@ -105,7 +105,6 @@ impl Service {
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> =
vec![];
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
bitcoin_predicate.enabled = false;
bitcoin_predicates_ref.push(bitcoin_predicate);
}
while let Ok(block) = rx_replayer.recv() {
@@ -292,7 +291,7 @@ impl Service {
}
let mut hint = InscriptionHeigthHint::new();
process_blocks(
let updated_blocks = process_blocks(
&mut blocks,
&moved_traversals_cache,
&mut hint,
@@ -302,7 +301,7 @@ impl Service {
&ctx,
);
let _ = block_processor_out_tx.send(blocks);
let _ = block_processor_out_tx.send(updated_blocks);
}
}
});