feat: migration to rocksdb, moving json parsing from networking thread

This commit is contained in:
Ludo Galabru
2023-04-06 15:59:04 -04:00
parent d09dac17ea
commit 5ad0147fa0
8 changed files with 724 additions and 305 deletions

183
Cargo.lock generated
View File

@@ -272,6 +272,26 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72"
[[package]]
name = "bindgen"
version = "0.64.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn",
]
[[package]]
name = "bip39"
version = "1.0.1"
@@ -421,6 +441,17 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
[[package]]
name = "bzip2-sys"
version = "0.1.11+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -432,6 +463,18 @@ name = "cc"
version = "1.0.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
@@ -507,6 +550,7 @@ dependencies = [
"reqwest",
"rocket",
"rocket_okapi",
"rocksdb",
"rusqlite",
"schemars",
"serde",
@@ -518,6 +562,8 @@ dependencies = [
"threadpool",
"tokio",
"toml",
"zerocopy",
"zerocopy-derive",
"zeromq",
]
@@ -570,6 +616,17 @@ dependencies = [
"generic-array 0.14.6",
]
[[package]]
name = "clang-sys"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c688fc74432808e3eb684cae8830a86be1d66a2bd58e1f248ed0960a590baf6f"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "clap"
version = "2.34.0"
@@ -2040,6 +2097,15 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
[[package]]
name = "jobserver"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.60"
@@ -2076,12 +2142,43 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.138"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
[[package]]
name = "libloading"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f"
dependencies = [
"cfg-if 1.0.0",
"winapi",
]
[[package]]
name = "librocksdb-sys"
version = "0.10.0+7.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b"
dependencies = [
"bindgen",
"bzip2-sys",
"cc",
"glob",
"libc",
"libz-sys",
"lz4-sys",
]
[[package]]
name = "libsecp256k1"
version = "0.3.5"
@@ -2205,6 +2302,17 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libz-sys"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.7"
@@ -2254,6 +2362,16 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "lz4-sys"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "matchers"
version = "0.1.0"
@@ -2314,6 +2432,12 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.6.2"
@@ -2422,6 +2546,16 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -2672,6 +2806,12 @@ dependencies = [
"syn",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "percent-encoding"
version = "2.2.0"
@@ -3288,6 +3428,16 @@ dependencies = [
"syn",
]
[[package]]
name = "rocksdb"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]]
name = "rstest"
version = "0.11.0"
@@ -3334,6 +3484,12 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.2.3"
@@ -3772,6 +3928,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@@ -4968,6 +5130,27 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zerocopy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zeroize"
version = "1.5.7"

View File

