fix: remove thread_max * 2

This commit is contained in:
Ludo Galabru
2023-08-13 15:39:56 +02:00
parent a1ffc1a59a
commit 359c6f9422

View File

@@ -59,193 +59,197 @@ pub fn parallelize_inscription_data_computations(
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
let thread_max = hord_config.ingestion_thread_max * 2;
let thread_max = hord_config.ingestion_thread_max;
// Nothing to do? early return
if has_transactions_to_process {
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
return Ok(false)
}
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
for thread_index in 0..thread_max {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = inner_ctx.clone();
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();
for thread_index in 0..thread_max {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = inner_ctx.clone();
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
prioritary,
))) = rx.recv()
{
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
prioritary,
))) = rx.recv()
{
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}
// Empty cache
let mut thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.remove(key) {
let _ = traversal_tx.send((Ok(entry), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
}
}
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
block.block_identifier.index,
transactions_ids.len(),
l1_cache_hits.len(),
next_blocks.len(),
cache_l1.len(),
cache_l2.len(),
)
});
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let mut priority_queue = VecDeque::new();
let mut warmup_queue = VecDeque::new();
for (transaction_id, input_index) in transactions_ids.into_iter() {
priority_queue.push_back((
transaction_id,
block.block_identifier.clone(),
input_index,
true,
));
}
// Feed each workers with 2 workitems each
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
let mut next_block_iter = next_blocks.iter();
let mut traversals_received = 0;
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
if prioritary {
traversals_received += 1;
}
match traversal_result {
Ok(traversal) => {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
Err(e) => {
ctx.try_log(|logger| {
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
});
}
}
if traversals_received == expected_traversals {
break;
}
if let Some(w) = priority_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (mut transactions_ids, _) = get_transactions_to_process(
next_block,
cache_l1,
inscriptions_db_tx,
ctx,
);
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to pre-process: {}",
block.block_identifier.index,
transactions_ids.len()
)
});
transactions_ids.shuffle(&mut rng);
for (transaction_id, input_index) in transactions_ids.into_iter() {
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
false,
));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}
// Empty cache
let mut thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.remove(key) {
let _ = traversal_tx.send((Ok(entry), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
}
}
}
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
block.block_identifier.index,
transactions_ids.len(),
l1_cache_hits.len(),
next_blocks.len(),
cache_l1.len(),
cache_l2.len(),
)
});
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let mut priority_queue = VecDeque::new();
let mut warmup_queue = VecDeque::new();
for (transaction_id, input_index) in transactions_ids.into_iter() {
priority_queue.push_back((
transaction_id,
block.block_identifier.clone(),
input_index,
true,
));
}
// Feed each workers with 2 workitems each
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
let mut next_block_iter = next_blocks.iter();
let mut traversals_received = 0;
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
if prioritary {
traversals_received += 1;
}
match traversal_result {
Ok(traversal) => {
inner_ctx.try_log(|logger| {
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok(traversal) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
Err(e) => {
ctx.try_log(|logger| {
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
});
}
}
if traversals_received == expected_traversals {
break;
}
if let Some(w) = priority_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (mut transactions_ids, _) = get_transactions_to_process(
next_block,
cache_l1,
inscriptions_db_tx,
ctx,
);
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to pre-process: {}",
block.block_identifier.index,
transactions_ids.len()
)
});
transactions_ids.shuffle(&mut rng);
for (transaction_id, input_index) in transactions_ids.into_iter() {
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
false,
));
}
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
}
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
}
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok(traversal) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
}
let _ = tx.send(None);
}
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
});
let _ = tx.send(None);
}
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
});
Ok(has_transactions_to_process)
}