feat: revisit threading model

This commit is contained in:
Ludo Galabru
2023-08-01 16:10:55 +02:00
parent 31d99809e2
commit 05b6d5c4d7
5 changed files with 135 additions and 119 deletions

View File

@@ -113,6 +113,7 @@ impl Config {
HordConfig {
network_thread_max: self.limits.max_number_of_networking_threads,
ingestion_thread_max: self.limits.max_number_of_processing_threads,
ingestion_thread_queue_size: 4,
cache_size: self.limits.max_caching_memory_size_mb,
db_path: self.expected_cache_path(),
first_inscription_height: match self.network.bitcoin_network {

View File

@@ -2,7 +2,9 @@ use std::{
collections::{BTreeMap, HashMap, VecDeque},
hash::BuildHasherDefault,
path::PathBuf,
sync::{mpsc::Sender, Arc}, thread::sleep, time::Duration,
sync::{mpsc::Sender, Arc},
thread::sleep,
time::Duration,
};
use chainhook_sdk::{
@@ -760,12 +762,12 @@ pub fn find_all_inscriptions_in_block(
block_height: &u64,
inscriptions_db_conn: &Connection,
ctx: &Context,
) -> Vec<(TransactionIdentifier, TraversalResult)> {
) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
.unwrap();
let mut results = vec![];
let mut results = BTreeMap::new();
let mut rows = stmt.query(args).unwrap();
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
@@ -789,7 +791,10 @@ pub fn find_all_inscriptions_in_block(
transaction_identifier_inscription: transaction_identifier_inscription.clone(),
transfer_data: transfer_data,
};
results.push((transaction_identifier_inscription, traversal));
results.insert(
(transaction_identifier_inscription, inscription_input_index),
traversal,
);
}
return results;
}
@@ -1009,8 +1014,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let moved_ctx = moved_ctx.clone();
let moved_http_client = http_client.clone();
retrieve_block_data_pool.execute(move || {
moved_ctx
.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
moved_ctx.try_log(|logger| debug!(logger, "Fetching block #{block_height}"));
let future = download_block_with_retry(
&moved_http_client,
&block_hash,
@@ -2280,8 +2284,7 @@ pub async fn rebuild_rocks_db(
let number_of_blocks_to_process = end_block - start_block + 1;
let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(50);
let http_client = build_http_client();
let moved_config = bitcoin_config.clone();
@@ -2292,12 +2295,12 @@ pub async fn rebuild_rocks_db(
let mut block_heights = VecDeque::from((start_block..=end_block).collect::<Vec<u64>>());
for _ in 0..8 {
for _ in 0..hord_config.network_thread_max {
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = moved_ctx.clone();
let http_client = moved_http_client.clone();
sleep(Duration::from_millis(500));
sleep(Duration::from_millis(200));
set.spawn(try_download_block_bytes_with_retry(
http_client,
block_height,
@@ -2312,40 +2315,39 @@ pub async fn rebuild_rocks_db(
let mut tx_thread_pool = vec![];
let mut rx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for _ in 0..hord_config.ingestion_thread_max {
let (tx, rx) = bounded::<Option<Vec<u8>>>(8);
let (tx, rx) = bounded::<Option<Vec<u8>>>(hord_config.ingestion_thread_queue_size);
tx_thread_pool.push(tx);
rx_thread_pool.push(rx);
}
let compression_thread = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
for rx in rx_thread_pool.into_iter() {
let block_compressed_tx_moved = block_compressed_tx.clone();
let moved_ctx: Context = moved_ctx.clone();
let moved_bitcoin_network = moved_bitcoin_network.clone();
compress_block_data_pool.execute(move || {
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data =
parse_downloaded_block(block_bytes).expect("unable to parse block");
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block(
raw_block_data,
&moved_bitcoin_network,
&moved_ctx,
)
.expect("unable to deserialize block");
for rx in rx_thread_pool.into_iter() {
let block_compressed_tx_moved = block_compressed_tx.clone();
let moved_ctx: Context = moved_ctx.clone();
let moved_bitcoin_network = moved_bitcoin_network.clone();
let _ =
block_compressed_tx_moved.send(Some((block_data, compressed_block)));
}
});
}
let _ = compress_block_data_pool.join();
})
.expect("unable to spawn thread");
let handle = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data =
parse_downloaded_block(block_bytes).expect("unable to parse block");
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block(
raw_block_data,
&moved_bitcoin_network,
&moved_ctx,
)
.expect("unable to deserialize block");
let _ = block_compressed_tx_moved.send(Some((block_data, compressed_block)));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}
let cloned_ctx = ctx.clone();
@@ -2354,52 +2356,56 @@ pub async fn rebuild_rocks_db(
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
let mut blocks_processed = 0;
while let Ok(Some((block, compacted_block))) =
block_compressed_rx.recv()
{
blocks_processed += 1;
let block_index = block.block_identifier.index;
// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.
if block_index >= start_sequencing_blocks_at_height {
inbox.insert(block_index, (block, compacted_block));
let mut chunk = Vec::new();
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
cloned_ctx.try_log(|logger| {
info!(
logger,
"Dequeuing block #{inbox_cursor} for processing (# blocks inboxed: {})",
inbox.len()
)
});
chunk.push((block, compacted_block));
inbox_cursor += 1;
}
if chunk.is_empty() {
// Early return / wait for next block
cloned_ctx.try_log(|logger| {
info!(logger, "Inboxing compacted block #{block_index}")
});
continue;
loop {
// Dequeue all the blocks available
let mut new_blocks = vec![];
while let Ok(Some((block, compacted_block))) = block_compressed_rx.try_recv()
{
blocks_processed += 1;
new_blocks.push((block, compacted_block))
}
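// Partition the new blocks: those at or above the sequencing height go to the inbox, the rest are set aside for out-of-order processing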
let mut ooo_processing = vec![];
for (block, compacted_block) in new_blocks.into_iter() {
let block_index = block.block_identifier.index;
if block_index >= start_sequencing_blocks_at_height {
inbox.insert(block_index, (block, compacted_block));
} else {
if let Some(ref tx) = blocks_post_processor {
let _ = tx.send(chunk);
}
ooo_processing.push((block, compacted_block));
// todo: do something
}
}
if blocks_processed == number_of_blocks_to_process {
// In order processing: construct the longest sequence of known blocks
let mut chunk = Vec::new();
while let Some((block, compacted_block)) = inbox.remove(&inbox_cursor) {
cloned_ctx.try_log(|logger| {
info!(
logger,
"Local block storage successfully seeded with #{blocks_processed} blocks"
"Adding block #{inbox_cursor} to next sequence (# blocks inboxed: {})",
inbox.len()
)
});
break;
chunk.push((block, compacted_block));
inbox_cursor += 1;
}
if !chunk.is_empty() {
if let Some(ref tx) = blocks_post_processor {
let _ = tx.send(chunk);
}
} else {
if blocks_processed == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
info!(
logger,
"Local block storage successfully seeded with #{blocks_processed} blocks"
)
});
break;
}
}
sleep(Duration::from_secs(3));
}
()
})
@@ -2429,7 +2435,11 @@ pub async fn rebuild_rocks_db(
for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
}
let _ = compression_thread.join();
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
let _ = storage_thread.join();
let _ = set.shutdown();

View File

@@ -32,10 +32,11 @@ use chainhook_sdk::{
};
use crate::db::{
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
get_any_entry_in_ordinal_activities, insert_entry_in_blocks, insert_entry_in_inscriptions,
insert_transfer_in_locations_tx, retrieve_satoshi_point_using_lazy_storage,
retrieve_satoshi_point_using_lazy_storage_v3, InscriptionHeigthHint,
find_all_inscriptions_in_block, find_inscription_with_ordinal_number,
find_inscriptions_at_wached_outpoint, get_any_entry_in_ordinal_activities,
insert_entry_in_blocks, insert_entry_in_inscriptions, insert_transfer_in_locations_tx,
retrieve_satoshi_point_using_lazy_storage, retrieve_satoshi_point_using_lazy_storage_v3,
InscriptionHeigthHint,
};
use crate::ord::height::Height;
@@ -67,6 +68,7 @@ use crate::ord::inscription_id::InscriptionId;
pub struct HordConfig {
pub network_thread_max: usize,
pub ingestion_thread_max: usize,
pub ingestion_thread_queue_size: usize,
pub cache_size: usize,
pub db_path: PathBuf,
pub first_inscription_height: u64,
@@ -248,6 +250,7 @@ pub fn new_traversals_lazy_cache(
)
}
// TODO(lgalabru): deprecate
pub fn retrieve_inscribed_satoshi_points_from_block(
block: &BitcoinBlockData,
inscriptions_db_conn: Option<&Connection>,
@@ -482,7 +485,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
ctx.try_log(|logger| {
slog::info!(
logger,
"Block #{} processed through hord, revealing {} inscriptions [{}]",
"Block #{} revealed {} inscriptions [{}]",
new_block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")
@@ -992,6 +995,7 @@ pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64
}
}
// TODO(lgalabru): Deprecate
pub async fn perform_hord_db_update(
start_block: u64,
end_block: u64,
@@ -1119,16 +1123,15 @@ pub fn get_transactions_to_process(
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
let mut l1_cache_hits = vec![];
let mut known_transactions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
for tx in block.transactions.iter().skip(1) {
// Has a new inscription been revealed? If so, are we looking at a re-inscription?
for ordinal_event in tx.metadata.ordinal_operations.iter() {
let (inscription_data, _is_cursed) = match ordinal_event {
OrdinalOperation::InscriptionRevealed(inscription_data) => {
(inscription_data, false)
}
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => {
(inscription_data, false)
}
let inscription_data = match ordinal_event {
OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data,
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => inscription_data,
OrdinalOperation::InscriptionTransferred(_) => {
continue;
}
@@ -1142,25 +1145,17 @@ pub fn get_transactions_to_process(
continue;
}
if let Ok(Some((traversal, _))) = find_inscription_with_id(
&inscription_data.inscription_id,
&inscriptions_db_conn,
ctx,
) {
cache_l1.insert(
(
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
),
traversal,
);
} else {
// Enqueue for traversals
transactions_ids.push((
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
));
if let Some(entry) = known_transactions.remove(&key) {
l1_cache_hits.push(key.clone());
cache_l1.insert(key, entry);
continue;
}
// Enqueue for traversals
transactions_ids.push((
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
));
}
}
(transactions_ids, l1_cache_hits)
@@ -1242,11 +1237,13 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {})",
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
block.block_identifier.index,
transactions_ids.len(),
l1_cache_hits.len(),
next_blocks.len(),
cache_l1.len(),
cache_l2.len(),
)
});
@@ -1342,7 +1339,27 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
}
}
}
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok(traversal) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
}
let _ = tx.send(None);
}

View File

@@ -23,13 +23,7 @@ pub fn start_ordinals_number_processor(
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
JoinHandle<()>,
) {
// let mut batches = HashMap::new();
let hord_config = config.get_hord_config();
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(
hord_config.ingestion_thread_max,
);
// let (inner_tx, inner_rx) = channel();
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(1);
let config = config.clone();
let ctx = ctx.clone();

View File

@@ -136,14 +136,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
// Evaluating every single block is required for also keeping track of transfers.
let local_traverals =
find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
traversals.insert(
(
transaction_identifier,
traversal_result.inscription_input_index,
),
traversal_result,
);
for (key, traversal_result) in local_traverals.into_iter() {
traversals.insert(key, traversal_result);
}
let transaction = inscriptions_db_conn.transaction().unwrap();