mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 00:49:30 +08:00
fix: remove thread_max * 2
This commit is contained in:
@@ -59,193 +59,197 @@ pub fn parallelize_inscription_data_computations(
|
||||
|
||||
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
|
||||
|
||||
let thread_max = hord_config.ingestion_thread_max * 2;
|
||||
let thread_max = hord_config.ingestion_thread_max;
|
||||
|
||||
// Nothing to do? early return
|
||||
if has_transactions_to_process {
|
||||
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
||||
let (traversal_tx, traversal_rx) = channel();
|
||||
return Ok(false)
|
||||
}
|
||||
|
||||
let mut tx_thread_pool = vec![];
|
||||
let mut thread_pool_handles = vec![];
|
||||
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
||||
let (traversal_tx, traversal_rx) = channel();
|
||||
|
||||
for thread_index in 0..thread_max {
|
||||
let (tx, rx) = channel();
|
||||
tx_thread_pool.push(tx);
|
||||
let mut tx_thread_pool = vec![];
|
||||
let mut thread_pool_handles = vec![];
|
||||
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = inner_ctx.clone();
|
||||
let moved_hord_db_path = hord_config.db_path.clone();
|
||||
let local_cache = cache_l2.clone();
|
||||
for thread_index in 0..thread_max {
|
||||
let (tx, rx) = channel();
|
||||
tx_thread_pool.push(tx);
|
||||
|
||||
let handle = hiro_system_kit::thread_named("Worker")
|
||||
.spawn(move || {
|
||||
while let Ok(Some((
|
||||
transaction_id,
|
||||
block_identifier,
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = inner_ctx.clone();
|
||||
let moved_hord_db_path = hord_config.db_path.clone();
|
||||
let local_cache = cache_l2.clone();
|
||||
|
||||
let handle = hiro_system_kit::thread_named("Worker")
|
||||
.spawn(move || {
|
||||
while let Ok(Some((
|
||||
transaction_id,
|
||||
block_identifier,
|
||||
input_index,
|
||||
prioritary,
|
||||
))) = rx.recv()
|
||||
{
|
||||
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
|
||||
&moved_hord_db_path,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
input_index,
|
||||
prioritary,
|
||||
))) = rx.recv()
|
||||
{
|
||||
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
|
||||
&moved_hord_db_path,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
0,
|
||||
&local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
thread_pool_handles.push(handle);
|
||||
}
|
||||
|
||||
// Empty cache
|
||||
let mut thread_index = 0;
|
||||
for key in l1_cache_hits.iter() {
|
||||
if let Some(entry) = cache_l1.remove(key) {
|
||||
let _ = traversal_tx.send((Ok(entry), true, thread_index));
|
||||
thread_index = (thread_index + 1) % thread_max;
|
||||
}
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len(),
|
||||
l1_cache_hits.len(),
|
||||
next_blocks.len(),
|
||||
cache_l1.len(),
|
||||
cache_l2.len(),
|
||||
)
|
||||
});
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
let mut priority_queue = VecDeque::new();
|
||||
let mut warmup_queue = VecDeque::new();
|
||||
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
priority_queue.push_back((
|
||||
transaction_id,
|
||||
block.block_identifier.clone(),
|
||||
input_index,
|
||||
true,
|
||||
));
|
||||
}
|
||||
|
||||
// Feed each workers with 2 workitems each
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
|
||||
let mut next_block_iter = next_blocks.iter();
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
|
||||
if prioritary {
|
||||
traversals_received += 1;
|
||||
}
|
||||
match traversal_result {
|
||||
Ok(traversal) => {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
|
||||
});
|
||||
}
|
||||
}
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(w) = priority_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(w) = warmup_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(next_block) = next_block_iter.next() {
|
||||
let (mut transactions_ids, _) = get_transactions_to_process(
|
||||
next_block,
|
||||
cache_l1,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to pre-process: {}",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len()
|
||||
)
|
||||
});
|
||||
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
warmup_queue.push_back((
|
||||
transaction_id,
|
||||
next_block.block_identifier.clone(),
|
||||
input_index,
|
||||
0,
|
||||
&local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
||||
false,
|
||||
));
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
thread_pool_handles.push(handle);
|
||||
}
|
||||
|
||||
// Empty cache
|
||||
let mut thread_index = 0;
|
||||
for key in l1_cache_hits.iter() {
|
||||
if let Some(entry) = cache_l1.remove(key) {
|
||||
let _ = traversal_tx.send((Ok(entry), true, thread_index));
|
||||
thread_index = (thread_index + 1) % thread_max;
|
||||
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len(),
|
||||
l1_cache_hits.len(),
|
||||
next_blocks.len(),
|
||||
cache_l1.len(),
|
||||
cache_l2.len(),
|
||||
)
|
||||
});
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
let mut priority_queue = VecDeque::new();
|
||||
let mut warmup_queue = VecDeque::new();
|
||||
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
priority_queue.push_back((
|
||||
transaction_id,
|
||||
block.block_identifier.clone(),
|
||||
input_index,
|
||||
true,
|
||||
));
|
||||
}
|
||||
|
||||
// Feed each workers with 2 workitems each
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
|
||||
let mut next_block_iter = next_blocks.iter();
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
|
||||
if prioritary {
|
||||
traversals_received += 1;
|
||||
}
|
||||
match traversal_result {
|
||||
Ok(traversal) => {
|
||||
inner_ctx.try_log(|logger| {
|
||||
for tx in tx_thread_pool.iter() {
|
||||
// Empty the queue
|
||||
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
|
||||
if let Ok(traversal) = traversal_result {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
|
||||
});
|
||||
}
|
||||
}
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(w) = priority_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(w) = warmup_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(next_block) = next_block_iter.next() {
|
||||
let (mut transactions_ids, _) = get_transactions_to_process(
|
||||
next_block,
|
||||
cache_l1,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to pre-process: {}",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len()
|
||||
)
|
||||
});
|
||||
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
warmup_queue.push_back((
|
||||
transaction_id,
|
||||
next_block.block_identifier.clone(),
|
||||
input_index,
|
||||
false,
|
||||
));
|
||||
}
|
||||
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
|
||||
}
|
||||
}
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
for tx in tx_thread_pool.iter() {
|
||||
// Empty the queue
|
||||
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
|
||||
if let Ok(traversal) = traversal_result {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
|
||||
for handle in thread_pool_handles.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
});
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
|
||||
for handle in thread_pool_handles.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
});
|
||||
|
||||
Ok(has_transactions_to_process)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user