mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 00:49:30 +08:00
feat: revisit threading model
This commit is contained in:
@@ -113,6 +113,7 @@ impl Config {
|
||||
HordConfig {
|
||||
network_thread_max: self.limits.max_number_of_networking_threads,
|
||||
ingestion_thread_max: self.limits.max_number_of_processing_threads,
|
||||
ingestion_thread_queue_size: 4,
|
||||
cache_size: self.limits.max_caching_memory_size_mb,
|
||||
db_path: self.expected_cache_path(),
|
||||
first_inscription_height: match self.network.bitcoin_network {
|
||||
|
||||
@@ -2,7 +2,9 @@ use std::{
|
||||
collections::{BTreeMap, HashMap, VecDeque},
|
||||
hash::BuildHasherDefault,
|
||||
path::PathBuf,
|
||||
sync::{mpsc::Sender, Arc}, thread::sleep, time::Duration,
|
||||
sync::{mpsc::Sender, Arc},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
@@ -760,12 +762,12 @@ pub fn find_all_inscriptions_in_block(
|
||||
block_height: &u64,
|
||||
inscriptions_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) -> Vec<(TransactionIdentifier, TraversalResult)> {
|
||||
) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
|
||||
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
|
||||
let mut stmt = inscriptions_db_conn
|
||||
.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
|
||||
.unwrap();
|
||||
let mut results = vec![];
|
||||
let mut results = BTreeMap::new();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
|
||||
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
|
||||
@@ -789,7 +791,10 @@ pub fn find_all_inscriptions_in_block(
|
||||
transaction_identifier_inscription: transaction_identifier_inscription.clone(),
|
||||
transfer_data: transfer_data,
|
||||
};
|
||||
results.push((transaction_identifier_inscription, traversal));
|
||||
results.insert(
|
||||
(transaction_identifier_inscription, inscription_input_index),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
@@ -1009,8 +1014,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
let moved_ctx = moved_ctx.clone();
|
||||
let moved_http_client = http_client.clone();
|
||||
retrieve_block_data_pool.execute(move || {
|
||||
moved_ctx
|
||||
.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
|
||||
moved_ctx.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
|
||||
let future = download_block_with_retry(
|
||||
&moved_http_client,
|
||||
&block_hash,
|
||||
@@ -2280,8 +2284,7 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let number_of_blocks_to_process = end_block - start_block + 1;
|
||||
|
||||
let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
|
||||
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
|
||||
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(50);
|
||||
let http_client = build_http_client();
|
||||
|
||||
let moved_config = bitcoin_config.clone();
|
||||
@@ -2292,12 +2295,12 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let mut block_heights = VecDeque::from((start_block..=end_block).collect::<Vec<u64>>());
|
||||
|
||||
for _ in 0..8 {
|
||||
for _ in 0..hord_config.network_thread_max {
|
||||
if let Some(block_height) = block_heights.pop_front() {
|
||||
let config = moved_config.clone();
|
||||
let ctx = moved_ctx.clone();
|
||||
let http_client = moved_http_client.clone();
|
||||
sleep(Duration::from_millis(500));
|
||||
sleep(Duration::from_millis(200));
|
||||
set.spawn(try_download_block_bytes_with_retry(
|
||||
http_client,
|
||||
block_height,
|
||||
@@ -2312,40 +2315,39 @@ pub async fn rebuild_rocks_db(
|
||||
|
||||
let mut tx_thread_pool = vec![];
|
||||
let mut rx_thread_pool = vec![];
|
||||
let mut thread_pool_handles = vec![];
|
||||
|
||||
for _ in 0..hord_config.ingestion_thread_max {
|
||||
let (tx, rx) = bounded::<Option<Vec<u8>>>(8);
|
||||
let (tx, rx) = bounded::<Option<Vec<u8>>>(hord_config.ingestion_thread_queue_size);
|
||||
tx_thread_pool.push(tx);
|
||||
rx_thread_pool.push(rx);
|
||||
}
|
||||
|
||||
let compression_thread = hiro_system_kit::thread_named("Block data compression")
|
||||
.spawn(move || {
|
||||
for rx in rx_thread_pool.into_iter() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
let moved_ctx: Context = moved_ctx.clone();
|
||||
let moved_bitcoin_network = moved_bitcoin_network.clone();
|
||||
compress_block_data_pool.execute(move || {
|
||||
while let Ok(Some(block_bytes)) = rx.recv() {
|
||||
let raw_block_data =
|
||||
parse_downloaded_block(block_bytes).expect("unable to parse block");
|
||||
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
|
||||
.expect("unable to compress block");
|
||||
let block_data = hord::parse_ordinals_and_standardize_block(
|
||||
raw_block_data,
|
||||
&moved_bitcoin_network,
|
||||
&moved_ctx,
|
||||
)
|
||||
.expect("unable to deserialize block");
|
||||
for rx in rx_thread_pool.into_iter() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
let moved_ctx: Context = moved_ctx.clone();
|
||||
let moved_bitcoin_network = moved_bitcoin_network.clone();
|
||||
|
||||
let _ =
|
||||
block_compressed_tx_moved.send(Some((block_data, compressed_block)));
|
||||
}
|
||||
});
|
||||
}
|
||||
let _ = compress_block_data_pool.join();
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
let handle = hiro_system_kit::thread_named("Block data compression")
|
||||
.spawn(move || {
|
||||
while let Ok(Some(block_bytes)) = rx.recv() {
|
||||
let raw_block_data =
|
||||
parse_downloaded_block(block_bytes).expect("unable to parse block");
|
||||
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
|
||||
.expect("unable to compress block");
|
||||
let block_data = hord::parse_ordinals_and_standardize_block(
|
||||
raw_block_data,
|
||||
&moved_bitcoin_network,
|
||||
&moved_ctx,
|
||||
)
|
||||
.expect("unable to deserialize block");
|
||||
|
||||
let _ = block_compressed_tx_moved.send(Some((block_data, compressed_block)));
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
thread_pool_handles.push(handle);
|
||||
}
|
||||
|
||||
let cloned_ctx = ctx.clone();
|
||||
|
||||
@@ -2354,52 +2356,56 @@ pub async fn rebuild_rocks_db(
|
||||
let mut inbox = HashMap::new();
|
||||
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
|
||||
let mut blocks_processed = 0;
|
||||
while let Ok(Some((block, compacted_block))) =
|
||||
block_compressed_rx.recv()
|
||||
{
|
||||
blocks_processed += 1;
|
||||
let block_index = block.block_identifier.index;
|
||||
|
||||
// In the context of ordinals, we're constrained to process blocks sequentially
|
||||
// Blocks are processed by a threadpool and could be coming out of order.
|
||||
// Inbox block for later if the current block is not the one we should be
|
||||
// processing.
|
||||
if block_index >= start_sequencing_blocks_at_height {
|
||||
inbox.insert(block_index, (block, compacted_block));
|
||||
let mut chunk = Vec::new();
|
||||
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
|
||||
inbox.len()
|
||||
)
|
||||
});
|
||||
chunk.push((block, compacted_block));
|
||||
inbox_cursor += 1;
|
||||
}
|
||||
if chunk.is_empty() {
|
||||
// Early return / wait for next block
|
||||
cloned_ctx.try_log(|logger| {
|
||||
info!(logger, "Inboxing compacted block #{block_index}")
|
||||
});
|
||||
continue;
|
||||
loop {
|
||||
// Dequeue all the blocks available
|
||||
let mut new_blocks = vec![];
|
||||
while let Ok(Some((block, compacted_block))) = block_compressed_rx.try_recv()
|
||||
{
|
||||
blocks_processed += 1;
|
||||
new_blocks.push((block, compacted_block))
|
||||
}
|
||||
//
|
||||
let mut ooo_processing = vec![];
|
||||
for (block, compacted_block) in new_blocks.into_iter() {
|
||||
let block_index = block.block_identifier.index;
|
||||
if block_index >= start_sequencing_blocks_at_height {
|
||||
inbox.insert(block_index, (block, compacted_block));
|
||||
} else {
|
||||
if let Some(ref tx) = blocks_post_processor {
|
||||
let _ = tx.send(chunk);
|
||||
}
|
||||
ooo_processing.push((block, compacted_block));
|
||||
// todo: do something
|
||||
}
|
||||
}
|
||||
|
||||
if blocks_processed == number_of_blocks_to_process {
|
||||
// In order processing: construct the longest sequence of known blocks
|
||||
let mut chunk = Vec::new();
|
||||
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Local block storage successfully seeded with #{blocks_processed} blocks"
|
||||
"Adding block #{inbox_cursor} to next sequence (# blocks inboxed: {})",
|
||||
inbox.len()
|
||||
)
|
||||
});
|
||||
break;
|
||||
chunk.push((block, compacted_block));
|
||||
inbox_cursor += 1;
|
||||
}
|
||||
if !chunk.is_empty() {
|
||||
if let Some(ref tx) = blocks_post_processor {
|
||||
let _ = tx.send(chunk);
|
||||
}
|
||||
} else {
|
||||
if blocks_processed == number_of_blocks_to_process {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Local block storage successfully seeded with #{blocks_processed} blocks"
|
||||
)
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_secs(3));
|
||||
}
|
||||
()
|
||||
})
|
||||
@@ -2429,7 +2435,11 @@ pub async fn rebuild_rocks_db(
|
||||
for tx in tx_thread_pool.iter() {
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
let _ = compression_thread.join();
|
||||
|
||||
for handle in thread_pool_handles.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
let _ = storage_thread.join();
|
||||
let _ = set.shutdown();
|
||||
|
||||
|
||||
@@ -32,10 +32,11 @@ use chainhook_sdk::{
|
||||
};
|
||||
|
||||
use crate::db::{
|
||||
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
|
||||
get_any_entry_in_ordinal_activities, insert_entry_in_blocks, insert_entry_in_inscriptions,
|
||||
insert_transfer_in_locations_tx, retrieve_satoshi_point_using_lazy_storage,
|
||||
retrieve_satoshi_point_using_lazy_storage_v3, InscriptionHeigthHint,
|
||||
find_all_inscriptions_in_block, find_inscription_with_ordinal_number,
|
||||
find_inscriptions_at_wached_outpoint, get_any_entry_in_ordinal_activities,
|
||||
insert_entry_in_blocks, insert_entry_in_inscriptions, insert_transfer_in_locations_tx,
|
||||
retrieve_satoshi_point_using_lazy_storage, retrieve_satoshi_point_using_lazy_storage_v3,
|
||||
InscriptionHeigthHint,
|
||||
};
|
||||
use crate::ord::height::Height;
|
||||
|
||||
@@ -67,6 +68,7 @@ use crate::ord::inscription_id::InscriptionId;
|
||||
pub struct HordConfig {
|
||||
pub network_thread_max: usize,
|
||||
pub ingestion_thread_max: usize,
|
||||
pub ingestion_thread_queue_size: usize,
|
||||
pub cache_size: usize,
|
||||
pub db_path: PathBuf,
|
||||
pub first_inscription_height: u64,
|
||||
@@ -248,6 +250,7 @@ pub fn new_traversals_lazy_cache(
|
||||
)
|
||||
}
|
||||
|
||||
// TODO(lgalabru): deprecate
|
||||
pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
block: &BitcoinBlockData,
|
||||
inscriptions_db_conn: Option<&Connection>,
|
||||
@@ -482,7 +485,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Block #{} processed through hord, revealing {} inscriptions [{}]",
|
||||
"Block #{} revealed {} inscriptions [{}]",
|
||||
new_block.block_identifier.index,
|
||||
inscriptions_revealed.len(),
|
||||
inscriptions_revealed.join(", ")
|
||||
@@ -992,6 +995,7 @@ pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(lgalabru): Deprecate
|
||||
pub async fn perform_hord_db_update(
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
@@ -1119,16 +1123,15 @@ pub fn get_transactions_to_process(
|
||||
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
|
||||
let mut l1_cache_hits = vec![];
|
||||
|
||||
let mut known_transactions =
|
||||
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
|
||||
|
||||
for tx in block.transactions.iter().skip(1) {
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for ordinal_event in tx.metadata.ordinal_operations.iter() {
|
||||
let (inscription_data, _is_cursed) = match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(inscription_data) => {
|
||||
(inscription_data, false)
|
||||
}
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => {
|
||||
(inscription_data, false)
|
||||
}
|
||||
let inscription_data = match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data,
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => inscription_data,
|
||||
OrdinalOperation::InscriptionTransferred(_) => {
|
||||
continue;
|
||||
}
|
||||
@@ -1142,25 +1145,17 @@ pub fn get_transactions_to_process(
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(Some((traversal, _))) = find_inscription_with_id(
|
||||
&inscription_data.inscription_id,
|
||||
&inscriptions_db_conn,
|
||||
ctx,
|
||||
) {
|
||||
cache_l1.insert(
|
||||
(
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
} else {
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push((
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
));
|
||||
if let Some(entry) = known_transactions.remove(&key) {
|
||||
l1_cache_hits.push(key.clone());
|
||||
cache_l1.insert(key, entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push((
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
));
|
||||
}
|
||||
}
|
||||
(transactions_ids, l1_cache_hits)
|
||||
@@ -1242,11 +1237,13 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {})",
|
||||
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len(),
|
||||
l1_cache_hits.len(),
|
||||
next_blocks.len(),
|
||||
cache_l1.len(),
|
||||
cache_l2.len(),
|
||||
)
|
||||
});
|
||||
|
||||
@@ -1342,7 +1339,27 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for tx in tx_thread_pool.iter() {
|
||||
// Empty the queue
|
||||
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
|
||||
if let Ok(traversal) = traversal_result {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
|
||||
|
||||
@@ -23,13 +23,7 @@ pub fn start_ordinals_number_processor(
|
||||
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
|
||||
JoinHandle<()>,
|
||||
) {
|
||||
// let mut batches = HashMap::new();
|
||||
let hord_config = config.get_hord_config();
|
||||
|
||||
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(
|
||||
hord_config.ingestion_thread_max,
|
||||
);
|
||||
// let (inner_tx, inner_rx) = channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(1);
|
||||
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
|
||||
@@ -136,14 +136,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
// Evaluating every single block is required for also keeping track of transfers.
|
||||
let local_traverals =
|
||||
find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
|
||||
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
|
||||
traversals.insert(
|
||||
(
|
||||
transaction_identifier,
|
||||
traversal_result.inscription_input_index,
|
||||
),
|
||||
traversal_result,
|
||||
);
|
||||
for (key, traversal_result) in local_traverals.into_iter() {
|
||||
traversals.insert(key, traversal_result);
|
||||
}
|
||||
|
||||
let transaction = inscriptions_db_conn.transaction().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user