mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-04-25 12:25:07 +08:00
fix: threading model
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
mod blocks_pool;
|
||||
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::chainhooks::types::{
|
||||
@@ -106,7 +107,7 @@ pub async fn retrieve_full_block(
|
||||
Ok((block_height, block))
|
||||
}
|
||||
|
||||
pub async fn standardize_bitcoin_block(
|
||||
pub fn standardize_bitcoin_block(
|
||||
indexer_config: &IndexerConfig,
|
||||
block_height: u64,
|
||||
block: Block,
|
||||
@@ -114,21 +115,38 @@ pub async fn standardize_bitcoin_block(
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockData, String> {
|
||||
let mut transactions = vec![];
|
||||
|
||||
ctx.try_log(|logger| slog::info!(logger, "Updating ordinal index",));
|
||||
match OrdinalIndexUpdater::update(&mut bitcoin_context.ordinal_index).await {
|
||||
Ok(_) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Ordinal index updated (block count: {:?})",
|
||||
bitcoin_context.ordinal_index.block_count()
|
||||
)
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
|
||||
let ordinal_index = bitcoin_context.ordinal_index.take();
|
||||
let ctx_ = ctx.clone();
|
||||
let handle: JoinHandle<Result<_, String>> =
|
||||
hiro_system_kit::thread_named("Ordinal index update")
|
||||
.spawn(move || {
|
||||
if let Some(ref ordinal_index) = ordinal_index {
|
||||
match hiro_system_kit::nestable_block_on(OrdinalIndexUpdater::update(
|
||||
&ordinal_index,
|
||||
)) {
|
||||
Ok(_) => {
|
||||
ctx_.try_log(|logger| {
|
||||
slog::info!(logger, "Ordinal index successfully updated",)
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
ctx_.try_log(|logger| {
|
||||
slog::error!(logger, "Error updating ordinal index",)
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(ordinal_index)
|
||||
})
|
||||
.expect("unable to detach thread");
|
||||
|
||||
match handle.join() {
|
||||
Ok(Ok(ordinal_index)) => {
|
||||
bitcoin_context.ordinal_index = ordinal_index;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&indexer_config.bitcoin_network);
|
||||
@@ -153,10 +171,10 @@ pub async fn standardize_bitcoin_block(
|
||||
}
|
||||
|
||||
let mut ordinal_operations = vec![];
|
||||
if let Some(op) =
|
||||
try_parse_ordinal_operation(&tx, block_height, &bitcoin_context.ordinal_index, ctx)
|
||||
{
|
||||
ordinal_operations.push(op);
|
||||
if let Some(ref ordinal_index) = bitcoin_context.ordinal_index {
|
||||
if let Some(op) = try_parse_ordinal_operation(&tx, block_height, &ordinal_index, ctx) {
|
||||
ordinal_operations.push(op);
|
||||
}
|
||||
}
|
||||
|
||||
let mut inputs = vec![];
|
||||
|
||||
@@ -36,12 +36,14 @@ impl StacksChainContext {
|
||||
}
|
||||
|
||||
pub struct BitcoinChainContext {
|
||||
ordinal_index: OrdinalIndex,
|
||||
ordinal_index: Option<OrdinalIndex>,
|
||||
}
|
||||
|
||||
impl BitcoinChainContext {
|
||||
pub fn new(ordinal_index: OrdinalIndex) -> BitcoinChainContext {
|
||||
BitcoinChainContext { ordinal_index }
|
||||
BitcoinChainContext {
|
||||
ordinal_index: Some(ordinal_index),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,13 +87,13 @@ impl Indexer {
|
||||
block: Block,
|
||||
ctx: &Context,
|
||||
) -> Result<Option<BitcoinChainEvent>, String> {
|
||||
let block = hiro_system_kit::nestable_block_on(bitcoin::standardize_bitcoin_block(
|
||||
let block = bitcoin::standardize_bitcoin_block(
|
||||
&self.config,
|
||||
block_height,
|
||||
block,
|
||||
&mut self.bitcoin_context,
|
||||
ctx,
|
||||
))?;
|
||||
)?;
|
||||
let event = self.bitcoin_blocks_pool.process_block(block, ctx);
|
||||
event
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user