feat: add shared cache

Author: Ludo Galabru
Date: 2023-05-08 09:26:32 -04:00
Parent: ce439ae900
Commit: 07523aed1a
6 changed files with 201 additions and 91 deletions
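
For orientation before the file-by-file diff: this commit threads a shared, concurrent cache of per-block transaction data through the ordinal traversal workers, backed by dashmap with the fxhash hasher. A minimal sketch of the shape involved, assuming the dashmap 5.4.0 and fxhash 0.2.1 dependencies added below; the alias and function names are illustrative, only the key and value types come from the diff:

    use std::hash::BuildHasherDefault;
    use std::sync::Arc;

    use dashmap::DashMap;
    use fxhash::FxHasher;

    // Key: (block height, first 8 bytes of a txid).
    // Value: (inputs, output values), each input being (txid prefix, block height, vout, sats).
    type TxCacheKey = (u32, [u8; 8]);
    type TxCacheValue = (Vec<([u8; 8], u32, u16, u64)>, Vec<u64>);
    type SharedTxCache = Arc<DashMap<TxCacheKey, TxCacheValue, BuildHasherDefault<FxHasher>>>;

    fn new_shared_cache() -> SharedTxCache {
        Arc::new(DashMap::with_hasher(Default::default()))
    }

    fn main() {
        let cache = new_shared_cache();
        // Every traversal worker receives a clone of the Arc and shares the same map.
        let worker_view = cache.clone();
        worker_view.insert((788_201, [0u8; 8]), (vec![], vec![546]));
        assert_eq!(cache.len(), 1);
    }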

Cargo.lock (generated), 49 changed lines

@@ -541,7 +541,9 @@ dependencies = [
"clarity-repl",
"crossbeam-channel 0.5.8",
"ctrlc",
"dashmap 5.4.0",
"futures",
"fxhash",
"hex",
"hex-simd",
"hiro-system-kit",
@@ -562,8 +564,6 @@ dependencies = [
"threadpool",
"tokio",
"toml",
"zerocopy",
"zerocopy-derive",
"zeromq",
]
@@ -1240,6 +1240,19 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if 1.0.0",
"hashbrown 0.12.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.5",
]
[[package]]
name = "debug_types"
version = "1.0.0"
@@ -1634,6 +1647,15 @@ dependencies = [
"slab",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]]
name = "generator"
version = "0.7.2"
@@ -5104,27 +5126,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zerocopy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zeroize"
version = "1.5.7"
@@ -5141,7 +5142,7 @@ dependencies = [
"asynchronous-codec",
"bytes",
"crossbeam",
"dashmap",
"dashmap 3.11.10",
"enum-primitive-derive",
"futures",
"futures-util",

[service module: Stacks scan thread pool and predicate routing]

@@ -17,7 +17,7 @@ use std::sync::mpsc::channel;
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 1;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
pub struct Service {
@@ -147,8 +147,8 @@ impl Service {
&mut moved_config,
&moved_ctx,
);
let end_block = match hiro_system_kit::nestable_block_on(op) {
Ok(end_block) => end_block,
let last_block_in_csv = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_in_csv) => last_block_in_csv,
Err(e) => {
error!(
moved_ctx.expect_logger(),
@@ -159,7 +159,8 @@ impl Service {
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
"Stacks chainstate scan completed up to block: {}",
last_block_in_csv.index
);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
@@ -259,15 +260,7 @@ impl Service {
}
match chainhook {
ChainhookSpecification::Stacks(predicate_spec) => {
// let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
info!(
self.ctx.expect_logger(),
"Enabling stacks predicate {}", predicate_spec.uuid
);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
api_key,
));
let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send((predicate_spec, api_key));
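
The net effect of this hunk: newly registered Stacks predicates are handed back to the scan worker channel and only enabled once their chainstate scan completes (see the EnablePredicate send after the scan-completed log above). A minimal sketch of that two-stage flow, using illustrative stand-ins rather than the real ChainhookSpecification and ObserverCommand types:

    use std::sync::mpsc::channel;
    use std::thread;

    // Illustrative stand-ins for the real predicate and observer command types.
    struct StacksPredicate {
        uuid: String,
    }

    enum ObserverCommand {
        EnablePredicate(StacksPredicate),
    }

    fn main() {
        let (stacks_scan_op_tx, stacks_scan_op_rx) = channel::<StacksPredicate>();
        let (observer_command_tx, observer_command_rx) = channel::<ObserverCommand>();

        // Scan worker: enables a predicate only once its chainstate scan is done.
        let worker = thread::spawn(move || {
            for predicate in stacks_scan_op_rx.iter() {
                // ... scan historical chainstate for this predicate here ...
                let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(predicate));
            }
        });

        // Registration path: hand the predicate to the scan worker instead of
        // enabling it immediately, which is the behaviour this hunk restores.
        let _ = stacks_scan_op_tx.send(StacksPredicate { uuid: "uuid-1".into() });
        drop(stacks_scan_op_tx);

        while let Ok(ObserverCommand::EnablePredicate(predicate)) = observer_command_rx.recv() {
            println!("enabled predicate {}", predicate.uuid);
        }
        let _ = worker.join();
    }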

[Cargo.toml: dependency changes]

@@ -50,8 +50,8 @@ rand = "0.8.5"
hex-simd = "0.8.0"
serde_cbor = "0.11.2"
zeromq = { version = "*", default-features = false, features = ["tokio-runtime", "tcp-transport"] }
zerocopy = "0.6.1"
zerocopy-derive = "0.3.2"
dashmap = "5.4.0"
fxhash = "0.2.1"
[dependencies.rocksdb]
version = "0.20.1"
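
One note for the diffs that follow: the traversal function's signature takes a BuildHasherDefault<FxHasher>, while the call site constructs the map with fxhash::FxBuildHasher::default(). In the fxhash crate, FxBuildHasher is a type alias for BuildHasherDefault<FxHasher>, so both spellings name the same hasher. A small compiling sketch of that equivalence (the function and key/value types are illustrative):

    use std::hash::BuildHasherDefault;

    use dashmap::DashMap;
    use fxhash::{FxBuildHasher, FxHasher};

    // The traversal signature in this commit spells the hasher out in full...
    fn takes_cache(cache: &DashMap<u32, u64, BuildHasherDefault<FxHasher>>) -> usize {
        cache.len()
    }

    fn main() {
        // ...while the constructor uses the FxBuildHasher alias; both are the same type.
        let cache = DashMap::with_hasher(FxBuildHasher::default());
        cache.insert(1u32, 42u64);
        assert_eq!(takes_cache(&cache), 1);
    }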

[hord db module: RocksDB options, archive helper, cached satoshi-point traversal]

@@ -1,11 +1,15 @@
use std::{
collections::{BTreeMap, HashMap},
hash::BuildHasherDefault,
path::PathBuf,
sync::Arc,
};
use chainhook_types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
};
use dashmap::DashMap;
use fxhash::FxHasher;
use hiro_system_kit::slog;
use rocksdb::DB;
@@ -418,6 +422,7 @@ pub fn open_readonly_hord_db_conn_rocks_db(
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_max_open_files(5000);
opts.set_disable_auto_compactions(true);
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
Ok(db)
@@ -431,11 +436,22 @@ pub fn open_readwrite_hord_db_conn_rocks_db(
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_max_open_files(5000);
opts.set_disable_auto_compactions(true);
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
Ok(db)
}
pub fn archive_hord_db_conn_rocks_db(base_dir: &PathBuf, _ctx: &Context) {
let from = get_default_hord_db_file_path_rocks_db(&base_dir);
let to = {
let mut destination_path = base_dir.clone();
destination_path.push("hord.rocksdb_archive");
destination_path
};
let _ = std::fs::rename(from, to);
}
// Legacy - to remove after migrations
pub fn find_block_at_block_height_sqlite(
block_height: u32,
@@ -1058,11 +1074,24 @@ impl TraversalResult {
}
}
// May 05 21:48:55.191 INFO Computing ordinal number for Satoshi point 0x5489d47538302148cd524f0ab1cc13223f3dc089cd5267d4cd45ccf1d532b743:0:0 (block #788201)
// May 05 21:50:36.807 INFO Satoshi #147405521136231 was minted in block #29481 at offset 521136231 and was transferred 2858 times (progress: 2379/2379).
// May 05 22:04:23.767 INFO Computing ordinal number for Satoshi point 0x57baf2e41fe5ffc70fc63129a1208c77606dd94d9ec05097a47ee557d8653c74:0:0 (block #788201)
// May 05 22:04:56.009 INFO Satoshi #1122896574049767 was minted in block #239158 at offset 1574049767 and was transferred 10634 times (progress: 2379/2379).
pub fn retrieve_satoshi_point_using_local_storage(
blocks_db: &DB,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_number: u64,
cache: Arc<
DashMap<
(u32, [u8; 8]),
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
BuildHasherDefault<FxHasher>,
>,
>,
ctx: &Context,
) -> Result<TraversalResult, String> {
ctx.try_log(|logger| {
@@ -1084,10 +1113,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
};
let mut tx_cursor = (txid, 0);
let mut hops: u32 = 0;
let mut local_block_cache = HashMap::new();
loop {
local_block_cache.clear();
hops += 1;
if hops as u64 > block_identifier.index {
return Err(format!(
@@ -1096,17 +1122,77 @@ pub fn retrieve_satoshi_point_using_local_storage(
));
}
let block = match local_block_cache.get(&ordinal_block_number) {
if let Some(cached_tx) = cache.get(&(ordinal_block_number, tx_cursor.0)) {
let (inputs, outputs) = cached_tx.value();
let mut next_found_in_cache = false;
let mut sats_out = 0;
for (index, output_value) in outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
// ctx.try_log(|logger| {
// slog::info!(logger, "Adding {} from output #{}", output_value, index)
// });
sats_out += output_value;
}
sats_out += ordinal_offset;
// ctx.try_log(|logger| {
// slog::info!(
// logger,
// "Adding offset {ordinal_offset} to sats_out {sats_out}"
// )
// });
let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
// ctx.try_log(|logger| {
// slog::info!(
// logger,
// "Adding txin_value {txin_value} to sats_in {sats_in} (txin: {})",
// hex::encode(&txin)
// )
// });
if sats_out < sats_in {
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = *block_height;
// ctx.try_log(|logger| slog::info!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
// hex::encode(&txid_n),
// hex::encode(&txin)));
tx_cursor = (txin.clone(), *vout as usize);
next_found_in_cache = true;
break;
}
}
if next_found_in_cache {
continue;
}
if sats_in == 0 {
ctx.try_log(|logger| {
slog::error!(
logger,
"Transaction {} is originating from a non-spending transaction",
transaction_identifier.hash
)
});
return Ok(TraversalResult {
inscription_number: 0,
ordinal_number: 0,
transfers: 0,
});
}
}
let block = match find_block_at_block_height(ordinal_block_number, 3, &blocks_db) {
Some(block) => block,
None => match find_block_at_block_height(ordinal_block_number, 3, &blocks_db) {
Some(block) => {
local_block_cache.insert(ordinal_block_number, block);
local_block_cache.get(&ordinal_block_number).unwrap()
}
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
}
},
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
}
};
let coinbase_txid = &block.0 .0 .0;
@@ -1162,7 +1248,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
}
} else {
// isolate the target transaction
for (txid_n, inputs, outputs) in block.0 .1.iter() {
for (txid_n, inputs, outputs) in block.0 .1.into_iter() {
// we iterate over the transactions, looking for the transaction target
if !txid_n.eq(&txid) {
continue;
@@ -1191,7 +1277,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
// });
let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
for (txin, block_height, vout, txin_value) in inputs.iter() {
sats_in += txin_value;
// ctx.try_log(|logger| {
// slog::info!(
@@ -1202,6 +1288,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
// });
if sats_out < sats_in {
cache.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = *block_height;
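
To make the new cache-hit branch above easier to follow, here is a simplified, self-contained sketch of a single traversal hop. This is a sketch under assumed names, not the real function, which also falls back to the RocksDB block store on a miss, counts hops, and handles the non-spending case:

    use std::hash::BuildHasherDefault;
    use std::sync::Arc;

    use dashmap::DashMap;
    use fxhash::FxHasher;

    // (block height, txid prefix) -> (inputs, output values in sats),
    // where each input is (txid prefix, block height, vout, value).
    type TxKey = (u32, [u8; 8]);
    type TxData = (Vec<([u8; 8], u32, u16, u64)>, Vec<u64>);
    type Cache = Arc<DashMap<TxKey, TxData, BuildHasherDefault<FxHasher>>>;

    /// One hop of the ordinal traversal, answered from the shared cache when possible.
    /// Returns Some((previous txid prefix, previous block, previous vout, new offset)).
    fn hop_from_cache(
        cache: &Cache,
        block: u32,
        txid: [u8; 8],
        vout: usize,
        offset: u64,
    ) -> Option<([u8; 8], u32, u16, u64)> {
        let entry = cache.get(&(block, txid))?;
        let (inputs, outputs) = entry.value();

        // Absolute offset of the tracked satoshi within this transaction's outputs.
        let sats_out: u64 = outputs.iter().take(vout).sum::<u64>() + offset;

        // Walk the inputs until their cumulative value covers that offset:
        // the satoshi entered the transaction through that input.
        let mut sats_in = 0u64;
        for (txin, in_block, in_vout, txin_value) in inputs.iter() {
            sats_in += txin_value;
            if sats_out < sats_in {
                let new_offset = sats_out - (sats_in - txin_value);
                return Some((*txin, *in_block, *in_vout, new_offset));
            }
        }
        None // not covered here: the real code keeps walking the blocks database
    }

    fn main() {
        let cache: Cache = Arc::new(DashMap::with_hasher(Default::default()));
        // One cached transaction: a single 50_000-sat input, two outputs of 20_000 and 30_000 sats.
        cache.insert((100, [1; 8]), (vec![([2; 8], 90, 0, 50_000)], vec![20_000, 30_000]));
        // Tracking offset 5_000 into output #1 lands 25_000 sats into the only input.
        assert_eq!(
            hop_from_cache(&cache, 100, [1; 8], 1, 5_000),
            Some(([2; 8], 90, 0, 25_000))
        );
    }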

[hord module: retrieve_inscribed_satoshi_points_from_block and shared cache wiring]

@@ -8,6 +8,7 @@ use chainhook_types::{
BitcoinBlockData, OrdinalInscriptionRevealData, OrdinalInscriptionTransferData,
OrdinalOperation, TransactionIdentifier,
};
use dashmap::DashMap;
use hiro_system_kit::slog;
use rand::seq::SliceRandom;
use rand::thread_rng;
@@ -16,6 +17,7 @@ use rusqlite::Connection;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::Arc;
use threadpool::ThreadPool;
use crate::indexer::bitcoin::BitcoinTransactionFullBreakdown;
@@ -160,50 +162,34 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
Ok(())
}
pub fn update_hord_db_and_augment_bitcoin_block(
new_block: &mut BitcoinBlockData,
blocks_db_rw: &DB,
inscriptions_db_conn_rw: &Connection,
write_block: bool,
pub fn retrieve_inscribed_satoshi_points_from_block(
block: &BitcoinBlockData,
inscriptions_db_conn: Option<&Connection>,
hord_db_path: &PathBuf,
ctx: &Context,
) -> Result<(), String> {
if write_block {
ctx.try_log(|logger| {
slog::info!(
logger,
"Updating hord.sqlite with Bitcoin block #{} for future traversals",
new_block.block_identifier.index,
)
});
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
insert_entry_in_blocks(
new_block.block_identifier.index as u32,
&compacted_block,
&blocks_db_rw,
&ctx,
);
let _ = blocks_db_rw.flush();
}
) -> HashMap<TransactionIdentifier, TraversalResult> {
let mut transactions_ids = vec![];
let mut traversals = HashMap::new();
for new_tx in new_block.transactions.iter_mut().skip(1) {
for tx in block.transactions.iter().skip(1) {
// Has a new inscription been revealed? If so, are we looking at a re-inscription?
for ordinal_event in new_tx.metadata.ordinal_operations.iter_mut() {
for ordinal_event in tx.metadata.ordinal_operations.iter() {
if let OrdinalOperation::InscriptionRevealed(inscription_data) = ordinal_event {
if let Some(traversal) = find_inscription_with_id(
&inscription_data.inscription_id,
&new_block.block_identifier.hash,
inscriptions_db_conn_rw,
ctx,
) {
traversals.insert(new_tx.transaction_identifier.clone(), traversal);
if let Some(inscriptions_db_conn) = inscriptions_db_conn {
if let Some(traversal) = find_inscription_with_id(
&inscription_data.inscription_id,
&block.block_identifier.hash,
inscriptions_db_conn,
ctx,
) {
traversals.insert(tx.transaction_identifier.clone(), traversal);
} else {
// Enqueue for traversals
transactions_ids.push(tx.transaction_identifier.clone());
}
} else {
// Enqueue for traversals
transactions_ids.push(new_tx.transaction_identifier.clone());
transactions_ids.push(tx.transaction_identifier.clone());
}
}
}
@@ -216,12 +202,14 @@ pub fn update_hord_db_and_augment_bitcoin_block(
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let hasher = fxhash::FxBuildHasher::default();
let shared_cache = Arc::new(DashMap::with_hasher(hasher));
for transaction_id in transactions_ids.into_iter() {
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = ctx.clone();
let block_identifier = new_block.block_identifier.clone();
let block_identifier = block.block_identifier.clone();
let moved_hord_db_path = hord_db_path.clone();
let cache = shared_cache.clone();
traversal_data_pool.execute(move || loop {
match open_readonly_hord_db_conn_rocks_db(&moved_hord_db_path, &moved_ctx) {
Ok(blocks_db) => {
@@ -230,6 +218,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
&block_identifier,
&transaction_id,
0,
cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((transaction_id, traversal));
@@ -273,8 +262,46 @@ pub fn update_hord_db_and_augment_bitcoin_block(
}
}
let _ = traversal_data_pool.join();
std::thread::spawn(move || drop(shared_cache));
}
traversals
}
pub fn update_hord_db_and_augment_bitcoin_block(
new_block: &mut BitcoinBlockData,
blocks_db_rw: &DB,
inscriptions_db_conn_rw: &Connection,
write_block: bool,
hord_db_path: &PathBuf,
ctx: &Context,
) -> Result<(), String> {
if write_block {
ctx.try_log(|logger| {
slog::info!(
logger,
"Updating hord.sqlite with Bitcoin block #{} for future traversals",
new_block.block_identifier.index,
)
});
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
insert_entry_in_blocks(
new_block.block_identifier.index as u32,
&compacted_block,
&blocks_db_rw,
&ctx,
);
let _ = blocks_db_rw.flush();
}
let traversals = retrieve_inscribed_satoshi_points_from_block(
&new_block,
Some(inscriptions_db_conn_rw),
hord_db_path,
ctx,
);
let mut storage = Storage::Sqlite(inscriptions_db_conn_rw);
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
new_block,
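
The surrounding orchestration reduces to this skeleton: one Arc'd DashMap is cloned into every worker of the existing threadpool, traversal results come back over an mpsc channel, and once the pool joins the cache is dropped on a throwaway thread, mirroring the std::thread::spawn(move || drop(shared_cache)) line above so that tearing down a large map stays off the block-processing path. This is a sketch with placeholder key, value and job types, not the module itself:

    use std::sync::mpsc::channel;
    use std::sync::Arc;

    use dashmap::DashMap;
    use threadpool::ThreadPool;

    fn main() {
        let pool = ThreadPool::new(4);
        let (traversal_tx, traversal_rx) = channel();

        // Shared, concurrent cache handed to every worker.
        let shared_cache: Arc<DashMap<u32, u64>> = Arc::new(DashMap::new());

        for job_id in 0u32..16 {
            let tx = traversal_tx.clone();
            let cache = shared_cache.clone();
            pool.execute(move || {
                // Workers read and write the same map without any external locking.
                let value = *cache.entry(job_id % 4).or_insert(0);
                let _ = tx.send((job_id, value));
            });
        }
        drop(traversal_tx);

        for (job_id, value) in traversal_rx.iter() {
            println!("job {job_id} observed {value}");
        }
        pool.join();

        // Fire-and-forget teardown of the cache on a background thread,
        // as the commit does after the traversal pool joins.
        std::thread::spawn(move || drop(shared_cache));
    }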

[crate root: dashmap and fxhash re-exports]

@@ -10,6 +10,8 @@ extern crate serde_derive;
extern crate serde_json;
pub extern crate bitcoincore_rpc;
pub extern crate dashmap;
pub extern crate fxhash;
pub use chainhook_types;
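
With dashmap and fxhash re-exported at the crate root, downstream crates can use the exact versions pinned here without declaring them again. A hypothetical usage sketch; the crate name chainhook_event_observer is an assumption, it is not shown in this diff:

    // Hypothetical downstream usage of the new re-exports; the crate name is assumed.
    use chainhook_event_observer::dashmap::DashMap;
    use chainhook_event_observer::fxhash::FxBuildHasher;

    fn main() {
        let map: DashMap<u32, u64, FxBuildHasher> = DashMap::with_hasher(FxBuildHasher::default());
        map.insert(1, 2);
        assert_eq!(map.len(), 1);
    }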