mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
feat: add shared cache
This commit is contained in:
49
Cargo.lock
generated
49
Cargo.lock
generated
@@ -541,7 +541,9 @@ dependencies = [
|
||||
"clarity-repl",
|
||||
"crossbeam-channel 0.5.8",
|
||||
"ctrlc",
|
||||
"dashmap 5.4.0",
|
||||
"futures",
|
||||
"fxhash",
|
||||
"hex",
|
||||
"hex-simd",
|
||||
"hiro-system-kit",
|
||||
@@ -562,8 +564,6 @@ dependencies = [
|
||||
"threadpool",
|
||||
"tokio",
|
||||
"toml",
|
||||
"zerocopy",
|
||||
"zerocopy-derive",
|
||||
"zeromq",
|
||||
]
|
||||
|
||||
@@ -1240,6 +1240,19 @@ dependencies = [
|
||||
"num_cpus",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"hashbrown 0.12.3",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "debug_types"
|
||||
version = "1.0.0"
|
||||
@@ -1634,6 +1647,15 @@ dependencies = [
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fxhash"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generator"
|
||||
version = "0.7.2"
|
||||
@@ -5104,27 +5126,6 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.5.7"
|
||||
@@ -5141,7 +5142,7 @@ dependencies = [
|
||||
"asynchronous-codec",
|
||||
"bytes",
|
||||
"crossbeam",
|
||||
"dashmap",
|
||||
"dashmap 3.11.10",
|
||||
"enum-primitive-derive",
|
||||
"futures",
|
||||
"futures-util",
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::sync::mpsc::channel;
|
||||
|
||||
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
|
||||
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
|
||||
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
|
||||
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 1;
|
||||
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
|
||||
|
||||
pub struct Service {
|
||||
@@ -147,8 +147,8 @@ impl Service {
|
||||
&mut moved_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let end_block = match hiro_system_kit::nestable_block_on(op) {
|
||||
Ok(end_block) => end_block,
|
||||
let last_block_in_csv = match hiro_system_kit::nestable_block_on(op) {
|
||||
Ok(last_block_in_csv) => last_block_in_csv,
|
||||
Err(e) => {
|
||||
error!(
|
||||
moved_ctx.expect_logger(),
|
||||
@@ -159,7 +159,8 @@ impl Service {
|
||||
};
|
||||
info!(
|
||||
moved_ctx.expect_logger(),
|
||||
"Stacks chainstate scan completed up to block: {}", end_block.index
|
||||
"Stacks chainstate scan completed up to block: {}",
|
||||
last_block_in_csv.index
|
||||
);
|
||||
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
|
||||
ChainhookSpecification::Stacks(predicate_spec),
|
||||
@@ -259,15 +260,7 @@ impl Service {
|
||||
}
|
||||
match chainhook {
|
||||
ChainhookSpecification::Stacks(predicate_spec) => {
|
||||
// let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Enabling stacks predicate {}", predicate_spec.uuid
|
||||
);
|
||||
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
|
||||
ChainhookSpecification::Stacks(predicate_spec),
|
||||
api_key,
|
||||
));
|
||||
let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
|
||||
}
|
||||
ChainhookSpecification::Bitcoin(predicate_spec) => {
|
||||
let _ = bitcoin_scan_op_tx.send((predicate_spec, api_key));
|
||||
|
||||
@@ -50,8 +50,8 @@ rand = "0.8.5"
|
||||
hex-simd = "0.8.0"
|
||||
serde_cbor = "0.11.2"
|
||||
zeromq = { version = "*", default-features = false, features = ["tokio-runtime", "tcp-transport"] }
|
||||
zerocopy = "0.6.1"
|
||||
zerocopy-derive = "0.3.2"
|
||||
dashmap = "5.4.0"
|
||||
fxhash = "0.2.1"
|
||||
|
||||
[dependencies.rocksdb]
|
||||
version = "0.20.1"
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
hash::BuildHasherDefault,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use chainhook_types::{
|
||||
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
use hiro_system_kit::slog;
|
||||
|
||||
use rocksdb::DB;
|
||||
@@ -418,6 +422,7 @@ pub fn open_readonly_hord_db_conn_rocks_db(
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_max_open_files(5000);
|
||||
opts.set_disable_auto_compactions(true);
|
||||
let db = DB::open_for_read_only(&opts, path, false)
|
||||
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
|
||||
Ok(db)
|
||||
@@ -431,11 +436,22 @@ pub fn open_readwrite_hord_db_conn_rocks_db(
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_max_open_files(5000);
|
||||
opts.set_disable_auto_compactions(true);
|
||||
let db = DB::open(&opts, path)
|
||||
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
pub fn archive_hord_db_conn_rocks_db(base_dir: &PathBuf, _ctx: &Context) {
|
||||
let from = get_default_hord_db_file_path_rocks_db(&base_dir);
|
||||
let to = {
|
||||
let mut destination_path = base_dir.clone();
|
||||
destination_path.push("hord.rocksdb_archive");
|
||||
destination_path
|
||||
};
|
||||
let _ = std::fs::rename(from, to);
|
||||
}
|
||||
|
||||
// Legacy - to remove after migrations
|
||||
pub fn find_block_at_block_height_sqlite(
|
||||
block_height: u32,
|
||||
@@ -1058,11 +1074,24 @@ impl TraversalResult {
|
||||
}
|
||||
}
|
||||
|
||||
// May 05 21:48:55.191 INFO Computing ordinal number for Satoshi point 0x5489d47538302148cd524f0ab1cc13223f3dc089cd5267d4cd45ccf1d532b743:0:0 (block #788201)
|
||||
// May 05 21:50:36.807 INFO Satoshi #147405521136231 was minted in block #29481 at offset 521136231 and was transferred 2858 times (progress: 2379/2379).
|
||||
|
||||
// May 05 22:04:23.767 INFO Computing ordinal number for Satoshi point 0x57baf2e41fe5ffc70fc63129a1208c77606dd94d9ec05097a47ee557d8653c74:0:0 (block #788201)
|
||||
// May 05 22:04:56.009 INFO Satoshi #1122896574049767 was minted in block #239158 at offset 1574049767 and was transferred 10634 times (progress: 2379/2379).
|
||||
|
||||
pub fn retrieve_satoshi_point_using_local_storage(
|
||||
blocks_db: &DB,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
inscription_number: u64,
|
||||
cache: Arc<
|
||||
DashMap<
|
||||
(u32, [u8; 8]),
|
||||
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
|
||||
BuildHasherDefault<FxHasher>,
|
||||
>,
|
||||
>,
|
||||
ctx: &Context,
|
||||
) -> Result<TraversalResult, String> {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -1084,10 +1113,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
};
|
||||
let mut tx_cursor = (txid, 0);
|
||||
let mut hops: u32 = 0;
|
||||
let mut local_block_cache = HashMap::new();
|
||||
loop {
|
||||
local_block_cache.clear();
|
||||
|
||||
hops += 1;
|
||||
if hops as u64 > block_identifier.index {
|
||||
return Err(format!(
|
||||
@@ -1096,17 +1122,77 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
));
|
||||
}
|
||||
|
||||
let block = match local_block_cache.get(&ordinal_block_number) {
|
||||
if let Some(cached_tx) = cache.get(&(ordinal_block_number, tx_cursor.0)) {
|
||||
let (inputs, outputs) = cached_tx.value();
|
||||
let mut next_found_in_cache = false;
|
||||
|
||||
let mut sats_out = 0;
|
||||
for (index, output_value) in outputs.iter().enumerate() {
|
||||
if index == tx_cursor.1 {
|
||||
break;
|
||||
}
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(logger, "Adding {} from output #{}", output_value, index)
|
||||
// });
|
||||
sats_out += output_value;
|
||||
}
|
||||
sats_out += ordinal_offset;
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(
|
||||
// logger,
|
||||
// "Adding offset {ordinal_offset} to sats_out {sats_out}"
|
||||
// )
|
||||
// });
|
||||
|
||||
let mut sats_in = 0;
|
||||
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
|
||||
sats_in += txin_value;
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(
|
||||
// logger,
|
||||
// "Adding txin_value {txin_value} to sats_in {sats_in} (txin: {})",
|
||||
// hex::encode(&txin)
|
||||
// )
|
||||
// });
|
||||
|
||||
if sats_out < sats_in {
|
||||
ordinal_offset = sats_out - (sats_in - txin_value);
|
||||
ordinal_block_number = *block_height;
|
||||
|
||||
// ctx.try_log(|logger| slog::info!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
|
||||
// hex::encode(&txid_n),
|
||||
// hex::encode(&txin)));
|
||||
tx_cursor = (txin.clone(), *vout as usize);
|
||||
next_found_in_cache = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if next_found_in_cache {
|
||||
continue;
|
||||
}
|
||||
|
||||
if sats_in == 0 {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Transaction {} is originating from a non spending transaction",
|
||||
transaction_identifier.hash
|
||||
)
|
||||
});
|
||||
return Ok(TraversalResult {
|
||||
inscription_number: 0,
|
||||
ordinal_number: 0,
|
||||
transfers: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let block = match find_block_at_block_height(ordinal_block_number, 3, &blocks_db) {
|
||||
Some(block) => block,
|
||||
None => match find_block_at_block_height(ordinal_block_number, 3, &blocks_db) {
|
||||
Some(block) => {
|
||||
local_block_cache.insert(ordinal_block_number, block);
|
||||
local_block_cache.get(&ordinal_block_number).unwrap()
|
||||
}
|
||||
None => {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
},
|
||||
None => {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
};
|
||||
|
||||
let coinbase_txid = &block.0 .0 .0;
|
||||
@@ -1162,7 +1248,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
}
|
||||
} else {
|
||||
// isolate the target transaction
|
||||
for (txid_n, inputs, outputs) in block.0 .1.iter() {
|
||||
for (txid_n, inputs, outputs) in block.0 .1.into_iter() {
|
||||
// we iterate over the transactions, looking for the transaction target
|
||||
if !txid_n.eq(&txid) {
|
||||
continue;
|
||||
@@ -1191,7 +1277,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
// });
|
||||
|
||||
let mut sats_in = 0;
|
||||
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
|
||||
for (txin, block_height, vout, txin_value) in inputs.iter() {
|
||||
sats_in += txin_value;
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::info!(
|
||||
@@ -1202,6 +1288,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
// });
|
||||
|
||||
if sats_out < sats_in {
|
||||
cache.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
|
||||
ordinal_offset = sats_out - (sats_in - txin_value);
|
||||
ordinal_block_number = *block_height;
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use chainhook_types::{
|
||||
BitcoinBlockData, OrdinalInscriptionRevealData, OrdinalInscriptionTransferData,
|
||||
OrdinalOperation, TransactionIdentifier,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use hiro_system_kit::slog;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
@@ -16,6 +17,7 @@ use rusqlite::Connection;
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::Arc;
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use crate::indexer::bitcoin::BitcoinTransactionFullBreakdown;
|
||||
@@ -160,50 +162,34 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
blocks_db_rw: &DB,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
write_block: bool,
|
||||
pub fn retrieve_inscribed_satoshi_points_from_block(
|
||||
block: &BitcoinBlockData,
|
||||
inscriptions_db_conn: Option<&Connection>,
|
||||
hord_db_path: &PathBuf,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
if write_block {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Updating hord.sqlite with Bitcoin block #{} for future traversals",
|
||||
new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
|
||||
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
|
||||
insert_entry_in_blocks(
|
||||
new_block.block_identifier.index as u32,
|
||||
&compacted_block,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
let _ = blocks_db_rw.flush();
|
||||
}
|
||||
|
||||
) -> HashMap<TransactionIdentifier, TraversalResult> {
|
||||
let mut transactions_ids = vec![];
|
||||
let mut traversals = HashMap::new();
|
||||
|
||||
for new_tx in new_block.transactions.iter_mut().skip(1) {
|
||||
for tx in block.transactions.iter().skip(1) {
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for ordinal_event in new_tx.metadata.ordinal_operations.iter_mut() {
|
||||
for ordinal_event in tx.metadata.ordinal_operations.iter() {
|
||||
if let OrdinalOperation::InscriptionRevealed(inscription_data) = ordinal_event {
|
||||
if let Some(traversal) = find_inscription_with_id(
|
||||
&inscription_data.inscription_id,
|
||||
&new_block.block_identifier.hash,
|
||||
inscriptions_db_conn_rw,
|
||||
ctx,
|
||||
) {
|
||||
traversals.insert(new_tx.transaction_identifier.clone(), traversal);
|
||||
if let Some(inscriptions_db_conn) = inscriptions_db_conn {
|
||||
if let Some(traversal) = find_inscription_with_id(
|
||||
&inscription_data.inscription_id,
|
||||
&block.block_identifier.hash,
|
||||
inscriptions_db_conn,
|
||||
ctx,
|
||||
) {
|
||||
traversals.insert(tx.transaction_identifier.clone(), traversal);
|
||||
} else {
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push(tx.transaction_identifier.clone());
|
||||
}
|
||||
} else {
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push(new_tx.transaction_identifier.clone());
|
||||
transactions_ids.push(tx.transaction_identifier.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,12 +202,14 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
|
||||
let hasher = fxhash::FxBuildHasher::default();
|
||||
let shared_cache = Arc::new(DashMap::with_hasher(hasher));
|
||||
for transaction_id in transactions_ids.into_iter() {
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let block_identifier = new_block.block_identifier.clone();
|
||||
let block_identifier = block.block_identifier.clone();
|
||||
let moved_hord_db_path = hord_db_path.clone();
|
||||
let cache = shared_cache.clone();
|
||||
traversal_data_pool.execute(move || loop {
|
||||
match open_readonly_hord_db_conn_rocks_db(&moved_hord_db_path, &moved_ctx) {
|
||||
Ok(blocks_db) => {
|
||||
@@ -230,6 +218,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
0,
|
||||
cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((transaction_id, traversal));
|
||||
@@ -273,8 +262,46 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
}
|
||||
}
|
||||
let _ = traversal_data_pool.join();
|
||||
std::thread::spawn(move || drop(shared_cache));
|
||||
}
|
||||
|
||||
traversals
|
||||
}
|
||||
|
||||
pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
blocks_db_rw: &DB,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
write_block: bool,
|
||||
hord_db_path: &PathBuf,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
if write_block {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"Updating hord.sqlite with Bitcoin block #{} for future traversals",
|
||||
new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
|
||||
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
|
||||
insert_entry_in_blocks(
|
||||
new_block.block_identifier.index as u32,
|
||||
&compacted_block,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
let _ = blocks_db_rw.flush();
|
||||
}
|
||||
|
||||
let traversals = retrieve_inscribed_satoshi_points_from_block(
|
||||
&new_block,
|
||||
Some(inscriptions_db_conn_rw),
|
||||
hord_db_path,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let mut storage = Storage::Sqlite(inscriptions_db_conn_rw);
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
|
||||
new_block,
|
||||
|
||||
@@ -10,6 +10,8 @@ extern crate serde_derive;
|
||||
extern crate serde_json;
|
||||
|
||||
pub extern crate bitcoincore_rpc;
|
||||
pub extern crate dashmap;
|
||||
pub extern crate fxhash;
|
||||
|
||||
pub use chainhook_types;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user