mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
feat: in-house thread pool
This commit is contained in:
@@ -2326,7 +2326,8 @@ pub async fn rebuild_rocks_db(
|
|||||||
let moved_bitcoin_network = moved_bitcoin_network.clone();
|
let moved_bitcoin_network = moved_bitcoin_network.clone();
|
||||||
compress_block_data_pool.execute(move || {
|
compress_block_data_pool.execute(move || {
|
||||||
while let Ok(Some(block_bytes)) = rx.recv() {
|
while let Ok(Some(block_bytes)) = rx.recv() {
|
||||||
let raw_block_data = parse_downloaded_block(block_bytes).expect("unable to parse block");
|
let raw_block_data =
|
||||||
|
parse_downloaded_block(block_bytes).expect("unable to parse block");
|
||||||
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
|
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
|
||||||
.expect("unable to compress block");
|
.expect("unable to compress block");
|
||||||
let block_data = hord::parse_ordinals_and_standardize_block(
|
let block_data = hord::parse_ordinals_and_standardize_block(
|
||||||
@@ -2422,12 +2423,7 @@ pub async fn rebuild_rocks_db(
|
|||||||
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
|
thread_index = (thread_index + 1) % hord_config.ingestion_thread_max;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.try_log(|logger| {
|
ctx.try_log(|logger| info!(logger, "Gargbage collecting will start"));
|
||||||
info!(
|
|
||||||
logger,
|
|
||||||
"Gargbage collecting will start"
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
for tx in tx_thread_pool.iter() {
|
for tx in tx_thread_pool.iter() {
|
||||||
let _ = tx.send(None);
|
let _ = tx.send(None);
|
||||||
@@ -2436,12 +2432,7 @@ pub async fn rebuild_rocks_db(
|
|||||||
let _ = storage_thread.join();
|
let _ = storage_thread.join();
|
||||||
let _ = set.shutdown();
|
let _ = set.shutdown();
|
||||||
|
|
||||||
ctx.try_log(|logger| {
|
ctx.try_log(|logger| info!(logger, "Gargbage collecting did finish"));
|
||||||
info!(
|
|
||||||
logger,
|
|
||||||
"Gargbage collecting did finish"
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
// match guard.report().build() {
|
// match guard.report().build() {
|
||||||
// Ok(report) => {
|
// Ok(report) => {
|
||||||
|
|||||||
@@ -1192,8 +1192,8 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
|||||||
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
||||||
let (traversal_tx, traversal_rx) = channel();
|
let (traversal_tx, traversal_rx) = channel();
|
||||||
|
|
||||||
let traversal_data_pool = ThreadPool::new(thread_max);
|
|
||||||
let mut tx_thread_pool = vec![];
|
let mut tx_thread_pool = vec![];
|
||||||
|
let mut thread_pool_handles = vec![];
|
||||||
|
|
||||||
for thread_index in 0..thread_max {
|
for thread_index in 0..thread_max {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
@@ -1204,23 +1204,30 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
|||||||
let moved_hord_db_path = hord_config.db_path.clone();
|
let moved_hord_db_path = hord_config.db_path.clone();
|
||||||
let local_cache = cache_l2.clone();
|
let local_cache = cache_l2.clone();
|
||||||
|
|
||||||
traversal_data_pool.execute(move || {
|
let handle = hiro_system_kit::thread_named("Worker")
|
||||||
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
|
.spawn(move || {
|
||||||
rx.recv()
|
while let Ok(Some((
|
||||||
{
|
transaction_id,
|
||||||
let traversal: Result<TraversalResult, String> =
|
block_identifier,
|
||||||
retrieve_satoshi_point_using_lazy_storage_v3(
|
input_index,
|
||||||
&moved_hord_db_path,
|
prioritary,
|
||||||
&block_identifier,
|
))) = rx.recv()
|
||||||
&transaction_id,
|
{
|
||||||
input_index,
|
let traversal: Result<TraversalResult, String> =
|
||||||
0,
|
retrieve_satoshi_point_using_lazy_storage_v3(
|
||||||
&local_cache,
|
&moved_hord_db_path,
|
||||||
&moved_ctx,
|
&block_identifier,
|
||||||
);
|
&transaction_id,
|
||||||
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
input_index,
|
||||||
}
|
0,
|
||||||
});
|
&local_cache,
|
||||||
|
&moved_ctx,
|
||||||
|
);
|
||||||
|
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.expect("unable to spawn thread");
|
||||||
|
thread_pool_handles.push(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Empty cache
|
// Empty cache
|
||||||
@@ -1335,12 +1342,14 @@ pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for thread_index in 0..thread_max {
|
for tx in tx_thread_pool.iter() {
|
||||||
let _ = tx_thread_pool[thread_index].send(None);
|
let _ = tx.send(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
|
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
|
||||||
let _ = traversal_data_pool.join();
|
for handle in thread_pool_handles.into_iter() {
|
||||||
|
let _ = handle.join();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
ctx.try_log(|logger| {
|
ctx.try_log(|logger| {
|
||||||
|
|||||||
Reference in New Issue
Block a user