@@ -13,10 +13,10 @@ use chainhook_event_observer::chainhooks::types::{
StacksPrintEventBasedPredicate,
};
use chainhook_event_observer::hord::db::{
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_all_inscriptions,
find_compacted_block_at_block_height, find_inscriptions_at_wached_outpoint,
find_latest_compacted_block_known, initialize_hord_db, open_readonly_hord_db_conn,
open_readwrite_hord_db_conn, patch_inscription_number,
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_block_at_block_height_sqlite,
find_inscriptions_at_wached_outpoint, find_last_block_inserted, initialize_hord_db,
insert_entry_in_blocks, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
retrieve_satoshi_point_using_local_storage,
};
use chainhook_event_observer::observer::BitcoinConfig;
@@ -173,6 +173,9 @@ struct StartCommand {
/// Start REST API for managing predicates
#[clap(long = "start-http-api")]
pub start_http_api: bool,
/// Disable hord indexing
#[clap(long = "no-hord")]
pub hord_disabled: bool,
}
#[derive(Subcommand, PartialEq, Clone, Debug)]
@@ -187,9 +190,6 @@ enum HordCommand {
#[derive(Subcommand, PartialEq, Clone, Debug)]
enum DbCommand {
/// Init hord db
#[clap(name = "init", bin_name = "init")]
Init(InitHordDbCommand),
/// Rewrite hord db
#[clap(name = "rewrite", bin_name = "rewrite")]
Rewrite(UpdateHordDbCommand),
@@ -260,15 +260,6 @@ struct FindInscriptionCommand {
pub db_path: Option<String>,
}
#[derive(Parser, PartialEq, Clone, Debug)]
struct InitHordDbCommand {
/// # of Networking thread
pub network_threads: usize,
/// Load config file path
#[clap(long = "config-path")]
pub config_path: Option<String>,
}
#[derive(Parser, PartialEq, Clone, Debug)]
struct UpdateHordDbCommand {
/// Starting block
@@ -344,12 +335,37 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
config.chainhooks.enable_http_api = false;
}
let mut service = Service::new(config, ctx);
let predicates = cmd
.predicates_paths
.iter()
.map(|p| load_predicate_from_path(p))
.collect::<Result<Vec<ChainhookFullSpecification>, _>>()?;
info!(ctx.expect_logger(), "Starting service...",);
if !cmd.hord_disabled {
info!(
ctx.expect_logger(),
"Ordinal indexing is enabled by default hord, checking index... (use --no-hord to disable ordinals)"
);
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if start_block == 0 {
info!(
ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(start_block, end_block, 8, &config, &ctx).await?;
}
}
let mut service = Service::new(config, ctx);
return service.run(predicates).await;
}
},
@@ -498,10 +514,10 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let hord_db_conn =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let tip_height = find_latest_compacted_block_known(&hord_db_conn) as u64;
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
let tip_height = find_last_block_inserted(&hord_db_conn) as u64;
if cmd.block_height > tip_height {
perform_hord_db_update(tip_height, cmd.block_height, 8, &config, &ctx).await?;
}
@@ -538,81 +554,48 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
},
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
DbCommand::Init(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let start_block = find_latest_compacted_block_known(&hord_db_conn) as u64;
if start_block == 0 {
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(start_block, end_block, cmd.network_threads, &config, &ctx)
.await?;
}
DbCommand::Sync(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if start_block == 0 {
info!(
ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
};
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let start_block = find_latest_compacted_block_known(&hord_db_conn) as u64;
let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
perform_hord_db_update(start_block, end_block, cmd.network_threads, &config, &ctx)
perform_hord_db_update(
start_block,
end_block,
cmd.network_threads,
&config,
&ctx,
)
.await?;
} else {
info!(ctx.expect_logger(), "Database hord up to date");
}
}
DbCommand::Rewrite(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
// Delete data, if any
let rw_hord_db_conn =
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
delete_data_in_hord_db(cmd.start_block, cmd.end_block, &rw_hord_db_conn, &ctx)?;
delete_data_in_hord_db(
cmd.start_block,
cmd.end_block,
&blocks_db_rw,
&inscriptions_db_conn_rw,
&ctx,
)?;
// Update data
perform_hord_db_update(
cmd.start_block,
@@ -625,9 +608,18 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
DbCommand::Drop(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let rw_hord_db_conn =
let blocks_db =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
delete_data_in_hord_db(cmd.start_block, cmd.end_block, &rw_hord_db_conn, &ctx)?;
delete_data_in_hord_db(
cmd.start_block,
cmd.end_block,
&blocks_db,
&inscriptions_db_conn_rw,
&ctx,
)?;
info!(
ctx.expect_logger(),
"Cleaning hord_db: {} blocks dropped",
@@ -636,40 +628,21 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
DbCommand::Patch(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let rw_hord_db_conn =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let sqlite_db_conn =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let inscriptions_per_blocks = find_all_inscriptions(&rw_hord_db_conn);
let mut inscription_number = 0;
for (block_height, inscriptions) in inscriptions_per_blocks.iter() {
let block = match find_compacted_block_at_block_height(
*block_height as u32,
&rw_hord_db_conn,
) {
Some(block) => block,
None => continue,
};
let blocks_db =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
for (txid, _) in inscriptions.iter() {
for (txid_n, _, _) in block.0 .1.iter() {
if txid.hash[2..10].eq(&hex::encode(txid_n)) {
let inscription_id = format!("{}i0", &txid.hash[2..]);
patch_inscription_number(
&inscription_id,
inscription_number,
&rw_hord_db_conn,
&ctx,
);
info!(
ctx.expect_logger(),
"Patch inscription_number: {}\t{}\t({})",
inscription_id,
inscription_number,
block_height
);
}
for i in 0..774940 {
match find_block_at_block_height_sqlite(i, &sqlite_db_conn) {
Some(block) => {
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
println!("Block #{} inserted", i);
}
None => {
println!("Block #{} missing", i)
}
inscription_number += 1;
}
}
}
@@ -678,6 +651,45 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
Ok(())
}
pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let start_block = {
let blocks_db = open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
find_last_block_inserted(&blocks_db) as u64
};
if start_block == 0 {
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
}
let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
if start_block < end_block {
Ok(Some((start_block, end_block)))
} else {
Ok(None)
}
}
pub async fn perform_hord_db_update(
start_block: u64,
end_block: u64,
@@ -698,11 +710,13 @@ pub async fn perform_hord_db_update(
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};
let rw_hord_db_conn = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let _ = fetch_and_cache_blocks_in_hord_db(
&bitcoin_config,
&rw_hord_db_conn,
&blocks_db,
&inscriptions_db_conn_rw,
start_block,
end_block,
network_threads,

View File

@@ -9,8 +9,9 @@ use chainhook_event_observer::chainhooks::types::{
BitcoinChainhookFullSpecification, BitcoinPredicateType, Protocols,
};
use chainhook_event_observer::hord::db::{
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_compacted_block_at_block_height,
find_latest_compacted_block_known, open_readonly_hord_db_conn, open_readwrite_hord_db_conn,
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_block_at_block_height,
find_last_block_inserted, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
};
use chainhook_event_observer::hord::{
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data,
@@ -18,7 +19,7 @@ use chainhook_event_observer::hord::{
};
use chainhook_event_observer::indexer;
use chainhook_event_observer::indexer::bitcoin::{
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
};
use chainhook_event_observer::observer::{gather_proofs, EventObserverConfig};
use chainhook_event_observer::utils::{file_append, send_request, Context};
@@ -83,11 +84,17 @@ pub async fn scan_bitcoin_chain_with_predicate(
if let BitcoinPredicateType::Protocol(Protocols::Ordinal(_)) = &predicate_spec.predicate {
is_predicate_evaluating_ordinals = true;
if let Ok(hord_db_conn) = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx) {
inscriptions_cache = find_all_inscriptions(&hord_db_conn);
if let Ok(inscriptions_db_conn) =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)
{
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn);
// Will we have to update the blocks table?
if find_compacted_block_at_block_height(end_block as u32, &hord_db_conn).is_none() {
hord_blocks_requires_update = true;
if let Ok(blocks_db) =
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
if find_block_at_block_height(end_block as u32, &blocks_db).is_none() {
hord_blocks_requires_update = true;
}
}
}
}
@@ -103,20 +110,23 @@ pub async fn scan_bitcoin_chain_with_predicate(
// TODO: make sure that we have a contiguous chain
// check_compacted_blocks_chain_integrity(&hord_db_conn);
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
let start_block = find_latest_compacted_block_known(&hord_db_conn) as u64;
let start_block = find_last_block_inserted(&blocks_db_rw) as u64;
if start_block < end_block {
warn!(
ctx.expect_logger(),
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
(end_block - start_block)
);
let rw_hord_db_conn =
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), ctx)?;
fetch_and_cache_blocks_in_hord_db(
&config.get_event_observer_config().get_bitcoin_config(),
&rw_hord_db_conn,
&blocks_db_rw,
&inscriptions_db_conn_rw,
start_block,
end_block,
8,
@@ -125,7 +135,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
)
.await?;
inscriptions_cache = find_all_inscriptions(&hord_db_conn);
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn_rw);
}
}
}
@@ -158,7 +168,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?;
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let mut block = indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
@@ -204,7 +214,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
blocks_scanned += 1;
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?;
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let block = indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,

View File

@@ -51,6 +51,13 @@ rand = "0.8.5"
hex-simd = "0.8.0"
serde_cbor = "0.11.2"
zeromq = { version = "*", default-features = false, features = ["tokio-runtime", "tcp-transport"] }
zerocopy = "0.6.1"
zerocopy-derive = "0.3.2"
[dependencies.rocksdb]
version = "0.20.1"
default-features = false
features = ["lz4", "snappy"]
[replace]
"jsonrpc:0.13.0" = { git = 'https://github.com/apoelstra/rust-jsonrpc', rev = "1063671f122a8985c1b7c29030071253da515839" }

View File

@@ -8,12 +8,13 @@ use chainhook_types::{
};
use hiro_system_kit::slog;
use rocksdb::DB;
use rusqlite::{Connection, OpenFlags, ToSql};
use threadpool::ThreadPool;
use crate::{
indexer::bitcoin::{
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
download_block_with_retry, parse_downloaded_block, retrieve_block_hash_with_retry,
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
},
observer::BitcoinConfig,
@@ -47,15 +48,6 @@ pub fn open_readwrite_hord_db_conn(
pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
let conn = create_or_open_readwrite_db(path, ctx);
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS blocks (
id INTEGER NOT NULL PRIMARY KEY,
compacted_bytes TEXT NOT NULL
)",
[],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS inscriptions (
inscription_id TEXT NOT NULL PRIMARY KEY,
@@ -121,7 +113,7 @@ fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Connectio
// db.busy_handler(Some(tx_busy_handler))?;
let mmap_size: i64 = 256 * 1024 * 1024;
let page_size: i64 = 32768;
let page_size: i64 = 16384;
conn.pragma_update(None, "mmap_size", mmap_size).unwrap();
conn.pragma_update(None, "page_size", page_size).unwrap();
conn.pragma_update(None, "journal_mode", &"WAL").unwrap();
@@ -157,15 +149,48 @@ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
return conn;
}
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C)]
// pub struct T {
// ci: [u8; 4],
// cv: u64,
// t: Vec<Tx>,
// }
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C, packed)]
// pub struct Tx {
// t: [u8; 4],
// i: TxIn,
// o: TxOut,
// }
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C, packed)]
// pub struct TxIn {
// i: [u8; 4],
// b: u32,
// o: u16,
// v: u64
// }
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C, packed)]
// pub struct TxOut {
// v: u64,
// }
#[derive(Debug, Serialize, Deserialize)]
// pub struct CompactedBlock(Vec<(Vec<(u32, u16, u64)>, Vec<u64>)>);
#[repr(C)]
pub struct CompactedBlock(
pub (
pub (
([u8; 4], u64),
Vec<([u8; 4], Vec<([u8; 4], u32, u16, u64)>, Vec<u64>)>,
),
);
use std::io::{Read, Write};
impl CompactedBlock {
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> CompactedBlock {
let mut txs = vec![];
@@ -238,31 +263,108 @@ impl CompactedBlock {
value
}
pub fn to_hex_bytes(&self) -> String {
let bytes = serde_cbor::to_vec(self).unwrap();
let hex_bytes = hex_simd::encode_to_string(bytes, hex_simd::AsciiCase::Lower);
hex_bytes
pub fn from_cbor_bytes(bytes: &[u8]) -> CompactedBlock {
serde_cbor::from_slice(&bytes[..]).unwrap()
}
fn serialize<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
fd.write_all(&self.0 .0 .0)?;
fd.write(&self.0 .0 .1.to_be_bytes())?;
fd.write(&self.0 .1.len().to_be_bytes())?;
for (id, inputs, outputs) in self.0 .1.iter() {
fd.write_all(id)?;
fd.write(&inputs.len().to_be_bytes())?;
for (txid, block, vout, value) in inputs.iter() {
fd.write_all(txid)?;
fd.write(&block.to_be_bytes())?;
fd.write(&vout.to_be_bytes())?;
fd.write(&value.to_be_bytes())?;
}
fd.write(&outputs.len().to_be_bytes())?;
for value in outputs.iter() {
fd.write(&value.to_be_bytes())?;
}
}
Ok(())
}
fn deserialize<R: Read>(fd: &mut R) -> std::io::Result<CompactedBlock> {
let mut ci = [0u8; 4];
fd.read_exact(&mut ci)?;
let mut cv = [0u8; 8];
fd.read_exact(&mut cv)?;
let mut tx_len = [0u8; 8];
fd.read_exact(&mut tx_len)?;
let mut txs = vec![];
for _ in 0..usize::from_be_bytes(tx_len) {
let mut txid = [0u8; 4];
fd.read_exact(&mut txid)?;
let mut inputs_len = [0u8; 8];
fd.read_exact(&mut inputs_len)?;
let mut inputs = vec![];
for _ in 0..usize::from_be_bytes(inputs_len) {
let mut txin = [0u8; 4];
fd.read_exact(&mut txin)?;
let mut block = [0u8; 4];
fd.read_exact(&mut block)?;
let mut vout = [0u8; 2];
fd.read_exact(&mut vout)?;
let mut value = [0u8; 8];
fd.read_exact(&mut value)?;
inputs.push((
txin,
u32::from_be_bytes(block),
u16::from_be_bytes(vout),
u64::from_be_bytes(value),
))
}
let mut outputs_len = [0u8; 8];
fd.read_exact(&mut outputs_len)?;
let mut outputs = vec![];
for _ in 0..usize::from_be_bytes(outputs_len) {
let mut v = [0u8; 8];
fd.read_exact(&mut v)?;
outputs.push(u64::from_be_bytes(v))
}
txs.push((txid, inputs, outputs));
}
Ok(CompactedBlock(((ci, u64::from_be_bytes(cv)), txs)))
}
}
pub fn find_latest_compacted_block_known(hord_db_conn: &Connection) -> u32 {
let args: &[&dyn ToSql] = &[];
let mut stmt = match hord_db_conn.prepare("SELECT id FROM blocks ORDER BY id DESC LIMIT 1") {
Ok(stmt) => stmt,
Err(_) => return 0,
};
let mut rows = match stmt.query(args) {
Ok(rows) => rows,
Err(_) => return 0,
};
while let Ok(Some(row)) = rows.next() {
let id: u32 = row.get(0).unwrap();
return id;
}
0
fn get_default_hord_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
destination_path.push("hord.rocksdb");
destination_path
}
pub fn find_compacted_block_at_block_height(
pub fn open_readonly_hord_db_conn_rocks_db(
base_dir: &PathBuf,
_ctx: &Context,
) -> Result<DB, String> {
let path = get_default_hord_db_file_path_rocks_db(&base_dir);
let mut opts = rocksdb::Options::default();
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_max_open_files(1000);
let db = DB::open_for_read_only(&opts, path, false).unwrap();
Ok(db)
}
pub fn open_readwrite_hord_db_conn_rocks_db(
base_dir: &PathBuf,
_ctx: &Context,
) -> Result<DB, String> {
let path = get_default_hord_db_file_path_rocks_db(&base_dir);
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_max_open_files(1000);
let db = DB::open(&opts, path).unwrap();
Ok(db)
}
// Legacy - to remove after migrations
pub fn find_block_at_block_height_sqlite(
block_height: u32,
hord_db_conn: &Connection,
) -> Option<CompactedBlock> {
@@ -283,6 +385,57 @@ pub fn find_compacted_block_at_block_height(
return None;
}
pub fn insert_entry_in_blocks(
block_height: u32,
compacted_block: &CompactedBlock,
blocks_db_rw: &DB,
_ctx: &Context,
) {
let mut bytes = vec![];
let _ = compacted_block.serialize(&mut bytes);
let block_height_bytes = block_height.to_be_bytes();
blocks_db_rw
.put(&block_height_bytes, bytes)
.expect("unable to insert blocks");
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
.expect("unable to insert metadata");
}
pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {
match blocks_db.get(b"metadata::last_insert") {
Ok(Some(bytes)) => u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
_ => 0,
}
}
pub fn find_block_at_block_height(block_height: u32, blocks_db: &DB) -> Option<CompactedBlock> {
match blocks_db.get(block_height.to_be_bytes()) {
Ok(Some(ref res)) => {
let res = CompactedBlock::deserialize(&mut std::io::Cursor::new(&res)).unwrap();
Some(res)
}
_ => None,
}
}
pub fn remove_entry_from_blocks(block_height: u32, blocks_db_rw: &DB, ctx: &Context) {
if let Err(e) = blocks_db_rw.delete(block_height.to_be_bytes()) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn delete_blocks_in_block_range(
start_block: u32,
end_block: u32,
blocks_db_rw: &DB,
ctx: &Context,
) {
for block_height in start_block..=end_block {
remove_entry_from_blocks(block_height, blocks_db_rw, ctx);
}
}
pub fn store_new_inscription(
inscription_data: &OrdinalInscriptionRevealData,
block_identifier: &BlockIdentifier,
@@ -301,10 +454,10 @@ pub fn update_transfered_inscription(
inscription_id: &str,
outpoint_post_transfer: &str,
offset: u64,
hord_db_conn: &Connection,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
if let Err(e) = hord_db_conn.execute(
if let Err(e) = inscriptions_db_conn_rw.execute(
"UPDATE inscriptions SET outpoint_to_watch = ?, offset = ? WHERE inscription_id = ?",
rusqlite::params![&outpoint_post_transfer, &offset, &inscription_id],
) {
@@ -315,10 +468,10 @@ pub fn update_transfered_inscription(
pub fn patch_inscription_number(
inscription_id: &str,
inscription_number: u64,
hord_db_conn: &Connection,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
if let Err(e) = hord_db_conn.execute(
if let Err(e) = inscriptions_db_conn_rw.execute(
"UPDATE inscriptions SET inscription_number = ? WHERE inscription_id = ?",
rusqlite::params![&inscription_number, &inscription_id],
) {
@@ -327,11 +480,11 @@ pub fn patch_inscription_number(
}
pub fn find_latest_inscription_block_height(
hord_db_conn: &Connection,
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> Result<Option<u64>, String> {
let args: &[&dyn ToSql] = &[];
let mut stmt = hord_db_conn
let mut stmt = inscriptions_db_conn
.prepare("SELECT block_height FROM inscriptions ORDER BY block_height DESC LIMIT 1")
.unwrap();
let mut rows = stmt.query(args).unwrap();
@@ -343,11 +496,11 @@ pub fn find_latest_inscription_block_height(
}
pub fn find_latest_inscription_number(
hord_db_conn: &Connection,
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> Result<u64, String> {
let args: &[&dyn ToSql] = &[];
let mut stmt = hord_db_conn
let mut stmt = inscriptions_db_conn
.prepare(
"SELECT inscription_number FROM inscriptions ORDER BY inscription_number DESC LIMIT 1",
)
@@ -362,11 +515,11 @@ pub fn find_latest_inscription_number(
pub fn find_inscription_with_ordinal_number(
ordinal_number: &u64,
hord_db_conn: &Connection,
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
let mut stmt = hord_db_conn
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ?")
.unwrap();
let mut rows = stmt.query(args).unwrap();
@@ -378,10 +531,10 @@ pub fn find_inscription_with_ordinal_number(
}
pub fn find_all_inscriptions(
hord_db_conn: &Connection,
inscriptions_db_conn: &Connection,
) -> BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> {
let args: &[&dyn ToSql] = &[];
let mut stmt = hord_db_conn
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_number, ordinal_number, block_height, inscription_id FROM inscriptions ORDER BY inscription_number ASC")
.unwrap();
let mut results: BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> = BTreeMap::new();
@@ -444,52 +597,13 @@ pub fn find_inscriptions_at_wached_outpoint(
return Ok(results);
}
pub fn insert_entry_in_blocks(
block_id: u32,
compacted_block: &CompactedBlock,
hord_db_conn: &Connection,
ctx: &Context,
) {
let serialized_compacted_block = compacted_block.to_hex_bytes();
if let Err(e) = hord_db_conn.execute(
"INSERT INTO blocks (id, compacted_bytes) VALUES (?1, ?2)",
rusqlite::params![&block_id, &serialized_compacted_block],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn remove_entry_from_blocks(block_id: u32, hord_db_conn: &Connection, ctx: &Context) {
if let Err(e) = hord_db_conn.execute(
"DELETE FROM blocks WHERE id = ?1",
rusqlite::params![&block_id],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn delete_blocks_in_block_range(
start_block: u32,
end_block: u32,
rw_hord_db_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = rw_hord_db_conn.execute(
"DELETE FROM blocks WHERE id >= ?1 AND id <= ?2",
rusqlite::params![&start_block, &end_block],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn delete_inscriptions_in_block_range(
start_block: u32,
end_block: u32,
rw_hord_db_conn: &Connection,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
if let Err(e) = rw_hord_db_conn.execute(
if let Err(e) = inscriptions_db_conn_rw.execute(
"DELETE FROM inscriptions WHERE block_height >= ?1 AND block_height <= ?2",
rusqlite::params![&start_block, &end_block],
) {
@@ -499,10 +613,10 @@ pub fn delete_inscriptions_in_block_range(
pub fn remove_entry_from_inscriptions(
inscription_id: &str,
hord_db_conn: &Connection,
inscriptions_db_rw_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = hord_db_conn.execute(
if let Err(e) = inscriptions_db_rw_conn.execute(
"DELETE FROM inscriptions WHERE inscription_id = ?1",
rusqlite::params![&inscription_id],
) {
@@ -513,17 +627,24 @@ pub fn remove_entry_from_inscriptions(
pub fn delete_data_in_hord_db(
start_block: u64,
end_block: u64,
rw_hord_db_conn: &Connection,
blocks_db_rw: &DB,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) -> Result<(), String> {
delete_blocks_in_block_range(start_block as u32, end_block as u32, rw_hord_db_conn, &ctx);
delete_inscriptions_in_block_range(start_block as u32, end_block as u32, rw_hord_db_conn, &ctx);
delete_blocks_in_block_range(start_block as u32, end_block as u32, blocks_db_rw, &ctx);
delete_inscriptions_in_block_range(
start_block as u32,
end_block as u32,
inscriptions_db_conn_rw,
&ctx,
);
Ok(())
}
pub async fn fetch_and_cache_blocks_in_hord_db(
bitcoin_config: &BitcoinConfig,
rw_hord_db_conn: &Connection,
blocks_db_rw: &DB,
inscriptions_db_conn_rw: &Connection,
start_block: u64,
end_block: u64,
network_thread: usize,
@@ -532,11 +653,11 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
) -> Result<(), String> {
let number_of_blocks_to_process = end_block - start_block + 1;
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
let (block_hash_tx, block_hash_rx) = crossbeam_channel::bounded(128);
let (block_hash_tx, block_hash_rx) = crossbeam_channel::bounded(256);
let retrieve_block_data_pool = ThreadPool::new(network_thread);
let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(64);
let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(256);
let compress_block_data_pool = ThreadPool::new(16);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(32);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(512);
// Thread pool #1: given a block height, retrieve the block hash
for block_cursor in start_block..=end_block {
@@ -565,11 +686,8 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
retrieve_block_data_pool.execute(move || {
moved_ctx
.try_log(|logger| slog::debug!(logger, "Fetching block #{block_height}"));
let future = retrieve_full_block_breakdown_with_retry(
&block_hash,
&moved_bitcoin_config,
&moved_ctx,
);
let future =
download_block_with_retry(&block_hash, &moved_bitcoin_config, &moved_ctx);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
let _ = block_data_tx.send(Some(block_data));
});
@@ -581,9 +699,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let _ = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
while let Ok(Some(downloaded_block)) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let block_data = parse_downloaded_block(downloaded_block).unwrap();
let compressed_block = CompactedBlock::from_full_block(&block_data);
let block_index = block_data.height as u32;
let _ = block_compressed_tx_moved.send(Some((
@@ -604,25 +723,15 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let mut inbox = HashMap::new();
while let Ok(Some((block_height, compacted_block, raw_block))) = block_compressed_rx.recv() {
insert_entry_in_blocks(block_height, &compacted_block, &rw_hord_db_conn, &ctx);
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx);
blocks_stored += 1;
// println!("{} < {}", raw_block.height, cursor);
// Early return, only considering blocks after 1st inscription
// if raw_block.height < cursor {
// continue;
// }
// let block_height = raw_block.height;
inbox.insert(raw_block.height, raw_block);
// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.
// if block_height != cursor {
// continue;
// }
// Is the action of processing a block allows us
// to process more blocks present in the inbox?
@@ -641,7 +750,8 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
&mut new_block,
&rw_hord_db_conn,
blocks_db_rw,
&inscriptions_db_conn_rw,
false,
&hord_db_path,
&ctx,
@@ -692,7 +802,7 @@ impl TraversalResult {
}
pub fn retrieve_satoshi_point_using_local_storage(
hord_db_conn: &Connection,
blocks_db: &DB,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_number: u64,
@@ -717,18 +827,17 @@ pub fn retrieve_satoshi_point_using_local_storage(
let mut hops: u32 = 0;
loop {
hops += 1;
let res = match find_compacted_block_at_block_height(ordinal_block_number, &hord_db_conn) {
let res = match find_block_at_block_height(ordinal_block_number, &blocks_db) {
Some(res) => res,
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
}
};
let coinbase_txid = &res.0 .0 .0;
let txid = tx_cursor.0;
// ctx.try_log(|logger| {
// slog::debug!(
// slog::info!(
// logger,
// "{ordinal_block_number}:{:?}:{:?}",
// hex::encode(&coinbase_txid),
@@ -737,7 +846,6 @@ pub fn retrieve_satoshi_point_using_local_storage(
// });
// to remove
//std::thread::sleep(std::time::Duration::from_millis(300));
// evaluate exit condition: did we reach the **final** coinbase transaction
if coinbase_txid.eq(&txid) {

View File

@@ -8,6 +8,7 @@ use chainhook_types::{
BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
};
use hiro_system_kit::slog;
use rocksdb::DB;
use rusqlite::Connection;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::PathBuf;
@@ -28,17 +29,18 @@ use crate::{
};
use self::db::{
open_readonly_hord_db_conn, remove_entry_from_blocks, remove_entry_from_inscriptions,
open_readonly_hord_db_conn_rocks_db, remove_entry_from_blocks, remove_entry_from_inscriptions,
TraversalResult, WatchedSatpoint,
};
pub fn revert_hord_db_with_augmented_bitcoin_block(
block: &BitcoinBlockData,
rw_hord_db_conn: &Connection,
blocks_db_rw: &DB,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) -> Result<(), String> {
// Remove block from
remove_entry_from_blocks(block.block_identifier.index as u32, &rw_hord_db_conn, ctx);
remove_entry_from_blocks(block.block_identifier.index as u32, &blocks_db_rw, ctx);
for tx_index in 1..=block.transactions.len() {
// Undo the changes in reverse order
let tx = &block.transactions[block.transactions.len() - tx_index];
@@ -46,7 +48,11 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
match ordinal_event {
OrdinalOperation::InscriptionRevealed(data) => {
// We remove any new inscription created
remove_entry_from_inscriptions(&data.inscription_id, &rw_hord_db_conn, ctx);
remove_entry_from_inscriptions(
&data.inscription_id,
&inscriptions_db_conn_rw,
ctx,
);
}
OrdinalOperation::InscriptionTransferred(data) => {
// We revert the outpoint to the pre-transfer value
@@ -59,7 +65,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
&&data.inscription_id,
&outpoint_pre_transfer,
offset_pre_transfer,
&rw_hord_db_conn,
&inscriptions_db_conn_rw,
&ctx,
);
}
@@ -71,7 +77,8 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
pub fn update_hord_db_and_augment_bitcoin_block(
new_block: &mut BitcoinBlockData,
rw_hord_db_conn: &Connection,
blocks_db_rw: &DB,
inscriptions_db_conn_rw: &Connection,
write_block: bool,
hord_db_path: &PathBuf,
ctx: &Context,
@@ -89,7 +96,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
insert_entry_in_blocks(
new_block.block_identifier.index as u32,
&compacted_block,
&rw_hord_db_conn,
&blocks_db_rw,
&ctx,
);
}
@@ -114,11 +121,10 @@ pub fn update_hord_db_and_augment_bitcoin_block(
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = ctx.clone();
let block_identifier = new_block.block_identifier.clone();
let hord_db_path = hord_db_path.clone();
let blocks_db = open_readonly_hord_db_conn_rocks_db(hord_db_path, &ctx)?;
traversal_data_pool.execute(move || {
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &moved_ctx).unwrap();
let traversal = retrieve_satoshi_point_using_local_storage(
&hord_db_conn,
&blocks_db,
&block_identifier,
&transaction_id,
0,
@@ -139,19 +145,19 @@ pub fn update_hord_db_and_augment_bitcoin_block(
}
}
let mut storage = Storage::Sqlite(rw_hord_db_conn);
let mut storage = Storage::Sqlite(inscriptions_db_conn_rw);
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
new_block,
&mut storage,
&traversals,
rw_hord_db_conn,
&inscriptions_db_conn_rw,
&ctx,
);
// Have inscriptions been transfered?
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data(
new_block,
&mut Storage::Sqlite(rw_hord_db_conn),
&mut storage,
&ctx,
)?;
Ok(())
@@ -167,7 +173,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
block: &mut BitcoinBlockData,
storage: &mut Storage,
traversals: &HashMap<TransactionIdentifier, TraversalResult>,
hord_db_conn: &Connection,
inscription_db_conn: &Connection,
ctx: &Context,
) {
for new_tx in block.transactions.iter_mut().skip(1) {
@@ -196,7 +202,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
Storage::Sqlite(rw_hord_db_conn) => {
if let Some(_entry) = find_inscription_with_ordinal_number(
&traversal.ordinal_number,
&hord_db_conn,
&inscription_db_conn,
&ctx,
) {
ctx.try_log(|logger| {
@@ -211,7 +217,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
} else {
inscription.inscription_number =
match find_latest_inscription_number(&hord_db_conn, &ctx) {
match find_latest_inscription_number(&inscription_db_conn, &ctx) {
Ok(inscription_number) => inscription_number + 1,
Err(e) => {
ctx.try_log(|logger| {

View File

@@ -148,14 +148,14 @@ pub struct RewardParticipant {
amt: u64,
}
pub async fn retrieve_full_block_breakdown_with_retry(
pub async fn download_and_parse_block_with_retry(
block_hash: &str,
bitcoin_config: &BitcoinConfig,
ctx: &Context,
) -> Result<BitcoinBlockFullBreakdown, String> {
let mut errors_count = 0;
let block = loop {
match retrieve_full_block_breakdown(block_hash, bitcoin_config, ctx).await {
match download_and_parse_block(block_hash, bitcoin_config, ctx).await {
Ok(result) => break result,
Err(e) => {
errors_count += 1;
@@ -170,6 +170,28 @@ pub async fn retrieve_full_block_breakdown_with_retry(
Ok(block)
}
pub async fn download_block_with_retry(
block_hash: &str,
bitcoin_config: &BitcoinConfig,
ctx: &Context,
) -> Result<Vec<u8>, String> {
let mut errors_count = 0;
let block = loop {
match download_block(block_hash, bitcoin_config, ctx).await {
Ok(result) => break result,
Err(e) => {
errors_count += 1;
error!(
"unable to retrieve block #{block_hash} (attempt #{errors_count}): {}",
e.to_string()
);
std::thread::sleep(std::time::Duration::from_millis(500));
}
}
};
Ok(block)
}
pub async fn retrieve_block_hash_with_retry(
block_height: &u64,
bitcoin_config: &BitcoinConfig,
@@ -192,11 +214,11 @@ pub async fn retrieve_block_hash_with_retry(
Ok(block_hash)
}
pub async fn retrieve_full_block_breakdown(
pub async fn download_block(
block_hash: &str,
bitcoin_config: &BitcoinConfig,
_ctx: &Context,
) -> Result<BitcoinBlockFullBreakdown, String> {
) -> Result<Vec<u8>, String> {
use reqwest::Client as HttpClient;
let body = json!({
"jsonrpc": "1.0",
@@ -217,15 +239,32 @@ pub async fn retrieve_full_block_breakdown(
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<bitcoincore_rpc::jsonrpc::Response>()
.bytes()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<BitcoinBlockFullBreakdown>()
.map_err(|e| format!("unable to parse response ({})", e))?;
.map_err(|e| format!("unable to get bytes ({})", e))?
.to_vec();
Ok(block)
}
pub fn parse_downloaded_block(
downloaded_block: Vec<u8>,
) -> Result<BitcoinBlockFullBreakdown, String> {
let block = serde_json::from_slice::<bitcoincore_rpc::jsonrpc::Response>(&downloaded_block[..])
.map_err(|e| format!("unable to parse jsonrpc payload ({})", e))?
.result::<BitcoinBlockFullBreakdown>()
.map_err(|e| format!("unable to parse block ({})", e))?;
Ok(block)
}
pub async fn download_and_parse_block(
block_hash: &str,
bitcoin_config: &BitcoinConfig,
_ctx: &Context,
) -> Result<BitcoinBlockFullBreakdown, String> {
let response = download_block(block_hash, bitcoin_config, _ctx).await?;
parse_downloaded_block(response)
}
pub async fn retrieve_block_hash(
block_height: &u64,
bitcoin_config: &BitcoinConfig,
@@ -580,8 +619,17 @@ fn try_parse_stacks_operation(
Err(_) => None,
};
}
// let mining_address_pre_commit = match inputs[0].script_sig {
// Some(script) => match script.script() {
// }
// }
// mining_address_post_commit = match inputs.first().and_then(|i| i.script_sig).and_then(|s| s.script()).script_pub_key.script() {
// Ok(script) => Address::from_script(&script, bitcoin::Network::Bitcoin).and_then(|a| Ok(a.to_string())).ok(),
// Err(_) => None
// };
let pox_cycle_index = pox_config.get_pox_cycle_id(block_height);
let pox_cycle_length = pox_config.get_pox_cycle_len();

View File

@@ -10,12 +10,12 @@ use crate::chainhooks::types::{
ChainhookConfig, ChainhookFullSpecification, ChainhookSpecification,
};
use crate::hord::db::open_readwrite_hord_db_conn;
use crate::hord::db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db};
use crate::hord::{
revert_hord_db_with_augmented_bitcoin_block, update_hord_db_and_augment_bitcoin_block,
};
use crate::indexer::bitcoin::{
retrieve_full_block_breakdown_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
download_and_parse_block_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
NewBitcoinBlock,
};
use crate::indexer::fork_scratch_pad::ForkScratchPad;
@@ -452,7 +452,7 @@ pub async fn start_event_observer(
};
let block_hash = hex::encode(message.get(1).unwrap().to_vec());
let block = match retrieve_full_block_breakdown_with_retry(
let block = match download_and_parse_block_with_retry(
&block_hash,
&bitcoin_config,
&ctx_moved,
@@ -464,7 +464,7 @@ pub async fn start_event_observer(
ctx_moved.try_log(|logger| {
slog::warn!(
logger,
"unable to retrieve_full_block_breakdown: {}",
"unable to download_and_parse_block: {}",
e.to_string()
)
});
@@ -557,7 +557,7 @@ pub fn gather_proofs<'a>(
ctx.try_log(|logger| {
slog::info!(
logger,
"collecting proof for transaction {}",
"Collecting proof for transaction {}",
transaction.transaction_identifier.hash
)
});
@@ -646,7 +646,27 @@ pub async fn start_observer_commands_handler(
BlockchainEvent::BlockchainUpdatedWithHeaders(data) => {
let mut new_blocks = vec![];
let mut confirmed_blocks = vec![];
let rw_hord_db_conn =
let blocks_db = match open_readwrite_hord_db_conn_rocks_db(
&config.get_cache_path_buf(),
&ctx,
) {
Ok(conn) => conn,
Err(e) => {
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::Error(format!(
"Channel error: {:?}",
e
)));
} else {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to open readwtite connection",)
});
}
continue;
}
};
let inscriptions_db_conn_rw =
match open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx) {
Ok(conn) => conn,
Err(e) => {
@@ -666,12 +686,14 @@ pub async fn start_observer_commands_handler(
continue;
}
};
for header in data.new_headers.iter() {
match bitcoin_block_store.get_mut(&header.block_identifier) {
Some(block) => {
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&rw_hord_db_conn,
&blocks_db,
&inscriptions_db_conn_rw,
true,
&config.get_cache_path_buf(),
&ctx,
@@ -726,7 +748,27 @@ pub async fn start_observer_commands_handler(
let mut blocks_to_rollback = vec![];
let mut confirmed_blocks = vec![];
let rw_hord_db_conn =
let blocks_db = match open_readwrite_hord_db_conn_rocks_db(
&config.get_cache_path_buf(),
&ctx,
) {
Ok(conn) => conn,
Err(e) => {
if let Some(ref tx) = observer_events_tx {
let _ = tx.send(ObserverEvent::Error(format!(
"Channel error: {:?}",
e
)));
} else {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to open readwtite connection",)
});
}
continue;
}
};
let inscriptions_db_conn_rw =
match open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx) {
Ok(conn) => conn,
Err(e) => {
@@ -752,7 +794,8 @@ pub async fn start_observer_commands_handler(
Some(block) => {
if let Err(e) = revert_hord_db_with_augmented_bitcoin_block(
block,
&rw_hord_db_conn,
&blocks_db,
&inscriptions_db_conn_rw,
&ctx,
) {
ctx.try_log(|logger| {
@@ -782,7 +825,8 @@ pub async fn start_observer_commands_handler(
Some(block) => {
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&rw_hord_db_conn,
&blocks_db,
&inscriptions_db_conn_rw,
true,
&config.get_cache_path_buf(),
&ctx,
@@ -1298,23 +1342,22 @@ pub async fn handle_new_bitcoin_block(
// into account the last 7 blocks.
let block_hash = bitcoin_block.burn_block_hash.strip_prefix("0x").unwrap();
let block =
match retrieve_full_block_breakdown_with_retry(block_hash, bitcoin_config, ctx).await {
Ok(block) => block,
Err(e) => {
ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to retrieve_full_block_breakdown: {}",
e.to_string()
)
});
return Json(json!({
"status": 500,
"result": "unable to retrieve_full_block",
}));
}
};
let block = match download_and_parse_block_with_retry(block_hash, bitcoin_config, ctx).await {
Ok(block) => block,
Err(e) => {
ctx.try_log(|logger| {
slog::warn!(
logger,
"unable to download_and_parse_block: {}",
e.to_string()
)
});
return Json(json!({
"status": 500,
"result": "unable to retrieve_full_block",
}));
}
};
let header = block.get_block_header();
match background_job_tx.lock() {