feat: in-house thread pool

This commit is contained in:
Ludo Galabru
2023-08-01 10:22:10 +02:00
parent 2f172e0a8a
commit bc5ffddb5b
2 changed files with 34 additions and 34 deletions

View File

@@ -2326,7 +2326,8 @@ pub async fn rebuild_rocks_db(
let moved_bitcoin_network = moved_bitcoin_network.clone();
compress_block_data_pool.execute(move || {
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data = parse_downloaded_block(block_bytes).expect("unable to parse block");
let raw_block_data =
parse_downloaded_block(block_bytes).expect("unable to parse block");
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
.expect("unable to compress block");
let block_data = hord::parse_ordinals_and_standardize_block(
@@ -2422,12 +2423,7 @@ pub async fn rebuild_rocks_db(
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
}
ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting will start"
)
});
ctx.try_log(|logger| info!(logger, "Gargbage collecting will start"));
for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
@@ -2436,12 +2432,7 @@ pub async fn rebuild_rocks_db(
let _ = storage_thread.join();
let _ = set.shutdown();
ctx.try_log(|logger| {
info!(
logger,
"Gargbage collecting did finish"
)
});
ctx.try_log(|logger| info!(logger, "Gargbage collecting did finish"));
// match guard.report().build() {
// Ok(report) => {

View File

@@ -1192,8 +1192,8 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
let traversal_data_pool = ThreadPool::new(thread_max);
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for thread_index in 0..thread_max {
let (tx, rx) = channel();
@@ -1204,23 +1204,30 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();
traversal_data_pool.execute(move || {
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
rx.recv()
{
let traversal: Result<TraversalResult, String> =
retrieve_satoshi_point_using_lazy_storage_v3(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
});
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
prioritary,
))) = rx.recv()
{
let traversal: Result<TraversalResult, String> =
retrieve_satoshi_point_using_lazy_storage_v3(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}
// Empty cache
@@ -1335,12 +1342,14 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
}
}
}
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(None);
for tx in tx_thread_pool.iter() {
let _ = tx.send(None);
}
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
let _ = traversal_data_pool.join();
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
});
} else {
ctx.try_log(|logger| {