mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-13 16:19:01 +08:00
feat: migration to rocksdb, moving json parsing from networking thread
This commit is contained in:
183
Cargo.lock
generated
183
Cargo.lock
generated
@@ -272,6 +272,26 @@ version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72"
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.64.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"peeking_take_while",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash",
|
||||
"shlex",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bip39"
|
||||
version = "1.0.1"
|
||||
@@ -421,6 +441,17 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
|
||||
|
||||
[[package]]
|
||||
name = "bzip2-sys"
|
||||
version = "0.1.11+1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cast"
|
||||
version = "0.3.0"
|
||||
@@ -432,6 +463,18 @@ name = "cc"
|
||||
version = "1.0.77"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
|
||||
dependencies = [
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
@@ -507,6 +550,7 @@ dependencies = [
|
||||
"reqwest",
|
||||
"rocket",
|
||||
"rocket_okapi",
|
||||
"rocksdb",
|
||||
"rusqlite",
|
||||
"schemars",
|
||||
"serde",
|
||||
@@ -518,6 +562,8 @@ dependencies = [
|
||||
"threadpool",
|
||||
"tokio",
|
||||
"toml",
|
||||
"zerocopy",
|
||||
"zerocopy-derive",
|
||||
"zeromq",
|
||||
]
|
||||
|
||||
@@ -570,6 +616,17 @@ dependencies = [
|
||||
"generic-array 0.14.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clang-sys"
|
||||
version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c688fc74432808e3eb684cae8830a86be1d66a2bd58e1f248ed0960a590baf6f"
|
||||
dependencies = [
|
||||
"glob",
|
||||
"libc",
|
||||
"libloading",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "2.34.0"
|
||||
@@ -2040,6 +2097,15 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.60"
|
||||
@@ -2076,12 +2142,43 @@ version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
|
||||
[[package]]
|
||||
name = "lazycell"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.138"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
version = "0.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "librocksdb-sys"
|
||||
version = "0.10.0+7.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"bzip2-sys",
|
||||
"cc",
|
||||
"glob",
|
||||
"libc",
|
||||
"libz-sys",
|
||||
"lz4-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libsecp256k1"
|
||||
version = "0.3.5"
|
||||
@@ -2205,6 +2302,17 @@ dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libz-sys"
|
||||
version = "1.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "link-cplusplus"
|
||||
version = "1.0.7"
|
||||
@@ -2254,6 +2362,16 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lz4-sys"
|
||||
version = "1.9.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
@@ -2314,6 +2432,12 @@ version = "0.3.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
|
||||
|
||||
[[package]]
|
||||
name = "minimal-lexical"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.6.2"
|
||||
@@ -2422,6 +2546,16 @@ version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
@@ -2672,6 +2806,12 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "peeking_take_while"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.2.0"
|
||||
@@ -3288,6 +3428,16 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rocksdb"
|
||||
version = "0.20.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"librocksdb-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rstest"
|
||||
version = "0.11.0"
|
||||
@@ -3334,6 +3484,12 @@ version = "0.1.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
version = "0.2.3"
|
||||
@@ -3772,6 +3928,12 @@ dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.0"
|
||||
@@ -4968,6 +5130,27 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.5.7"
|
||||
|
||||
@@ -13,10 +13,10 @@ use chainhook_event_observer::chainhooks::types::{
|
||||
StacksPrintEventBasedPredicate,
|
||||
};
|
||||
use chainhook_event_observer::hord::db::{
|
||||
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_all_inscriptions,
|
||||
find_compacted_block_at_block_height, find_inscriptions_at_wached_outpoint,
|
||||
find_latest_compacted_block_known, initialize_hord_db, open_readonly_hord_db_conn,
|
||||
open_readwrite_hord_db_conn, patch_inscription_number,
|
||||
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_block_at_block_height_sqlite,
|
||||
find_inscriptions_at_wached_outpoint, find_last_block_inserted, initialize_hord_db,
|
||||
insert_entry_in_blocks, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
|
||||
open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
|
||||
retrieve_satoshi_point_using_local_storage,
|
||||
};
|
||||
use chainhook_event_observer::observer::BitcoinConfig;
|
||||
@@ -173,6 +173,9 @@ struct StartCommand {
|
||||
/// Start REST API for managing predicates
|
||||
#[clap(long = "start-http-api")]
|
||||
pub start_http_api: bool,
|
||||
/// Disable hord indexing
|
||||
#[clap(long = "no-hord")]
|
||||
pub hord_disabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, PartialEq, Clone, Debug)]
|
||||
@@ -187,9 +190,6 @@ enum HordCommand {
|
||||
|
||||
#[derive(Subcommand, PartialEq, Clone, Debug)]
|
||||
enum DbCommand {
|
||||
/// Init hord db
|
||||
#[clap(name = "init", bin_name = "init")]
|
||||
Init(InitHordDbCommand),
|
||||
/// Rewrite hord db
|
||||
#[clap(name = "rewrite", bin_name = "rewrite")]
|
||||
Rewrite(UpdateHordDbCommand),
|
||||
@@ -260,15 +260,6 @@ struct FindInscriptionCommand {
|
||||
pub db_path: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Parser, PartialEq, Clone, Debug)]
|
||||
struct InitHordDbCommand {
|
||||
/// # of Networking thread
|
||||
pub network_threads: usize,
|
||||
/// Load config file path
|
||||
#[clap(long = "config-path")]
|
||||
pub config_path: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Parser, PartialEq, Clone, Debug)]
|
||||
struct UpdateHordDbCommand {
|
||||
/// Starting block
|
||||
@@ -344,12 +335,37 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
|
||||
config.chainhooks.enable_http_api = false;
|
||||
}
|
||||
let mut service = Service::new(config, ctx);
|
||||
let predicates = cmd
|
||||
.predicates_paths
|
||||
.iter()
|
||||
.map(|p| load_predicate_from_path(p))
|
||||
.collect::<Result<Vec<ChainhookFullSpecification>, _>>()?;
|
||||
|
||||
info!(ctx.expect_logger(), "Starting service...",);
|
||||
|
||||
if !cmd.hord_disabled {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Ordinal indexing is enabled by default hord, checking index... (use --no-hord to disable ordinals)"
|
||||
);
|
||||
|
||||
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
|
||||
if start_block == 0 {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Initializing hord indexing from block #{}", start_block
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Resuming hord indexing from block #{}", start_block
|
||||
);
|
||||
}
|
||||
perform_hord_db_update(start_block, end_block, 8, &config, &ctx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let mut service = Service::new(config, ctx);
|
||||
return service.run(predicates).await;
|
||||
}
|
||||
},
|
||||
@@ -498,10 +514,10 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
|
||||
|
||||
let hord_db_conn =
|
||||
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
|
||||
|
||||
let tip_height = find_latest_compacted_block_known(&hord_db_conn) as u64;
|
||||
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
.unwrap();
|
||||
|
||||
let tip_height = find_last_block_inserted(&hord_db_conn) as u64;
|
||||
if cmd.block_height > tip_height {
|
||||
perform_hord_db_update(tip_height, cmd.block_height, 8, &config, &ctx).await?;
|
||||
}
|
||||
@@ -538,81 +554,48 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
}
|
||||
},
|
||||
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
|
||||
DbCommand::Init(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let auth = Auth::UserPass(
|
||||
config.network.bitcoind_rpc_username.clone(),
|
||||
config.network.bitcoind_rpc_password.clone(),
|
||||
);
|
||||
|
||||
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
|
||||
}
|
||||
};
|
||||
|
||||
let end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
let start_block = find_latest_compacted_block_known(&hord_db_conn) as u64;
|
||||
if start_block == 0 {
|
||||
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Resuming hord indexing from block #{}", start_block
|
||||
);
|
||||
}
|
||||
|
||||
perform_hord_db_update(start_block, end_block, cmd.network_threads, &config, &ctx)
|
||||
.await?;
|
||||
}
|
||||
DbCommand::Sync(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let auth = Auth::UserPass(
|
||||
config.network.bitcoind_rpc_username.clone(),
|
||||
config.network.bitcoind_rpc_password.clone(),
|
||||
);
|
||||
|
||||
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
|
||||
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
|
||||
if start_block == 0 {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Initializing hord indexing from block #{}", start_block
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Resuming hord indexing from block #{}", start_block
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
let start_block = find_latest_compacted_block_known(&hord_db_conn) as u64;
|
||||
|
||||
let end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
perform_hord_db_update(start_block, end_block, cmd.network_threads, &config, &ctx)
|
||||
perform_hord_db_update(
|
||||
start_block,
|
||||
end_block,
|
||||
cmd.network_threads,
|
||||
&config,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
info!(ctx.expect_logger(), "Database hord up to date");
|
||||
}
|
||||
}
|
||||
DbCommand::Rewrite(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
// Delete data, if any
|
||||
let rw_hord_db_conn =
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
delete_data_in_hord_db(cmd.start_block, cmd.end_block, &rw_hord_db_conn, &ctx)?;
|
||||
|
||||
delete_data_in_hord_db(
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
&blocks_db_rw,
|
||||
&inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
)?;
|
||||
|
||||
// Update data
|
||||
perform_hord_db_update(
|
||||
cmd.start_block,
|
||||
@@ -625,9 +608,18 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
}
|
||||
DbCommand::Drop(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let rw_hord_db_conn =
|
||||
let blocks_db =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
delete_data_in_hord_db(cmd.start_block, cmd.end_block, &rw_hord_db_conn, &ctx)?;
|
||||
|
||||
delete_data_in_hord_db(
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
)?;
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Cleaning hord_db: {} blocks dropped",
|
||||
@@ -636,40 +628,21 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
}
|
||||
DbCommand::Patch(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let rw_hord_db_conn =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
let sqlite_db_conn =
|
||||
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
let inscriptions_per_blocks = find_all_inscriptions(&rw_hord_db_conn);
|
||||
let mut inscription_number = 0;
|
||||
for (block_height, inscriptions) in inscriptions_per_blocks.iter() {
|
||||
let block = match find_compacted_block_at_block_height(
|
||||
*block_height as u32,
|
||||
&rw_hord_db_conn,
|
||||
) {
|
||||
Some(block) => block,
|
||||
None => continue,
|
||||
};
|
||||
let blocks_db =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
for (txid, _) in inscriptions.iter() {
|
||||
for (txid_n, _, _) in block.0 .1.iter() {
|
||||
if txid.hash[2..10].eq(&hex::encode(txid_n)) {
|
||||
let inscription_id = format!("{}i0", &txid.hash[2..]);
|
||||
patch_inscription_number(
|
||||
&inscription_id,
|
||||
inscription_number,
|
||||
&rw_hord_db_conn,
|
||||
&ctx,
|
||||
);
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Patch inscription_number: {}\t{}\t({})",
|
||||
inscription_id,
|
||||
inscription_number,
|
||||
block_height
|
||||
);
|
||||
}
|
||||
for i in 0..774940 {
|
||||
match find_block_at_block_height_sqlite(i, &sqlite_db_conn) {
|
||||
Some(block) => {
|
||||
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
|
||||
println!("Block #{} inserted", i);
|
||||
}
|
||||
None => {
|
||||
println!("Block #{} missing", i)
|
||||
}
|
||||
inscription_number += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -678,6 +651,45 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
|
||||
let auth = Auth::UserPass(
|
||||
config.network.bitcoind_rpc_username.clone(),
|
||||
config.network.bitcoind_rpc_password.clone(),
|
||||
);
|
||||
|
||||
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
|
||||
Ok(con) => con,
|
||||
Err(message) => {
|
||||
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
|
||||
}
|
||||
};
|
||||
|
||||
let start_block = {
|
||||
let blocks_db = open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
find_last_block_inserted(&blocks_db) as u64
|
||||
};
|
||||
|
||||
if start_block == 0 {
|
||||
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
}
|
||||
|
||||
let end_block = match bitcoin_rpc.get_blockchain_info() {
|
||||
Ok(result) => result.blocks,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"unable to retrieve Bitcoin chain tip ({})",
|
||||
e.to_string()
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if start_block < end_block {
|
||||
Ok(Some((start_block, end_block)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn perform_hord_db_update(
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
@@ -698,11 +710,13 @@ pub async fn perform_hord_db_update(
|
||||
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
|
||||
};
|
||||
|
||||
let rw_hord_db_conn = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
|
||||
let _ = fetch_and_cache_blocks_in_hord_db(
|
||||
&bitcoin_config,
|
||||
&rw_hord_db_conn,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
start_block,
|
||||
end_block,
|
||||
network_threads,
|
||||
|
||||
@@ -9,8 +9,9 @@ use chainhook_event_observer::chainhooks::types::{
|
||||
BitcoinChainhookFullSpecification, BitcoinPredicateType, Protocols,
|
||||
};
|
||||
use chainhook_event_observer::hord::db::{
|
||||
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_compacted_block_at_block_height,
|
||||
find_latest_compacted_block_known, open_readonly_hord_db_conn, open_readwrite_hord_db_conn,
|
||||
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_block_at_block_height,
|
||||
find_last_block_inserted, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
|
||||
open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
|
||||
};
|
||||
use chainhook_event_observer::hord::{
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data,
|
||||
@@ -18,7 +19,7 @@ use chainhook_event_observer::hord::{
|
||||
};
|
||||
use chainhook_event_observer::indexer;
|
||||
use chainhook_event_observer::indexer::bitcoin::{
|
||||
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
|
||||
download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
|
||||
};
|
||||
use chainhook_event_observer::observer::{gather_proofs, EventObserverConfig};
|
||||
use chainhook_event_observer::utils::{file_append, send_request, Context};
|
||||
@@ -83,11 +84,17 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
|
||||
if let BitcoinPredicateType::Protocol(Protocols::Ordinal(_)) = &predicate_spec.predicate {
|
||||
is_predicate_evaluating_ordinals = true;
|
||||
if let Ok(hord_db_conn) = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx) {
|
||||
inscriptions_cache = find_all_inscriptions(&hord_db_conn);
|
||||
if let Ok(inscriptions_db_conn) =
|
||||
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)
|
||||
{
|
||||
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn);
|
||||
// Will we have to update the blocks table?
|
||||
if find_compacted_block_at_block_height(end_block as u32, &hord_db_conn).is_none() {
|
||||
hord_blocks_requires_update = true;
|
||||
if let Ok(blocks_db) =
|
||||
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
{
|
||||
if find_block_at_block_height(end_block as u32, &blocks_db).is_none() {
|
||||
hord_blocks_requires_update = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -103,20 +110,23 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
// TODO: make sure that we have a contiguous chain
|
||||
// check_compacted_blocks_chain_integrity(&hord_db_conn);
|
||||
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
|
||||
|
||||
let start_block = find_latest_compacted_block_known(&hord_db_conn) as u64;
|
||||
let start_block = find_last_block_inserted(&blocks_db_rw) as u64;
|
||||
if start_block < end_block {
|
||||
warn!(
|
||||
ctx.expect_logger(),
|
||||
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
|
||||
(end_block - start_block)
|
||||
);
|
||||
let rw_hord_db_conn =
|
||||
|
||||
let inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
fetch_and_cache_blocks_in_hord_db(
|
||||
&config.get_event_observer_config().get_bitcoin_config(),
|
||||
&rw_hord_db_conn,
|
||||
&blocks_db_rw,
|
||||
&inscriptions_db_conn_rw,
|
||||
start_block,
|
||||
end_block,
|
||||
8,
|
||||
@@ -125,7 +135,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
)
|
||||
.await?;
|
||||
|
||||
inscriptions_cache = find_all_inscriptions(&hord_db_conn);
|
||||
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn_rw);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -158,7 +168,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
|
||||
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
|
||||
let block_breakdown =
|
||||
retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?;
|
||||
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
|
||||
let mut block = indexer::bitcoin::standardize_bitcoin_block(
|
||||
block_breakdown,
|
||||
&event_observer_config.bitcoin_network,
|
||||
@@ -204,7 +214,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
blocks_scanned += 1;
|
||||
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
|
||||
let block_breakdown =
|
||||
retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?;
|
||||
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
|
||||
let block = indexer::bitcoin::standardize_bitcoin_block(
|
||||
block_breakdown,
|
||||
&event_observer_config.bitcoin_network,
|
||||
|
||||
@@ -51,6 +51,13 @@ rand = "0.8.5"
|
||||
hex-simd = "0.8.0"
|
||||
serde_cbor = "0.11.2"
|
||||
zeromq = { version = "*", default-features = false, features = ["tokio-runtime", "tcp-transport"] }
|
||||
zerocopy = "0.6.1"
|
||||
zerocopy-derive = "0.3.2"
|
||||
|
||||
[dependencies.rocksdb]
|
||||
version = "0.20.1"
|
||||
default-features = false
|
||||
features = ["lz4", "snappy"]
|
||||
|
||||
[replace]
|
||||
"jsonrpc:0.13.0" = { git = 'https://github.com/apoelstra/rust-jsonrpc', rev = "1063671f122a8985c1b7c29030071253da515839" }
|
||||
|
||||
@@ -8,12 +8,13 @@ use chainhook_types::{
|
||||
};
|
||||
use hiro_system_kit::slog;
|
||||
|
||||
use rocksdb::DB;
|
||||
use rusqlite::{Connection, OpenFlags, ToSql};
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use crate::{
|
||||
indexer::bitcoin::{
|
||||
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
|
||||
download_block_with_retry, parse_downloaded_block, retrieve_block_hash_with_retry,
|
||||
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
},
|
||||
observer::BitcoinConfig,
|
||||
@@ -47,15 +48,6 @@ pub fn open_readwrite_hord_db_conn(
|
||||
|
||||
pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
let conn = create_or_open_readwrite_db(path, ctx);
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS blocks (
|
||||
id INTEGER NOT NULL PRIMARY KEY,
|
||||
compacted_bytes TEXT NOT NULL
|
||||
)",
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
}
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS inscriptions (
|
||||
inscription_id TEXT NOT NULL PRIMARY KEY,
|
||||
@@ -121,7 +113,7 @@ fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Connectio
|
||||
// db.busy_handler(Some(tx_busy_handler))?;
|
||||
|
||||
let mmap_size: i64 = 256 * 1024 * 1024;
|
||||
let page_size: i64 = 32768;
|
||||
let page_size: i64 = 16384;
|
||||
conn.pragma_update(None, "mmap_size", mmap_size).unwrap();
|
||||
conn.pragma_update(None, "page_size", page_size).unwrap();
|
||||
conn.pragma_update(None, "journal_mode", &"WAL").unwrap();
|
||||
@@ -157,15 +149,48 @@ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
return conn;
|
||||
}
|
||||
|
||||
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
|
||||
// #[repr(C)]
|
||||
// pub struct T {
|
||||
// ci: [u8; 4],
|
||||
// cv: u64,
|
||||
// t: Vec<Tx>,
|
||||
// }
|
||||
|
||||
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
|
||||
// #[repr(C, packed)]
|
||||
// pub struct Tx {
|
||||
// t: [u8; 4],
|
||||
// i: TxIn,
|
||||
// o: TxOut,
|
||||
// }
|
||||
|
||||
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
|
||||
// #[repr(C, packed)]
|
||||
// pub struct TxIn {
|
||||
// i: [u8; 4],
|
||||
// b: u32,
|
||||
// o: u16,
|
||||
// v: u64
|
||||
// }
|
||||
|
||||
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
|
||||
// #[repr(C, packed)]
|
||||
// pub struct TxOut {
|
||||
// v: u64,
|
||||
// }
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
// pub struct CompactedBlock(Vec<(Vec<(u32, u16, u64)>, Vec<u64>)>);
|
||||
#[repr(C)]
|
||||
pub struct CompactedBlock(
|
||||
pub (
|
||||
pub (
|
||||
([u8; 4], u64),
|
||||
Vec<([u8; 4], Vec<([u8; 4], u32, u16, u64)>, Vec<u64>)>,
|
||||
),
|
||||
);
|
||||
|
||||
use std::io::{Read, Write};
|
||||
|
||||
impl CompactedBlock {
|
||||
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> CompactedBlock {
|
||||
let mut txs = vec![];
|
||||
@@ -238,31 +263,108 @@ impl CompactedBlock {
|
||||
value
|
||||
}
|
||||
|
||||
pub fn to_hex_bytes(&self) -> String {
|
||||
let bytes = serde_cbor::to_vec(self).unwrap();
|
||||
let hex_bytes = hex_simd::encode_to_string(bytes, hex_simd::AsciiCase::Lower);
|
||||
hex_bytes
|
||||
pub fn from_cbor_bytes(bytes: &[u8]) -> CompactedBlock {
|
||||
serde_cbor::from_slice(&bytes[..]).unwrap()
|
||||
}
|
||||
|
||||
fn serialize<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
|
||||
fd.write_all(&self.0 .0 .0)?;
|
||||
fd.write(&self.0 .0 .1.to_be_bytes())?;
|
||||
fd.write(&self.0 .1.len().to_be_bytes())?;
|
||||
for (id, inputs, outputs) in self.0 .1.iter() {
|
||||
fd.write_all(id)?;
|
||||
fd.write(&inputs.len().to_be_bytes())?;
|
||||
for (txid, block, vout, value) in inputs.iter() {
|
||||
fd.write_all(txid)?;
|
||||
fd.write(&block.to_be_bytes())?;
|
||||
fd.write(&vout.to_be_bytes())?;
|
||||
fd.write(&value.to_be_bytes())?;
|
||||
}
|
||||
fd.write(&outputs.len().to_be_bytes())?;
|
||||
for value in outputs.iter() {
|
||||
fd.write(&value.to_be_bytes())?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize<R: Read>(fd: &mut R) -> std::io::Result<CompactedBlock> {
|
||||
let mut ci = [0u8; 4];
|
||||
fd.read_exact(&mut ci)?;
|
||||
let mut cv = [0u8; 8];
|
||||
fd.read_exact(&mut cv)?;
|
||||
let mut tx_len = [0u8; 8];
|
||||
fd.read_exact(&mut tx_len)?;
|
||||
let mut txs = vec![];
|
||||
for _ in 0..usize::from_be_bytes(tx_len) {
|
||||
let mut txid = [0u8; 4];
|
||||
fd.read_exact(&mut txid)?;
|
||||
let mut inputs_len = [0u8; 8];
|
||||
fd.read_exact(&mut inputs_len)?;
|
||||
let mut inputs = vec![];
|
||||
for _ in 0..usize::from_be_bytes(inputs_len) {
|
||||
let mut txin = [0u8; 4];
|
||||
fd.read_exact(&mut txin)?;
|
||||
let mut block = [0u8; 4];
|
||||
fd.read_exact(&mut block)?;
|
||||
let mut vout = [0u8; 2];
|
||||
fd.read_exact(&mut vout)?;
|
||||
let mut value = [0u8; 8];
|
||||
fd.read_exact(&mut value)?;
|
||||
inputs.push((
|
||||
txin,
|
||||
u32::from_be_bytes(block),
|
||||
u16::from_be_bytes(vout),
|
||||
u64::from_be_bytes(value),
|
||||
))
|
||||
}
|
||||
let mut outputs_len = [0u8; 8];
|
||||
fd.read_exact(&mut outputs_len)?;
|
||||
let mut outputs = vec![];
|
||||
for _ in 0..usize::from_be_bytes(outputs_len) {
|
||||
let mut v = [0u8; 8];
|
||||
fd.read_exact(&mut v)?;
|
||||
outputs.push(u64::from_be_bytes(v))
|
||||
}
|
||||
txs.push((txid, inputs, outputs));
|
||||
}
|
||||
Ok(CompactedBlock(((ci, u64::from_be_bytes(cv)), txs)))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find_latest_compacted_block_known(hord_db_conn: &Connection) -> u32 {
|
||||
let args: &[&dyn ToSql] = &[];
|
||||
let mut stmt = match hord_db_conn.prepare("SELECT id FROM blocks ORDER BY id DESC LIMIT 1") {
|
||||
Ok(stmt) => stmt,
|
||||
Err(_) => return 0,
|
||||
};
|
||||
let mut rows = match stmt.query(args) {
|
||||
Ok(rows) => rows,
|
||||
Err(_) => return 0,
|
||||
};
|
||||
while let Ok(Some(row)) = rows.next() {
|
||||
let id: u32 = row.get(0).unwrap();
|
||||
return id;
|
||||
}
|
||||
0
|
||||
fn get_default_hord_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
|
||||
let mut destination_path = base_dir.clone();
|
||||
destination_path.push("hord.rocksdb");
|
||||
destination_path
|
||||
}
|
||||
|
||||
pub fn find_compacted_block_at_block_height(
|
||||
pub fn open_readonly_hord_db_conn_rocks_db(
|
||||
base_dir: &PathBuf,
|
||||
_ctx: &Context,
|
||||
) -> Result<DB, String> {
|
||||
let path = get_default_hord_db_file_path_rocks_db(&base_dir);
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||
opts.set_max_open_files(1000);
|
||||
let db = DB::open_for_read_only(&opts, path, false).unwrap();
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
pub fn open_readwrite_hord_db_conn_rocks_db(
|
||||
base_dir: &PathBuf,
|
||||
_ctx: &Context,
|
||||
) -> Result<DB, String> {
|
||||
let path = get_default_hord_db_file_path_rocks_db(&base_dir);
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||
opts.set_max_open_files(1000);
|
||||
let db = DB::open(&opts, path).unwrap();
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
// Legacy - to remove after migrations
|
||||
pub fn find_block_at_block_height_sqlite(
|
||||
block_height: u32,
|
||||
hord_db_conn: &Connection,
|
||||
) -> Option<CompactedBlock> {
|
||||
@@ -283,6 +385,57 @@ pub fn find_compacted_block_at_block_height(
|
||||
return None;
|
||||
}
|
||||
|
||||
pub fn insert_entry_in_blocks(
|
||||
block_height: u32,
|
||||
compacted_block: &CompactedBlock,
|
||||
blocks_db_rw: &DB,
|
||||
_ctx: &Context,
|
||||
) {
|
||||
let mut bytes = vec![];
|
||||
let _ = compacted_block.serialize(&mut bytes);
|
||||
let block_height_bytes = block_height.to_be_bytes();
|
||||
blocks_db_rw
|
||||
.put(&block_height_bytes, bytes)
|
||||
.expect("unable to insert blocks");
|
||||
blocks_db_rw
|
||||
.put(b"metadata::last_insert", block_height_bytes)
|
||||
.expect("unable to insert metadata");
|
||||
}
|
||||
|
||||
pub fn find_last_block_inserted(blocks_db: &DB) -> u32 {
|
||||
match blocks_db.get(b"metadata::last_insert") {
|
||||
Ok(Some(bytes)) => u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find_block_at_block_height(block_height: u32, blocks_db: &DB) -> Option<CompactedBlock> {
|
||||
match blocks_db.get(block_height.to_be_bytes()) {
|
||||
Ok(Some(ref res)) => {
|
||||
let res = CompactedBlock::deserialize(&mut std::io::Cursor::new(&res)).unwrap();
|
||||
Some(res)
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_entry_from_blocks(block_height: u32, blocks_db_rw: &DB, ctx: &Context) {
|
||||
if let Err(e) = blocks_db_rw.delete(block_height.to_be_bytes()) {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_blocks_in_block_range(
|
||||
start_block: u32,
|
||||
end_block: u32,
|
||||
blocks_db_rw: &DB,
|
||||
ctx: &Context,
|
||||
) {
|
||||
for block_height in start_block..=end_block {
|
||||
remove_entry_from_blocks(block_height, blocks_db_rw, ctx);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn store_new_inscription(
|
||||
inscription_data: &OrdinalInscriptionRevealData,
|
||||
block_identifier: &BlockIdentifier,
|
||||
@@ -301,10 +454,10 @@ pub fn update_transfered_inscription(
|
||||
inscription_id: &str,
|
||||
outpoint_post_transfer: &str,
|
||||
offset: u64,
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
if let Err(e) = hord_db_conn.execute(
|
||||
if let Err(e) = inscriptions_db_conn_rw.execute(
|
||||
"UPDATE inscriptions SET outpoint_to_watch = ?, offset = ? WHERE inscription_id = ?",
|
||||
rusqlite::params![&outpoint_post_transfer, &offset, &inscription_id],
|
||||
) {
|
||||
@@ -315,10 +468,10 @@ pub fn update_transfered_inscription(
|
||||
pub fn patch_inscription_number(
|
||||
inscription_id: &str,
|
||||
inscription_number: u64,
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
if let Err(e) = hord_db_conn.execute(
|
||||
if let Err(e) = inscriptions_db_conn_rw.execute(
|
||||
"UPDATE inscriptions SET inscription_number = ? WHERE inscription_id = ?",
|
||||
rusqlite::params![&inscription_number, &inscription_id],
|
||||
) {
|
||||
@@ -327,11 +480,11 @@ pub fn patch_inscription_number(
|
||||
}
|
||||
|
||||
pub fn find_latest_inscription_block_height(
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_conn: &Connection,
|
||||
_ctx: &Context,
|
||||
) -> Result<Option<u64>, String> {
|
||||
let args: &[&dyn ToSql] = &[];
|
||||
let mut stmt = hord_db_conn
|
||||
let mut stmt = inscriptions_db_conn
|
||||
.prepare("SELECT block_height FROM inscriptions ORDER BY block_height DESC LIMIT 1")
|
||||
.unwrap();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
@@ -343,11 +496,11 @@ pub fn find_latest_inscription_block_height(
|
||||
}
|
||||
|
||||
pub fn find_latest_inscription_number(
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_conn: &Connection,
|
||||
_ctx: &Context,
|
||||
) -> Result<u64, String> {
|
||||
let args: &[&dyn ToSql] = &[];
|
||||
let mut stmt = hord_db_conn
|
||||
let mut stmt = inscriptions_db_conn
|
||||
.prepare(
|
||||
"SELECT inscription_number FROM inscriptions ORDER BY inscription_number DESC LIMIT 1",
|
||||
)
|
||||
@@ -362,11 +515,11 @@ pub fn find_latest_inscription_number(
|
||||
|
||||
pub fn find_inscription_with_ordinal_number(
|
||||
ordinal_number: &u64,
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_conn: &Connection,
|
||||
_ctx: &Context,
|
||||
) -> Option<String> {
|
||||
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
|
||||
let mut stmt = hord_db_conn
|
||||
let mut stmt = inscriptions_db_conn
|
||||
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ?")
|
||||
.unwrap();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
@@ -378,10 +531,10 @@ pub fn find_inscription_with_ordinal_number(
|
||||
}
|
||||
|
||||
pub fn find_all_inscriptions(
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_conn: &Connection,
|
||||
) -> BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> {
|
||||
let args: &[&dyn ToSql] = &[];
|
||||
let mut stmt = hord_db_conn
|
||||
let mut stmt = inscriptions_db_conn
|
||||
.prepare("SELECT inscription_number, ordinal_number, block_height, inscription_id FROM inscriptions ORDER BY inscription_number ASC")
|
||||
.unwrap();
|
||||
let mut results: BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> = BTreeMap::new();
|
||||
@@ -444,52 +597,13 @@ pub fn find_inscriptions_at_wached_outpoint(
|
||||
return Ok(results);
|
||||
}
|
||||
|
||||
pub fn insert_entry_in_blocks(
|
||||
block_id: u32,
|
||||
compacted_block: &CompactedBlock,
|
||||
hord_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let serialized_compacted_block = compacted_block.to_hex_bytes();
|
||||
|
||||
if let Err(e) = hord_db_conn.execute(
|
||||
"INSERT INTO blocks (id, compacted_bytes) VALUES (?1, ?2)",
|
||||
rusqlite::params![&block_id, &serialized_compacted_block],
|
||||
) {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_entry_from_blocks(block_id: u32, hord_db_conn: &Connection, ctx: &Context) {
|
||||
if let Err(e) = hord_db_conn.execute(
|
||||
"DELETE FROM blocks WHERE id = ?1",
|
||||
rusqlite::params![&block_id],
|
||||
) {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_blocks_in_block_range(
|
||||
start_block: u32,
|
||||
end_block: u32,
|
||||
rw_hord_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
if let Err(e) = rw_hord_db_conn.execute(
|
||||
"DELETE FROM blocks WHERE id >= ?1 AND id <= ?2",
|
||||
rusqlite::params![&start_block, &end_block],
|
||||
) {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_inscriptions_in_block_range(
|
||||
start_block: u32,
|
||||
end_block: u32,
|
||||
rw_hord_db_conn: &Connection,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
if let Err(e) = rw_hord_db_conn.execute(
|
||||
if let Err(e) = inscriptions_db_conn_rw.execute(
|
||||
"DELETE FROM inscriptions WHERE block_height >= ?1 AND block_height <= ?2",
|
||||
rusqlite::params![&start_block, &end_block],
|
||||
) {
|
||||
@@ -499,10 +613,10 @@ pub fn delete_inscriptions_in_block_range(
|
||||
|
||||
pub fn remove_entry_from_inscriptions(
|
||||
inscription_id: &str,
|
||||
hord_db_conn: &Connection,
|
||||
inscriptions_db_rw_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
if let Err(e) = hord_db_conn.execute(
|
||||
if let Err(e) = inscriptions_db_rw_conn.execute(
|
||||
"DELETE FROM inscriptions WHERE inscription_id = ?1",
|
||||
rusqlite::params![&inscription_id],
|
||||
) {
|
||||
@@ -513,17 +627,24 @@ pub fn remove_entry_from_inscriptions(
|
||||
pub fn delete_data_in_hord_db(
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
rw_hord_db_conn: &Connection,
|
||||
blocks_db_rw: &DB,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
delete_blocks_in_block_range(start_block as u32, end_block as u32, rw_hord_db_conn, &ctx);
|
||||
delete_inscriptions_in_block_range(start_block as u32, end_block as u32, rw_hord_db_conn, &ctx);
|
||||
delete_blocks_in_block_range(start_block as u32, end_block as u32, blocks_db_rw, &ctx);
|
||||
delete_inscriptions_in_block_range(
|
||||
start_block as u32,
|
||||
end_block as u32,
|
||||
inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
rw_hord_db_conn: &Connection,
|
||||
blocks_db_rw: &DB,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
network_thread: usize,
|
||||
@@ -532,11 +653,11 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
) -> Result<(), String> {
|
||||
let number_of_blocks_to_process = end_block - start_block + 1;
|
||||
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
|
||||
let (block_hash_tx, block_hash_rx) = crossbeam_channel::bounded(128);
|
||||
let (block_hash_tx, block_hash_rx) = crossbeam_channel::bounded(256);
|
||||
let retrieve_block_data_pool = ThreadPool::new(network_thread);
|
||||
let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(64);
|
||||
let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(256);
|
||||
let compress_block_data_pool = ThreadPool::new(16);
|
||||
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(32);
|
||||
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(512);
|
||||
|
||||
// Thread pool #1: given a block height, retrieve the block hash
|
||||
for block_cursor in start_block..=end_block {
|
||||
@@ -565,11 +686,8 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
retrieve_block_data_pool.execute(move || {
|
||||
moved_ctx
|
||||
.try_log(|logger| slog::debug!(logger, "Fetching block #{block_height}"));
|
||||
let future = retrieve_full_block_breakdown_with_retry(
|
||||
&block_hash,
|
||||
&moved_bitcoin_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let future =
|
||||
download_block_with_retry(&block_hash, &moved_bitcoin_config, &moved_ctx);
|
||||
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
|
||||
let _ = block_data_tx.send(Some(block_data));
|
||||
});
|
||||
@@ -581,9 +699,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Block data compression")
|
||||
.spawn(move || {
|
||||
while let Ok(Some(block_data)) = block_data_rx.recv() {
|
||||
while let Ok(Some(downloaded_block)) = block_data_rx.recv() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
compress_block_data_pool.execute(move || {
|
||||
let block_data = parse_downloaded_block(downloaded_block).unwrap();
|
||||
let compressed_block = CompactedBlock::from_full_block(&block_data);
|
||||
let block_index = block_data.height as u32;
|
||||
let _ = block_compressed_tx_moved.send(Some((
|
||||
@@ -604,25 +723,15 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
let mut inbox = HashMap::new();
|
||||
|
||||
while let Ok(Some((block_height, compacted_block, raw_block))) = block_compressed_rx.recv() {
|
||||
insert_entry_in_blocks(block_height, &compacted_block, &rw_hord_db_conn, &ctx);
|
||||
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx);
|
||||
blocks_stored += 1;
|
||||
|
||||
// println!("{} < {}", raw_block.height, cursor);
|
||||
// Early return, only considering blocks after 1st inscription
|
||||
// if raw_block.height < cursor {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// let block_height = raw_block.height;
|
||||
inbox.insert(raw_block.height, raw_block);
|
||||
|
||||
// In the context of ordinals, we're constrained to process blocks sequentially
|
||||
// Blocks are processed by a threadpool and could be coming out of order.
|
||||
// Inbox block for later if the current block is not the one we should be
|
||||
// processing.
|
||||
// if block_height != cursor {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// Is the action of processing a block allows us
|
||||
// to process more blocks present in the inbox?
|
||||
@@ -641,7 +750,8 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
&mut new_block,
|
||||
&rw_hord_db_conn,
|
||||
blocks_db_rw,
|
||||
&inscriptions_db_conn_rw,
|
||||
false,
|
||||
&hord_db_path,
|
||||
&ctx,
|
||||
@@ -692,7 +802,7 @@ impl TraversalResult {
|
||||
}
|
||||
|
||||
pub fn retrieve_satoshi_point_using_local_storage(
|
||||
hord_db_conn: &Connection,
|
||||
blocks_db: &DB,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
inscription_number: u64,
|
||||
@@ -717,18 +827,17 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
let mut hops: u32 = 0;
|
||||
loop {
|
||||
hops += 1;
|
||||
let res = match find_compacted_block_at_block_height(ordinal_block_number, &hord_db_conn) {
|
||||
let res = match find_block_at_block_height(ordinal_block_number, &blocks_db) {
|
||||
Some(res) => res,
|
||||
None => {
|
||||
return Err(format!("block #{ordinal_block_number} not in database"));
|
||||
}
|
||||
};
|
||||
|
||||
let coinbase_txid = &res.0 .0 .0;
|
||||
let txid = tx_cursor.0;
|
||||
|
||||
// ctx.try_log(|logger| {
|
||||
// slog::debug!(
|
||||
// slog::info!(
|
||||
// logger,
|
||||
// "{ordinal_block_number}:{:?}:{:?}",
|
||||
// hex::encode(&coinbase_txid),
|
||||
@@ -737,7 +846,6 @@ pub fn retrieve_satoshi_point_using_local_storage(
|
||||
// });
|
||||
|
||||
// to remove
|
||||
//std::thread::sleep(std::time::Duration::from_millis(300));
|
||||
|
||||
// evaluate exit condition: did we reach the **final** coinbase transaction
|
||||
if coinbase_txid.eq(&txid) {
|
||||
|
||||
@@ -8,6 +8,7 @@ use chainhook_types::{
|
||||
BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
|
||||
};
|
||||
use hiro_system_kit::slog;
|
||||
use rocksdb::DB;
|
||||
use rusqlite::Connection;
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::path::PathBuf;
|
||||
@@ -28,17 +29,18 @@ use crate::{
|
||||
};
|
||||
|
||||
use self::db::{
|
||||
open_readonly_hord_db_conn, remove_entry_from_blocks, remove_entry_from_inscriptions,
|
||||
open_readonly_hord_db_conn_rocks_db, remove_entry_from_blocks, remove_entry_from_inscriptions,
|
||||
TraversalResult, WatchedSatpoint,
|
||||
};
|
||||
|
||||
pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
block: &BitcoinBlockData,
|
||||
rw_hord_db_conn: &Connection,
|
||||
blocks_db_rw: &DB,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
// Remove block from
|
||||
remove_entry_from_blocks(block.block_identifier.index as u32, &rw_hord_db_conn, ctx);
|
||||
remove_entry_from_blocks(block.block_identifier.index as u32, &blocks_db_rw, ctx);
|
||||
for tx_index in 1..=block.transactions.len() {
|
||||
// Undo the changes in reverse order
|
||||
let tx = &block.transactions[block.transactions.len() - tx_index];
|
||||
@@ -46,7 +48,11 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(data) => {
|
||||
// We remove any new inscription created
|
||||
remove_entry_from_inscriptions(&data.inscription_id, &rw_hord_db_conn, ctx);
|
||||
remove_entry_from_inscriptions(
|
||||
&data.inscription_id,
|
||||
&inscriptions_db_conn_rw,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
OrdinalOperation::InscriptionTransferred(data) => {
|
||||
// We revert the outpoint to the pre-transfer value
|
||||
@@ -59,7 +65,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
&&data.inscription_id,
|
||||
&outpoint_pre_transfer,
|
||||
offset_pre_transfer,
|
||||
&rw_hord_db_conn,
|
||||
&inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
@@ -71,7 +77,8 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
|
||||
pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
rw_hord_db_conn: &Connection,
|
||||
blocks_db_rw: &DB,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
write_block: bool,
|
||||
hord_db_path: &PathBuf,
|
||||
ctx: &Context,
|
||||
@@ -89,7 +96,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
insert_entry_in_blocks(
|
||||
new_block.block_identifier.index as u32,
|
||||
&compacted_block,
|
||||
&rw_hord_db_conn,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
@@ -114,11 +121,10 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = ctx.clone();
|
||||
let block_identifier = new_block.block_identifier.clone();
|
||||
let hord_db_path = hord_db_path.clone();
|
||||
let blocks_db = open_readonly_hord_db_conn_rocks_db(hord_db_path, &ctx)?;
|
||||
traversal_data_pool.execute(move || {
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &moved_ctx).unwrap();
|
||||
let traversal = retrieve_satoshi_point_using_local_storage(
|
||||
&hord_db_conn,
|
||||
&blocks_db,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
0,
|
||||
@@ -139,19 +145,19 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
}
|
||||
}
|
||||
|
||||
let mut storage = Storage::Sqlite(rw_hord_db_conn);
|
||||
let mut storage = Storage::Sqlite(inscriptions_db_conn_rw);
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
|
||||
new_block,
|
||||
&mut storage,
|
||||
&traversals,
|
||||
rw_hord_db_conn,
|
||||
&inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
// Have inscriptions been transfered?
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data(
|
||||
new_block,
|
||||
&mut Storage::Sqlite(rw_hord_db_conn),
|
||||
&mut storage,
|
||||
&ctx,
|
||||
)?;
|
||||
Ok(())
|
||||
@@ -167,7 +173,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
|
||||
block: &mut BitcoinBlockData,
|
||||
storage: &mut Storage,
|
||||
traversals: &HashMap<TransactionIdentifier, TraversalResult>,
|
||||
hord_db_conn: &Connection,
|
||||
inscription_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
for new_tx in block.transactions.iter_mut().skip(1) {
|
||||
@@ -196,7 +202,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
|
||||
Storage::Sqlite(rw_hord_db_conn) => {
|
||||
if let Some(_entry) = find_inscription_with_ordinal_number(
|
||||
&traversal.ordinal_number,
|
||||
&hord_db_conn,
|
||||
&inscription_db_conn,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -211,7 +217,7 @@ pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
|
||||
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
|
||||
} else {
|
||||
inscription.inscription_number =
|
||||
match find_latest_inscription_number(&hord_db_conn, &ctx) {
|
||||
match find_latest_inscription_number(&inscription_db_conn, &ctx) {
|
||||
Ok(inscription_number) => inscription_number + 1,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
|
||||
@@ -148,14 +148,14 @@ pub struct RewardParticipant {
|
||||
amt: u64,
|
||||
}
|
||||
|
||||
pub async fn retrieve_full_block_breakdown_with_retry(
|
||||
pub async fn download_and_parse_block_with_retry(
|
||||
block_hash: &str,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
let mut errors_count = 0;
|
||||
let block = loop {
|
||||
match retrieve_full_block_breakdown(block_hash, bitcoin_config, ctx).await {
|
||||
match download_and_parse_block(block_hash, bitcoin_config, ctx).await {
|
||||
Ok(result) => break result,
|
||||
Err(e) => {
|
||||
errors_count += 1;
|
||||
@@ -170,6 +170,28 @@ pub async fn retrieve_full_block_breakdown_with_retry(
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub async fn download_block_with_retry(
|
||||
block_hash: &str,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<Vec<u8>, String> {
|
||||
let mut errors_count = 0;
|
||||
let block = loop {
|
||||
match download_block(block_hash, bitcoin_config, ctx).await {
|
||||
Ok(result) => break result,
|
||||
Err(e) => {
|
||||
errors_count += 1;
|
||||
error!(
|
||||
"unable to retrieve block #{block_hash} (attempt #{errors_count}): {}",
|
||||
e.to_string()
|
||||
);
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub async fn retrieve_block_hash_with_retry(
|
||||
block_height: &u64,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
@@ -192,11 +214,11 @@ pub async fn retrieve_block_hash_with_retry(
|
||||
Ok(block_hash)
|
||||
}
|
||||
|
||||
pub async fn retrieve_full_block_breakdown(
|
||||
pub async fn download_block(
|
||||
block_hash: &str,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
_ctx: &Context,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
) -> Result<Vec<u8>, String> {
|
||||
use reqwest::Client as HttpClient;
|
||||
let body = json!({
|
||||
"jsonrpc": "1.0",
|
||||
@@ -217,15 +239,32 @@ pub async fn retrieve_full_block_breakdown(
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("unable to send request ({})", e))?
|
||||
.json::<bitcoincore_rpc::jsonrpc::Response>()
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|e| format!("unable to parse response ({})", e))?
|
||||
.result::<BitcoinBlockFullBreakdown>()
|
||||
.map_err(|e| format!("unable to parse response ({})", e))?;
|
||||
|
||||
.map_err(|e| format!("unable to get bytes ({})", e))?
|
||||
.to_vec();
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub fn parse_downloaded_block(
|
||||
downloaded_block: Vec<u8>,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
let block = serde_json::from_slice::<bitcoincore_rpc::jsonrpc::Response>(&downloaded_block[..])
|
||||
.map_err(|e| format!("unable to parse jsonrpc payload ({})", e))?
|
||||
.result::<BitcoinBlockFullBreakdown>()
|
||||
.map_err(|e| format!("unable to parse block ({})", e))?;
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub async fn download_and_parse_block(
|
||||
block_hash: &str,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
_ctx: &Context,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
let response = download_block(block_hash, bitcoin_config, _ctx).await?;
|
||||
parse_downloaded_block(response)
|
||||
}
|
||||
|
||||
pub async fn retrieve_block_hash(
|
||||
block_height: &u64,
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
@@ -580,8 +619,17 @@ fn try_parse_stacks_operation(
|
||||
Err(_) => None,
|
||||
};
|
||||
}
|
||||
|
||||
// let mining_address_pre_commit = match inputs[0].script_sig {
|
||||
// Some(script) => match script.script() {
|
||||
|
||||
// }
|
||||
|
||||
// }
|
||||
// mining_address_post_commit = match inputs.first().and_then(|i| i.script_sig).and_then(|s| s.script()).script_pub_key.script() {
|
||||
// Ok(script) => Address::from_script(&script, bitcoin::Network::Bitcoin).and_then(|a| Ok(a.to_string())).ok(),
|
||||
// Err(_) => None
|
||||
// };
|
||||
|
||||
let pox_cycle_index = pox_config.get_pox_cycle_id(block_height);
|
||||
let pox_cycle_length = pox_config.get_pox_cycle_len();
|
||||
|
||||
@@ -10,12 +10,12 @@ use crate::chainhooks::types::{
|
||||
ChainhookConfig, ChainhookFullSpecification, ChainhookSpecification,
|
||||
};
|
||||
|
||||
use crate::hord::db::open_readwrite_hord_db_conn;
|
||||
use crate::hord::db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db};
|
||||
use crate::hord::{
|
||||
revert_hord_db_with_augmented_bitcoin_block, update_hord_db_and_augment_bitcoin_block,
|
||||
};
|
||||
use crate::indexer::bitcoin::{
|
||||
retrieve_full_block_breakdown_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
download_and_parse_block_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
NewBitcoinBlock,
|
||||
};
|
||||
use crate::indexer::fork_scratch_pad::ForkScratchPad;
|
||||
@@ -452,7 +452,7 @@ pub async fn start_event_observer(
|
||||
};
|
||||
let block_hash = hex::encode(message.get(1).unwrap().to_vec());
|
||||
|
||||
let block = match retrieve_full_block_breakdown_with_retry(
|
||||
let block = match download_and_parse_block_with_retry(
|
||||
&block_hash,
|
||||
&bitcoin_config,
|
||||
&ctx_moved,
|
||||
@@ -464,7 +464,7 @@ pub async fn start_event_observer(
|
||||
ctx_moved.try_log(|logger| {
|
||||
slog::warn!(
|
||||
logger,
|
||||
"unable to retrieve_full_block_breakdown: {}",
|
||||
"unable to download_and_parse_block: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
@@ -557,7 +557,7 @@ pub fn gather_proofs<'a>(
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
logger,
|
||||
"collecting proof for transaction {}",
|
||||
"Collecting proof for transaction {}",
|
||||
transaction.transaction_identifier.hash
|
||||
)
|
||||
});
|
||||
@@ -646,7 +646,27 @@ pub async fn start_observer_commands_handler(
|
||||
BlockchainEvent::BlockchainUpdatedWithHeaders(data) => {
|
||||
let mut new_blocks = vec![];
|
||||
let mut confirmed_blocks = vec![];
|
||||
let rw_hord_db_conn =
|
||||
let blocks_db = match open_readwrite_hord_db_conn_rocks_db(
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
if let Some(ref tx) = observer_events_tx {
|
||||
let _ = tx.send(ObserverEvent::Error(format!(
|
||||
"Channel error: {:?}",
|
||||
e
|
||||
)));
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "Unable to open readwtite connection",)
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let inscriptions_db_conn_rw =
|
||||
match open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx) {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
@@ -666,12 +686,14 @@ pub async fn start_observer_commands_handler(
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for header in data.new_headers.iter() {
|
||||
match bitcoin_block_store.get_mut(&header.block_identifier) {
|
||||
Some(block) => {
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&rw_hord_db_conn,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
@@ -726,7 +748,27 @@ pub async fn start_observer_commands_handler(
|
||||
let mut blocks_to_rollback = vec![];
|
||||
let mut confirmed_blocks = vec![];
|
||||
|
||||
let rw_hord_db_conn =
|
||||
let blocks_db = match open_readwrite_hord_db_conn_rocks_db(
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
if let Some(ref tx) = observer_events_tx {
|
||||
let _ = tx.send(ObserverEvent::Error(format!(
|
||||
"Channel error: {:?}",
|
||||
e
|
||||
)));
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "Unable to open readwtite connection",)
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let inscriptions_db_conn_rw =
|
||||
match open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx) {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
@@ -752,7 +794,8 @@ pub async fn start_observer_commands_handler(
|
||||
Some(block) => {
|
||||
if let Err(e) = revert_hord_db_with_augmented_bitcoin_block(
|
||||
block,
|
||||
&rw_hord_db_conn,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -782,7 +825,8 @@ pub async fn start_observer_commands_handler(
|
||||
Some(block) => {
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block,
|
||||
&rw_hord_db_conn,
|
||||
&blocks_db,
|
||||
&inscriptions_db_conn_rw,
|
||||
true,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
@@ -1298,23 +1342,22 @@ pub async fn handle_new_bitcoin_block(
|
||||
// into account the last 7 blocks.
|
||||
|
||||
let block_hash = bitcoin_block.burn_block_hash.strip_prefix("0x").unwrap();
|
||||
let block =
|
||||
match retrieve_full_block_breakdown_with_retry(block_hash, bitcoin_config, ctx).await {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::warn!(
|
||||
logger,
|
||||
"unable to retrieve_full_block_breakdown: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
return Json(json!({
|
||||
"status": 500,
|
||||
"result": "unable to retrieve_full_block",
|
||||
}));
|
||||
}
|
||||
};
|
||||
let block = match download_and_parse_block_with_retry(block_hash, bitcoin_config, ctx).await {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::warn!(
|
||||
logger,
|
||||
"unable to download_and_parse_block: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
return Json(json!({
|
||||
"status": 500,
|
||||
"result": "unable to retrieve_full_block",
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
let header = block.get_block_header();
|
||||
match background_job_tx.lock() {
|
||||
|
||||
Reference in New Issue
Block a user