fix: threading model

This commit is contained in:
Ludo Galabru
2023-03-02 17:11:11 -05:00
parent 71691778bb
commit c2354fcacd
2 changed files with 42 additions and 22 deletions

View File

@@ -1,5 +1,6 @@
mod blocks_pool;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::chainhooks::types::{
@@ -106,7 +107,7 @@ pub async fn retrieve_full_block(
Ok((block_height, block))
}
pub async fn standardize_bitcoin_block(
pub fn standardize_bitcoin_block(
indexer_config: &IndexerConfig,
block_height: u64,
block: Block,
@@ -114,21 +115,38 @@ pub async fn standardize_bitcoin_block(
ctx: &Context,
) -> Result<BitcoinBlockData, String> {
let mut transactions = vec![];
ctx.try_log(|logger| slog::info!(logger, "Updating ordinal index",));
match OrdinalIndexUpdater::update(&mut bitcoin_context.ordinal_index).await {
Ok(_) => {
ctx.try_log(|logger| {
slog::info!(
logger,
"Ordinal index updated (block count: {:?})",
bitcoin_context.ordinal_index.block_count()
)
});
}
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
let ordinal_index = bitcoin_context.ordinal_index.take();
let ctx_ = ctx.clone();
let handle: JoinHandle<Result<_, String>> =
hiro_system_kit::thread_named("Ordinal index update")
.spawn(move || {
if let Some(ref ordinal_index) = ordinal_index {
match hiro_system_kit::nestable_block_on(OrdinalIndexUpdater::update(
&ordinal_index,
)) {
Ok(_) => {
ctx_.try_log(|logger| {
slog::info!(logger, "Ordinal index successfully updated",)
});
}
Err(e) => {
ctx_.try_log(|logger| {
slog::error!(logger, "Error updating ordinal index",)
});
}
};
}
Ok(ordinal_index)
})
.expect("unable to detach thread");
match handle.join() {
Ok(Ok(ordinal_index)) => {
bitcoin_context.ordinal_index = ordinal_index;
}
_ => {}
}
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&indexer_config.bitcoin_network);
@@ -153,10 +171,10 @@ pub async fn standardize_bitcoin_block(
}
let mut ordinal_operations = vec![];
if let Some(op) =
try_parse_ordinal_operation(&tx, block_height, &bitcoin_context.ordinal_index, ctx)
{
ordinal_operations.push(op);
if let Some(ref ordinal_index) = bitcoin_context.ordinal_index {
if let Some(op) = try_parse_ordinal_operation(&tx, block_height, &ordinal_index, ctx) {
ordinal_operations.push(op);
}
}
let mut inputs = vec![];

View File

@@ -36,12 +36,14 @@ impl StacksChainContext {
}
pub struct BitcoinChainContext {
ordinal_index: OrdinalIndex,
ordinal_index: Option<OrdinalIndex>,
}
impl BitcoinChainContext {
pub fn new(ordinal_index: OrdinalIndex) -> BitcoinChainContext {
BitcoinChainContext { ordinal_index }
BitcoinChainContext {
ordinal_index: Some(ordinal_index),
}
}
}
@@ -85,13 +87,13 @@ impl Indexer {
block: Block,
ctx: &Context,
) -> Result<Option<BitcoinChainEvent>, String> {
let block = hiro_system_kit::nestable_block_on(bitcoin::standardize_bitcoin_block(
let block = bitcoin::standardize_bitcoin_block(
&self.config,
block_height,
block,
&mut self.bitcoin_context,
ctx,
))?;
)?;
let event = self.bitcoin_blocks_pool.process_block(block, ctx);
event
}