feat: in-house thread pool

This commit is contained in:
Ludo Galabru
2023-08-01 10:22:10 +02:00
parent 2f172e0a8a
commit bc5ffddb5b
2 changed files with 34 additions and 34 deletions

View File

@@ -2326,7 +2326,8 @@ pub async fn rebuild_rocks_db(
let moved_bitcoin_network = moved_bitcoin_network.clone(); let moved_bitcoin_network = moved_bitcoin_network.clone();
compress_block_data_pool.execute(move || { compress_block_data_pool.execute(move || {
while let Ok(Some(block_bytes)) = rx.recv() { while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data = parse_downloaded_block(block_bytes).expect("unable to parse block"); let raw_block_data =
parse_downloaded_block(block_bytes).expect("unable to parse block");
let compressed_block = LazyBlock::from_full_block(&raw_block_data) let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to compress block"); .expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block( let block_data = hord::parse_ordinals_and_standardize_block(
@@ -2422,12 +2423,7 @@ pub async fn rebuild_rocks_db(
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max; thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
} }
ctx.try_log(|logger| { ctx.try_log(|logger| info!(logger, "Gargbage collecting will start"));
info!(
logger,
"Gargbage collecting will start"
)
});
for tx in tx_thread_pool.iter() { for tx in tx_thread_pool.iter() {
let _ = tx.send(None); let _ = tx.send(None);
@@ -2436,12 +2432,7 @@ pub async fn rebuild_rocks_db(
let _ = storage_thread.join(); let _ = storage_thread.join();
let _ = set.shutdown(); let _ = set.shutdown();
ctx.try_log(|logger| { ctx.try_log(|logger| info!(logger, "Gargbage collecting did finish"));
info!(
logger,
"Gargbage collecting did finish"
)
});
// match guard.report().build() { // match guard.report().build() {
// Ok(report) => { // Ok(report) => {

View File

@@ -1192,8 +1192,8 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
let expected_traversals = transactions_ids.len() + l1_cache_hits.len(); let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel(); let (traversal_tx, traversal_rx) = channel();
let traversal_data_pool = ThreadPool::new(thread_max);
let mut tx_thread_pool = vec![]; let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for thread_index in 0..thread_max { for thread_index in 0..thread_max {
let (tx, rx) = channel(); let (tx, rx) = channel();
@@ -1204,23 +1204,30 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
let moved_hord_db_path = hord_config.db_path.clone(); let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone(); let local_cache = cache_l2.clone();
traversal_data_pool.execute(move || { let handle = hiro_system_kit::thread_named("Worker")
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) = .spawn(move || {
rx.recv() while let Ok(Some((
{ transaction_id,
let traversal: Result<TraversalResult, String> = block_identifier,
retrieve_satoshi_point_using_lazy_storage_v3( input_index,
&moved_hord_db_path, prioritary,
&block_identifier, ))) = rx.recv()
&transaction_id, {
input_index, let traversal: Result<TraversalResult, String> =
0, retrieve_satoshi_point_using_lazy_storage_v3(
&local_cache, &moved_hord_db_path,
&moved_ctx, &block_identifier,
); &transaction_id,
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index)); input_index,
} 0,
}); &local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
} }
// Empty cache // Empty cache
@@ -1335,12 +1342,14 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
} }
} }
} }
for thread_index in 0..thread_max { for tx in tx_thread_pool.iter() {
let _ = tx_thread_pool[thread_index].send(None); let _ = tx.send(None);
} }
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || { let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
let _ = traversal_data_pool.join(); for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
}); });
} else { } else {
ctx.try_log(|logger| { ctx.try_log(|logger| {