feat: improve transfers handling

Release 2.1.0
This commit is contained in:
Ludo Galabru
2024-02-12 10:55:29 -05:00
committed by GitHub
30 changed files with 1271 additions and 895 deletions

View File

@@ -109,6 +109,8 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
file: ./dockerfiles/components/ordhook.dockerfile
build-args: |
GIT_COMMIT=${{ env.GITHUB_SHA_SHORT }}
cache-from: type=gha
cache-to: type=gha,mode=max
# Only push if (there's a new release on main branch, or if building a non-main branch) and (Only run on non-PR events or only PRs that aren't from forks)

101
Cargo.lock generated
View File

@@ -259,6 +259,29 @@ dependencies = [
"syn 2.0.41",
]
[[package]]
name = "bindgen"
version = "0.68.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "726e4313eb6ec35d2730258ad4e15b547ee75d6afaa1361a922e78e59b7d8078"
dependencies = [
"bitflags 2.4.1",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"log",
"peeking_take_while",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.41",
"which",
]
[[package]]
name = "bitcoin"
version = "0.31.0"
@@ -454,12 +477,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chainhook-sdk"
version = "0.12.0"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4689cd74c5a15903b5e078b1c8247469dadbb835475793c1cd9cc93da24c5acd"
checksum = "a9f1d697633a4b8394f185f41634faaf555ba04f2881ac19e1db4b1bd0130155"
dependencies = [
"base58 0.2.0",
"base64 0.13.1",
"base64 0.21.5",
"bitcoincore-rpc",
"bitcoincore-rpc-json",
"chainhook-types",
@@ -489,9 +512,9 @@ dependencies = [
[[package]]
name = "chainhook-types"
version = "1.3.0"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03f5e0a721da223301c8043cdb9e54dae7731273584efda2ade15e0ea46516d4"
checksum = "44b67edc1e9b87382a973d203eada222554a774d9fae55e7d40fb2accb372716"
dependencies = [
"hex",
"schemars 0.8.16",
@@ -515,6 +538,33 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "ciborium"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926"
dependencies = [
"ciborium-io",
"ciborium-ll",
"serde",
]
[[package]]
name = "ciborium-io"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656"
[[package]]
name = "ciborium-ll"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b"
dependencies = [
"ciborium-io",
"half",
]
[[package]]
name = "clang-sys"
version = "1.6.1"
@@ -1344,6 +1394,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "half"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hashbrown"
version = "0.11.2"
@@ -1504,6 +1560,15 @@ dependencies = [
"hmac 0.8.1",
]
[[package]]
name = "home"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "http"
version = "0.2.11"
@@ -1795,7 +1860,7 @@ version = "0.11.0+8.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e"
dependencies = [
"bindgen",
"bindgen 0.65.1",
"bzip2-sys",
"cc",
"glob",
@@ -2236,6 +2301,7 @@ dependencies = [
"anyhow",
"atty",
"chainhook-sdk",
"ciborium",
"crossbeam-channel",
"dashmap",
"flate2",
@@ -2280,6 +2346,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"tcmalloc2",
"toml 0.5.11",
]
@@ -3804,6 +3871,16 @@ version = "0.12.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a"
[[package]]
name = "tcmalloc2"
version = "0.1.2+2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5af9a4fe36969c3ae05e5ba829b191d671fc1bb6569e0245f0b57970da9f04a5"
dependencies = [
"bindgen 0.68.1",
"num_cpus",
]
[[package]]
name = "tempfile"
version = "3.8.1"
@@ -4402,6 +4479,18 @@ version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -24,9 +24,11 @@ clap = { version = "3.2.23", features = ["derive"], optional = true }
clap_generate = { version = "3.0.3", optional = true }
toml = { version = "0.5.6", features = ["preserve_order"], optional = true }
ctrlc = { version = "3.2.2", optional = true }
tcmalloc2 = { version = "0.1.2+2.13", optional = true }
[features]
default = ["cli"]
cli = ["clap", "clap_generate", "toml", "ctrlc", "hiro-system-kit/log"]
debug = ["hiro-system-kit/debug"]
release = ["hiro-system-kit/release"]
tcmalloc = ["tcmalloc2"]

View File

@@ -0,0 +1,29 @@
use std::process::Command;
/// Resolves the git commit hash to bake into the binary.
///
/// Precedence: a `GIT_COMMIT` environment variable captured at compile time
/// (set e.g. by CI/Docker builds that have no `.git` directory) wins;
/// otherwise we shell out to `git log` in the crate directory.
/// Returns `None` when neither source yields a non-empty hash.
fn current_git_hash() -> Option<String> {
    // Compile-time override supplied by the build environment.
    if let Some(commit) = option_env!("GIT_COMMIT") {
        return Some(commit.to_string());
    }
    let output = Command::new("git")
        .args(["log", "-1", "--pretty=format:%h"]) // abbreviated commit hash
        .current_dir(env!("CARGO_MANIFEST_DIR"))
        .output()
        .ok()?;
    // A failed git invocation (not a repository, git missing) prints nothing
    // to stdout; without this check we would bake an empty GIT_COMMIT value.
    if !output.status.success() {
        return None;
    }
    let commit = String::from_utf8(output.stdout).ok()?;
    let commit = commit.trim();
    if commit.is_empty() {
        None
    } else {
        Some(commit.to_string())
    }
}
/// Build script entry point: exposes the current git commit hash to the
/// crate as the `GIT_COMMIT` compile-time environment variable (read via
/// `env!("GIT_COMMIT")` in the CLI).
fn main() {
    // Re-run this script when CI overrides the hash through the environment,
    // so the baked-in value stays in sync.
    println!("cargo:rerun-if-env-changed=GIT_COMMIT");
    if let Some(git) = current_git_hash() {
        println!("cargo:rustc-env=GIT_COMMIT={}", git);
    }
    // NOTE(review): if no hash can be resolved, GIT_COMMIT stays unset and
    // any `env!("GIT_COMMIT")` call site will fail to compile — confirm that
    // is the intended behavior for source builds without git.
}

View File

@@ -32,6 +32,7 @@ use ordhook::db::{
use ordhook::download::download_ordinals_dataset_if_required;
use ordhook::hex;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use ordhook::service::observers::initialize_observers_db;
use ordhook::service::{start_observer_forwarding, Service};
use reqwest::Client as HttpClient;
use std::io::{BufReader, Read};
@@ -163,6 +164,8 @@ struct ScanTransactionCommand {
pub block_height: u64,
/// Inscription Id
pub transaction_id: String,
/// Input index
pub input_index: usize,
/// Target Regtest network
#[clap(
long = "regtest",
@@ -342,6 +345,9 @@ struct StartCommand {
/// Check blocks integrity
#[clap(long = "check-blocks-integrity")]
pub block_integrity_check: bool,
/// Stream indexing to observers
#[clap(long = "stream-indexing")]
pub stream_indexing_to_observers: bool,
}
#[derive(Subcommand, PartialEq, Clone, Debug)]
@@ -558,6 +564,9 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
None,
cmd.auth_token,
)?;
let _ = initialize_observers_db(&config.expected_cache_path(), ctx);
scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
&config,
@@ -575,15 +584,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
while let Some(block_height) = block_range.pop_front() {
let inscriptions =
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, ctx);
let mut locations =
let locations =
find_all_transfers_in_block(&block_height, &inscriptions_db_conn, ctx);
let mut total_transfers_in_block = 0;
for (_, inscription) in inscriptions.iter() {
println!("Inscription {} revealed at block #{} (inscription_number {}, ordinal_number {})", inscription.get_inscription_id(), block_height, inscription.inscription_number.jubilee, inscription.ordinal_number);
if let Some(transfers) = locations.remove(&inscription.get_inscription_id())
{
if let Some(transfers) = locations.get(&inscription.ordinal_number) {
for t in transfers.iter().skip(1) {
total_transfers_in_block += 1;
println!(
@@ -672,18 +680,21 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
.await?;
let transaction_identifier = TransactionIdentifier::new(&cmd.transaction_id);
let cache = new_traversals_lazy_cache(100);
let (res, mut back_trace) = compute_satoshi_number(
let (res, _, mut back_trace) = compute_satoshi_number(
&config.get_ordhook_config().db_path,
&block.block_identifier,
&transaction_identifier,
cmd.input_index,
0,
&Arc::new(cache),
config.resources.ulimit,
config.resources.memory_available,
true,
ctx,
)?;
back_trace.reverse();
for (block_height, tx) in back_trace.iter() {
println!("{}\t{}", block_height, hex::encode(tx));
for (block_height, tx, index) in back_trace.iter() {
println!("{}\t{}:{}", block_height, hex::encode(tx), index);
}
println!("{:?}", res);
}
@@ -707,12 +718,21 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let last_known_block =
find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?;
if last_known_block.is_none() {
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
}
let ordhook_config = config.get_ordhook_config();
info!(ctx.expect_logger(), "Starting service...",);
let version = env!("GIT_COMMIT");
info!(
ctx.expect_logger(),
"Starting service (git_commit = {})...", version
);
let start_block = match cmd.start_at_block {
Some(entry) => entry,
@@ -744,7 +764,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut service = Service::new(config, ctx.clone());
return service
.run(predicates, None, cmd.block_integrity_check)
.run(
predicates,
None,
cmd.block_integrity_check,
cmd.stream_indexing_to_observers,
)
.await;
}
},
@@ -766,7 +791,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::New(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
initialize_ordhook_db(&config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
}
Command::Db(OrdhookDbCommand::Sync(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
@@ -779,10 +810,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let mut ordhook_config = config.get_ordhook_config();
if let Some(network_threads) = cmd.network_threads {
ordhook_config.network_thread_max = network_threads;
}
if let Some(network_threads) = cmd.network_threads {
ordhook_config.network_thread_max = network_threads;
ordhook_config.resources.bitcoind_rpc_threads = network_threads;
}
let blocks = cmd.get_blocks();
let block_ingestion_processor =
@@ -800,6 +828,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
false,
&config.get_ordhook_config().db_path,
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
for i in cmd.get_blocks().into_iter() {
@@ -819,7 +849,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let mut ordhook_config = config.get_ordhook_config();
if let Some(network_threads) = cmd.network_threads {
ordhook_config.network_thread_max = network_threads;
ordhook_config.resources.bitcoind_rpc_threads = network_threads;
}
let block_post_processor = match cmd.repair_observers {
Some(true) => {
@@ -870,8 +900,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::Check(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
{
let blocks_db =
open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
let blocks_db = open_readonly_ordhook_db_conn_rocks_db(
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
)?;
let tip = find_last_block_inserted(&blocks_db);
println!("Tip: {}", tip);
let missing_blocks = find_missing_blocks(&blocks_db, 1, tip, ctx);
@@ -880,11 +914,26 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
}
Command::Db(OrdhookDbCommand::Drop(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
ctx,
);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
println!(
"{} blocks will be deleted. Confirm? [Y/n]",
cmd.end_block - cmd.start_block + 1
);
let mut buffer = String::new();
std::io::stdin().read_line(&mut buffer).unwrap();
if buffer.starts_with('n') {
return Err("Deletion aborted".to_string());
}
delete_data_in_ordhook_db(
cmd.start_block,
cmd.end_block,

View File

@@ -1,29 +1,24 @@
use ordhook::chainhook_sdk::indexer::IndexerConfig;
use ordhook::chainhook_sdk::observer::DEFAULT_INGESTION_PORT;
use ordhook::chainhook_sdk::types::{
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
};
use ordhook::config::{
BootstrapConfig, Config, LimitsConfig, LogConfig, PredicatesApi, PredicatesApiConfig,
StorageConfig,
Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig,
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT,
DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
};
use std::fs::File;
use std::io::{BufReader, Read};
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50;
pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50;
#[derive(Deserialize, Debug, Clone)]
pub struct ConfigFile {
pub storage: StorageConfigFile,
pub http_api: Option<PredicatesApiConfigFile>,
pub limits: LimitsConfigFile,
pub resources: ResourcesConfigFile,
pub network: NetworkConfigFile,
pub logs: Option<LogConfigFile>,
pub bootstrap: Option<BootstrapConfigFile>,
pub snapshot: Option<SnapshotConfigFile>,
}
impl ConfigFile {
@@ -54,12 +49,12 @@ impl ConfigFile {
_ => return Err("network.mode not supported".to_string()),
};
let bootstrap = match config_file.bootstrap {
let snapshot = match config_file.snapshot {
Some(bootstrap) => match bootstrap.download_url {
Some(ref url) => BootstrapConfig::Download(url.to_string()),
None => BootstrapConfig::Build,
Some(ref url) => SnapshotConfig::Download(url.to_string()),
None => SnapshotConfig::Build,
},
None => BootstrapConfig::Build,
None => SnapshotConfig::Build,
};
let config = Config {
@@ -76,36 +71,29 @@ impl ConfigFile {
}),
},
},
bootstrap,
limits: LimitsConfig {
max_number_of_stacks_predicates: config_file
.limits
.max_number_of_stacks_predicates
.unwrap_or(STACKS_MAX_PREDICATE_REGISTRATION),
max_number_of_bitcoin_predicates: config_file
.limits
.max_number_of_bitcoin_predicates
.unwrap_or(BITCOIN_MAX_PREDICATE_REGISTRATION),
max_number_of_concurrent_stacks_scans: config_file
.limits
.max_number_of_concurrent_stacks_scans
.unwrap_or(STACKS_SCAN_THREAD_POOL_SIZE),
max_number_of_concurrent_bitcoin_scans: config_file
.limits
.max_number_of_concurrent_bitcoin_scans
.unwrap_or(BITCOIN_SCAN_THREAD_POOL_SIZE),
max_number_of_processing_threads: config_file
.limits
.max_number_of_processing_threads
.unwrap_or(1.max(num_cpus::get().saturating_sub(1))),
bitcoin_concurrent_http_requests_max: config_file
.limits
.bitcoin_concurrent_http_requests_max
.unwrap_or(1.max(num_cpus::get().saturating_sub(1))),
max_caching_memory_size_mb: config_file
.limits
.max_caching_memory_size_mb
.unwrap_or(2048),
snapshot,
resources: ResourcesConfig {
ulimit: config_file.resources.ulimit.unwrap_or(DEFAULT_ULIMIT),
cpu_core_available: config_file
.resources
.cpu_core_available
.unwrap_or(num_cpus::get()),
memory_available: config_file
.resources
.memory_available
.unwrap_or(DEFAULT_MEMORY_AVAILABLE),
bitcoind_rpc_threads: config_file
.resources
.bitcoind_rpc_threads
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS),
bitcoind_rpc_timeout: config_file
.resources
.bitcoind_rpc_timeout
.unwrap_or(DEFAULT_BITCOIND_RPC_TIMEOUT),
expected_observers_count: config_file
.resources
.expected_observers_count
.unwrap_or(1),
},
network: IndexerConfig {
bitcoind_rpc_url: config_file.network.bitcoind_rpc_url.to_string(),
@@ -176,19 +164,18 @@ pub struct PredicatesApiConfigFile {
}
#[derive(Deserialize, Debug, Clone)]
pub struct BootstrapConfigFile {
pub struct SnapshotConfigFile {
pub download_url: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct LimitsConfigFile {
pub max_number_of_bitcoin_predicates: Option<usize>,
pub max_number_of_concurrent_bitcoin_scans: Option<usize>,
pub max_number_of_stacks_predicates: Option<usize>,
pub max_number_of_concurrent_stacks_scans: Option<usize>,
pub max_number_of_processing_threads: Option<usize>,
pub max_caching_memory_size_mb: Option<usize>,
pub bitcoin_concurrent_http_requests_max: Option<usize>,
pub struct ResourcesConfigFile {
pub ulimit: Option<usize>,
pub cpu_core_available: Option<usize>,
pub memory_available: Option<usize>,
pub bitcoind_rpc_threads: Option<usize>,
pub bitcoind_rpc_timeout: Option<u32>,
pub expected_observers_count: Option<usize>,
}
#[derive(Deserialize, Debug, Clone)]

View File

@@ -26,16 +26,17 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543"
# but stacks can also be used:
# stacks_node_rpc_url = "http://0.0.0.0:20443"
[limits]
max_number_of_bitcoin_predicates = 100
max_number_of_concurrent_bitcoin_scans = 100
max_number_of_processing_threads = 16
bitcoin_concurrent_http_requests_max = 16
max_caching_memory_size_mb = 32000
[resources]
ulimit = 2048
cpu_core_available = 16
memory_available = 32
bitcoind_rpc_threads = 4
bitcoind_rpc_timeout = 15
expected_observers_count = 1
# Disable the following section if the state
# must be built locally
[bootstrap]
[snapshot]
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"
[logs]

View File

@@ -7,6 +7,10 @@ extern crate hiro_system_kit;
pub mod cli;
pub mod config;
#[cfg(feature = "tcmalloc")]
#[global_allocator]
static GLOBAL: tcmalloc2::TcMalloc = tcmalloc2::TcMalloc;
fn main() {
cli::main();
}

View File

@@ -10,8 +10,8 @@ serde_json = "1"
serde_derive = "1"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.12.0", features = ["zeromq"] }
# chainhook-sdk = { version = "=0.12.0", path = "../../../chainhook/components/chainhook-sdk", features = ["zeromq"] }
chainhook-sdk = { version = "=0.12.5", features = ["zeromq"] }
# chainhook-sdk = { version = "=0.12.1", path = "../../../chainhook/components/chainhook-sdk", features = ["zeromq"] }
hiro-system-kit = "0.3.1"
reqwest = { version = "0.11", default-features = false, features = [
"stream",
@@ -43,6 +43,7 @@ rocksdb = { version = "0.21.0", default-features = false, features = [
pprof = { version = "0.13.0", features = ["flamegraph"], optional = true }
hyper = { version = "=0.14.27" }
lazy_static = { version = "1.4.0" }
ciborium = "0.2.1"
# [profile.release]
# debug = true

View File

@@ -11,18 +11,18 @@ const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str =
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10;
pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50;
pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50;
pub const DEFAULT_ULIMIT: usize = 2048;
pub const DEFAULT_MEMORY_AVAILABLE: usize = 8;
pub const DEFAULT_BITCOIND_RPC_THREADS: usize = 4;
pub const DEFAULT_BITCOIND_RPC_TIMEOUT: u32 = 15;
#[derive(Clone, Debug)]
pub struct Config {
pub storage: StorageConfig,
pub http_api: PredicatesApi,
pub limits: LimitsConfig,
pub resources: ResourcesConfig,
pub network: IndexerConfig,
pub bootstrap: BootstrapConfig,
pub snapshot: SnapshotConfig,
pub logs: LogConfig,
}
@@ -50,7 +50,7 @@ pub struct PredicatesApiConfig {
}
#[derive(Clone, Debug)]
pub enum BootstrapConfig {
pub enum SnapshotConfig {
Build,
Download(String),
}
@@ -65,15 +65,23 @@ pub struct UrlConfig {
pub file_url: String,
}
#[derive(Clone, Debug)]
pub struct LimitsConfig {
pub max_number_of_bitcoin_predicates: usize,
pub max_number_of_concurrent_bitcoin_scans: usize,
pub max_number_of_stacks_predicates: usize,
pub max_number_of_concurrent_stacks_scans: usize,
pub max_number_of_processing_threads: usize,
pub bitcoin_concurrent_http_requests_max: usize,
pub max_caching_memory_size_mb: usize,
/// Runtime resource budget for the indexer: file-descriptor, memory and
/// thread allowances threaded through the DB and networking helpers.
#[derive(Deserialize, Debug, Clone)]
pub struct ResourcesConfig {
    /// Open-file-descriptor allowance, passed to the RocksDB open helpers
    /// alongside `memory_available`.
    pub ulimit: usize,
    /// CPU cores the process may use; drives thread-pool sizing via
    /// `get_optimal_thread_pool_capacity`.
    pub cpu_core_available: usize,
    /// Memory budget passed to the RocksDB open helpers.
    /// NOTE(review): unit appears to be GB (sample config uses 32) — confirm.
    pub memory_available: usize,
    /// Number of concurrent bitcoind RPC workers used to seed the block
    /// download pipeline.
    pub bitcoind_rpc_threads: usize,
    /// Timeout for bitcoind RPC calls.
    /// NOTE(review): presumably seconds (sample config uses 15) — confirm.
    pub bitcoind_rpc_timeout: u32,
    /// Expected number of downstream observers (defaults to 1 elsewhere).
    pub expected_observers_count: usize,
}
impl ResourcesConfig {
    /// Number of worker threads to dedicate to a processing pool.
    ///
    /// Two cores are held back — one to feed the pool and one for the
    /// downstream "reduce" step — while always keeping at least one worker.
    pub fn get_optimal_thread_pool_capacity(&self) -> usize {
        // One core feeds the pool, another collects/reduces the results.
        const RESERVED_CORES: usize = 2;
        let workers = self.cpu_core_available.saturating_sub(RESERVED_CORES);
        if workers == 0 {
            1
        } else {
            workers
        }
    }
}
impl Config {
@@ -86,10 +94,7 @@ impl Config {
pub fn get_ordhook_config(&self) -> OrdhookConfig {
OrdhookConfig {
network_thread_max: self.limits.bitcoin_concurrent_http_requests_max,
ingestion_thread_max: self.limits.max_number_of_processing_threads,
ingestion_thread_queue_size: 4,
cache_size: self.limits.max_caching_memory_size_mb,
resources: self.resources.clone(),
db_path: self.expected_cache_path(),
first_inscription_height: match self.network.bitcoin_network {
BitcoinNetwork::Mainnet => 767430,
@@ -119,9 +124,9 @@ impl Config {
}
pub fn should_bootstrap_through_download(&self) -> bool {
match &self.bootstrap {
BootstrapConfig::Build => false,
BootstrapConfig::Download(_) => true,
match &self.snapshot {
SnapshotConfig::Build => false,
SnapshotConfig::Download(_) => true,
}
}
@@ -139,9 +144,9 @@ impl Config {
}
fn expected_remote_ordinals_sqlite_base_url(&self) -> &str {
match &self.bootstrap {
BootstrapConfig::Build => unreachable!(),
BootstrapConfig::Download(url) => &url,
match &self.snapshot {
SnapshotConfig::Build => unreachable!(),
SnapshotConfig::Download(url) => &url,
}
}
@@ -159,15 +164,14 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Build,
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE,
max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)),
bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)),
max_caching_memory_size_mb: 2048,
snapshot: SnapshotConfig::Build,
resources: ResourcesConfig {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
bitcoind_rpc_url: "http://0.0.0.0:18443".into(),
@@ -192,15 +196,14 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Build,
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE,
max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)),
bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)),
max_caching_memory_size_mb: 2048,
snapshot: SnapshotConfig::Build,
resources: ResourcesConfig {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
bitcoind_rpc_url: "http://0.0.0.0:18332".into(),
@@ -225,17 +228,14 @@ impl Config {
working_dir: default_cache_path(),
},
http_api: PredicatesApi::Off,
bootstrap: BootstrapConfig::Download(
DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(),
),
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION,
max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE,
max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)),
bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)),
max_caching_memory_size_mb: 2048,
snapshot: SnapshotConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()),
resources: ResourcesConfig {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
bitcoind_rpc_url: "http://0.0.0.0:8332".into(),

View File

@@ -13,7 +13,7 @@ use chainhook_sdk::{
};
use crate::{
config::{Config, LogConfig},
config::{Config, LogConfig, ResourcesConfig},
db::{find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop},
};
@@ -26,10 +26,7 @@ use crate::db::TransactionBytesCursor;
#[derive(Clone, Debug)]
pub struct OrdhookConfig {
pub network_thread_max: usize,
pub ingestion_thread_max: usize,
pub ingestion_thread_queue_size: usize,
pub cache_size: usize,
pub resources: ResourcesConfig,
pub db_path: PathBuf,
pub first_inscription_height: u64,
pub logs: LogConfig,
@@ -59,43 +56,73 @@ pub enum SatPosition {
Fee(u64),
}
/// Maps an absolute satoshi pointer (an offset across the concatenated
/// transaction inputs) to the input that contains it.
///
/// Returns `(input_index, offset_within_input)`. A pointer past the end of
/// the combined input values is out of range and falls back to `(0, 0)`.
pub fn resolve_absolute_pointer(inputs: &[u64], absolute_pointer_value: u64) -> (usize, u64) {
    // Out-of-range guard: the pointer must address a satoshi that exists in
    // the inputs. This also makes the partial sums below safe from u64
    // overflow, since they are bounded by `total`.
    let total: u64 = inputs.iter().sum();
    if absolute_pointer_value > total {
        return (0, 0);
    }
    // Walk the inputs until the cumulated value passes the pointer: that
    // input contains the inscribed satoshi.
    let mut selected_index = 0;
    let mut cumulated_input_value = 0;
    for (index, input_value) in inputs.iter().enumerate() {
        if cumulated_input_value + input_value > absolute_pointer_value {
            selected_index = index;
            break;
        }
        cumulated_input_value += input_value;
    }
    let relative_pointer_value = absolute_pointer_value - cumulated_input_value;
    (selected_index, relative_pointer_value)
}
pub fn compute_next_satpoint_data(
_tx_index: usize,
input_index: usize,
offset_intra_input: u64,
inputs: &Vec<u64>,
outputs: &Vec<u64>,
relative_pointer_value: u64,
_ctx: Option<&Context>,
) -> SatPosition {
let mut offset_cross_inputs = 0;
let mut absolute_offset_in_inputs = 0;
for (index, input_value) in inputs.iter().enumerate() {
if index == input_index {
break;
}
offset_cross_inputs += input_value;
absolute_offset_in_inputs += input_value;
}
offset_cross_inputs += offset_intra_input;
absolute_offset_in_inputs += relative_pointer_value;
let mut offset_intra_outputs = 0;
let mut output_index = 0;
let mut absolute_offset_of_first_satoshi_in_selected_output = 0;
let mut selected_output_index = 0;
let mut floating_bound = 0;
for (index, output_value) in outputs.iter().enumerate() {
floating_bound += output_value;
output_index = index;
if floating_bound > offset_cross_inputs {
selected_output_index = index;
if floating_bound > absolute_offset_in_inputs {
break;
}
offset_intra_outputs += output_value;
absolute_offset_of_first_satoshi_in_selected_output += output_value;
}
if output_index == (outputs.len() - 1) && offset_cross_inputs >= floating_bound {
if selected_output_index == (outputs.len() - 1) && absolute_offset_in_inputs >= floating_bound {
// Satoshi spent in fees
return SatPosition::Fee(offset_cross_inputs - floating_bound);
return SatPosition::Fee(absolute_offset_in_inputs - floating_bound);
}
SatPosition::Output((output_index, (offset_cross_inputs - offset_intra_outputs)))
let relative_offset_in_selected_output =
absolute_offset_in_inputs - absolute_offset_of_first_satoshi_in_selected_output;
SatPosition::Output((selected_output_index, relative_offset_in_selected_output))
}
pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?
@@ -128,7 +155,13 @@ pub fn should_sync_ordhook_db(
}
};
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
let mut start_block = find_last_block_inserted(&blocks_db) as u64;
if start_block == 0 {
@@ -185,53 +218,57 @@ pub fn should_sync_ordhook_db(
#[test]
fn test_identify_next_output_index_destination() {
assert_eq!(
compute_next_satpoint_data(0, 10, &vec![20, 30, 45], &vec![20, 30, 45]),
compute_next_satpoint_data(0, 0, &vec![20, 30, 45], &vec![20, 30, 45], 10, None),
SatPosition::Output((0, 10))
);
assert_eq!(
compute_next_satpoint_data(0, 20, &vec![20, 30, 45], &vec![20, 30, 45]),
compute_next_satpoint_data(0, 0, &vec![20, 30, 45], &vec![20, 30, 45], 20, None),
SatPosition::Output((1, 0))
);
assert_eq!(
compute_next_satpoint_data(1, 5, &vec![20, 30, 45], &vec![20, 30, 45]),
SatPosition::Output((1, 5))
compute_next_satpoint_data(0, 1, &vec![20, 30, 45], &vec![20, 30, 45], 25, None),
SatPosition::Output((1, 25))
);
assert_eq!(
compute_next_satpoint_data(1, 6, &vec![20, 30, 45], &vec![20, 5, 45]),
SatPosition::Output((2, 1))
compute_next_satpoint_data(0, 1, &vec![20, 30, 45], &vec![20, 5, 45], 26, None),
SatPosition::Output((2, 21))
);
assert_eq!(
compute_next_satpoint_data(1, 10, &vec![10, 10, 10], &vec![30]),
SatPosition::Output((0, 20))
);
assert_eq!(
compute_next_satpoint_data(0, 30, &vec![10, 10, 10], &vec![30]),
compute_next_satpoint_data(0, 1, &vec![10, 10, 10], &vec![30], 20, None),
SatPosition::Fee(0)
);
assert_eq!(
compute_next_satpoint_data(0, 0, &vec![10, 10, 10], &vec![30]),
compute_next_satpoint_data(0, 0, &vec![10, 10, 10], &vec![30], 30, None),
SatPosition::Fee(0)
);
assert_eq!(
compute_next_satpoint_data(0, 0, &vec![10, 10, 10], &vec![30], 0, None),
SatPosition::Output((0, 0))
);
assert_eq!(
compute_next_satpoint_data(2, 45, &vec![20, 30, 45], &vec![20, 30, 45]),
SatPosition::Fee(0)
compute_next_satpoint_data(0, 2, &vec![20, 30, 45], &vec![20, 30, 45], 95, None),
SatPosition::Fee(50)
);
assert_eq!(
compute_next_satpoint_data(
0,
2,
0,
&vec![1000, 600, 546, 63034],
&vec![1600, 10000, 15000]
&vec![1600, 10000, 15000],
1600,
None
),
SatPosition::Output((1, 0))
SatPosition::Output((1, 1600))
);
assert_eq!(
compute_next_satpoint_data(
3,
0,
3,
&vec![6100, 148660, 103143, 7600],
&vec![81434, 173995]
&vec![81434, 173995],
257903,
None
),
SatPosition::Fee(2474)
SatPosition::Fee(260377)
);
}

View File

@@ -73,11 +73,27 @@ pub async fn download_and_pipeline_blocks(
let end_block = *blocks.last().expect("no blocks to pipeline");
let mut block_heights = VecDeque::from(blocks);
for _ in 0..ordhook_config.ingestion_thread_queue_size {
// All the requests are being processed on the same thread.
// As soon as we are getting the bytes back from wire, the
// processing is moved to a thread pool, to defer the parsing, quite expensive.
// We are initially seeding the networking thread with N requests,
// with N being the number of threads in the pool handling the response.
// We need:
// - 1 thread for the thread handling networking
// - 1 thread for the thread handling disk serialization
let thread_pool_network_response_processing_capacity =
ordhook_config.resources.get_optimal_thread_pool_capacity();
// For each worker in that pool, we want to bound the size of the queue to avoid OOM
// Blocks size can range from 1 to 4Mb (when packed with witness data).
// Start blocking networking when each worker has a backlog of 8 blocks seems reasonable.
let worker_queue_size = 2;
for _ in 0..ordhook_config.resources.bitcoind_rpc_threads {
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = moved_ctx.clone();
let http_client = moved_http_client.clone();
// We interleave the initial requests to avoid DDOSing bitcoind from the get go.
sleep(Duration::from_millis(500));
set.spawn(try_download_block_bytes_with_retry(
http_client,
@@ -95,8 +111,8 @@ pub async fn download_and_pipeline_blocks(
let mut rx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for _ in 0..ordhook_config.ingestion_thread_max {
let (tx, rx) = bounded::<Option<Vec<u8>>>(ordhook_config.ingestion_thread_queue_size);
for _ in 0..thread_pool_network_response_processing_capacity {
let (tx, rx) = bounded::<Option<Vec<u8>>>(worker_queue_size);
tx_thread_pool.push(tx);
rx_thread_pool.push(rx);
}
@@ -244,11 +260,22 @@ pub async fn download_and_pipeline_blocks(
})
.expect("unable to spawn thread");
let mut thread_index = 0;
let mut round_robin_worker_thread_index = 0;
while let Some(res) = set.join_next().await {
let block = res.unwrap().unwrap();
let block = res
.expect("unable to retrieve block")
.expect("unable to deserialize block");
loop {
let res = tx_thread_pool[round_robin_worker_thread_index].send(Some(block.clone()));
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
% thread_pool_network_response_processing_capacity;
if res.is_ok() {
break;
}
sleep(Duration::from_millis(500));
}
let _ = tx_thread_pool[thread_index].send(Some(block));
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = ctx.clone();
@@ -260,7 +287,6 @@ pub async fn download_and_pipeline_blocks(
ctx,
));
}
thread_index = (thread_index + 1) % ordhook_config.ingestion_thread_max;
}
ctx.try_log(|logger| {

View File

@@ -25,8 +25,13 @@ pub fn start_block_archiving_processor(
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let blocks_db_rw =
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
let mut processed_blocks = 0;
loop {

View File

@@ -26,9 +26,10 @@ use crate::{
},
inscription_sequencing::{
augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx,
get_bitcoin_network, get_jubilee_block_height,
parallelize_inscription_data_computations, SequenceCursor,
},
inscription_tracking::augment_block_with_ordinals_transfer_data,
satoshi_tracking::augment_block_with_ordinals_transfer_data,
},
OrdhookConfig,
},
@@ -107,6 +108,8 @@ pub fn start_inscription_indexing_processor(
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
);
store_compacted_blocks(
@@ -188,6 +191,13 @@ pub fn process_blocks(
ctx,
);
// Invalidate and recompute cursor when crossing the jubilee height
let jubilee_height =
get_jubilee_block_height(&get_bitcoin_network(&block.metadata.network));
if block.block_identifier.index == jubilee_height {
sequence_cursor.reset();
}
let _ = process_block(
&mut block,
&next_blocks,
@@ -260,7 +270,7 @@ pub fn process_block(
block: &mut BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
sequence_cursor: &mut SequenceCursor,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_tx: &Transaction,
ordhook_config: &OrdhookConfig,

View File

@@ -12,11 +12,11 @@ use crate::{
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
protocol::{
inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data,
inscription_tracking::augment_block_with_ordinals_transfer_data,
satoshi_tracking::augment_block_with_ordinals_transfer_data,
},
},
db::{
insert_new_inscriptions_from_block_in_locations, open_readwrite_ordhook_db_conn,
insert_entries_from_block_in_inscriptions, open_readwrite_ordhook_db_conn,
remove_entries_from_locations_at_block_height,
},
};
@@ -83,11 +83,7 @@ pub fn start_transfers_recomputing_processor(
&ctx,
);
insert_new_inscriptions_from_block_in_locations(
block,
&inscriptions_db_tx,
&ctx,
);
insert_entries_from_block_in_inscriptions(block, &inscriptions_db_tx, &ctx);
augment_block_with_ordinals_transfer_data(
block,

View File

@@ -7,6 +7,7 @@ use chainhook_sdk::types::{
OrdinalOperation,
};
use chainhook_sdk::utils::Context;
use serde_json::json;
use std::collections::BTreeMap;
use std::str::FromStr;
@@ -20,6 +21,11 @@ pub fn parse_inscriptions_from_witness(
witness_bytes: Vec<Vec<u8>>,
txid: &str,
) -> Option<Vec<OrdinalInscriptionRevealData>> {
// Efficient debugging: Isolate one specific transaction
// if !txid.eq("aa2ab56587c7d6609c95157e6dff37c5c3fa6531702f41229a289a5613887077") {
// return None
// }
let witness = Witness::from_slice(&witness_bytes);
let tapscript = witness.tapscript()?;
let envelopes: Vec<Envelope<Inscription>> = RawEnvelope::from_tapscript(tapscript, input_index)
@@ -59,6 +65,17 @@ pub fn parse_inscriptions_from_witness(
let mut content_bytes = "0x".to_string();
content_bytes.push_str(&hex::encode(&inscription_content_bytes));
let parent = envelope.payload.parent().and_then(|i| Some(i.to_string()));
let delegate = envelope
.payload
.delegate()
.and_then(|i| Some(i.to_string()));
let metaprotocol = envelope
.payload
.metaprotocol()
.and_then(|p| Some(p.to_string()));
let metadata = envelope.payload.metadata().and_then(|m| Some(json!(m)));
let reveal_data = OrdinalInscriptionRevealData {
content_type: envelope
.payload
@@ -71,9 +88,14 @@ pub fn parse_inscriptions_from_witness(
inscription_input_index: input_index,
tx_index: 0,
inscription_output_value: 0,
inscription_pointer: envelope.payload.pointer(),
inscription_fee: 0,
inscription_number: OrdinalInscriptionNumber::zero(),
inscriber_address: None,
parent,
delegate,
metaprotocol,
metadata,
ordinal_number: 0,
ordinal_block_height: 0,
ordinal_offset: 0,

View File

@@ -1,15 +1,15 @@
use std::{
collections::{BTreeMap, HashMap, VecDeque},
collections::{BTreeMap, HashMap, HashSet, VecDeque},
hash::BuildHasherDefault,
sync::Arc,
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{Address, Network, ScriptBuf},
bitcoincore_rpc_json::bitcoin::Network,
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
OrdinalInscriptionCurseType, OrdinalInscriptionNumber, OrdinalOperation,
TransactionIdentifier,
OrdinalInscriptionCurseType, OrdinalInscriptionNumber,
OrdinalInscriptionTransferDestination, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
@@ -19,26 +19,26 @@ use fxhash::FxHasher;
use rusqlite::{Connection, Transaction};
use crate::{
core::OrdhookConfig,
core::{resolve_absolute_pointer, OrdhookConfig},
db::{
find_blessed_inscription_with_ordinal_number, find_nth_classic_neg_number_at_block_height,
find_nth_classic_pos_number_at_block_height, find_nth_jubilee_number_at_block_height,
format_inscription_id, format_satpoint_to_watch, update_inscriptions_with_block,
update_sequence_metadata_with_block, TransactionBytesCursor, TraversalResult,
format_inscription_id, update_ordinals_db_with_block, update_sequence_metadata_with_block,
TransactionBytesCursor, TraversalResult,
},
ord::height::Height,
};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::sync::mpsc::channel;
use crate::db::find_all_inscriptions_in_block;
use super::{
inscription_parsing::get_inscriptions_revealed_in_block,
inscription_tracking::augment_transaction_with_ordinals_transfers_data,
satoshi_numbering::compute_satoshi_number,
satoshi_tracking::{
augment_transaction_with_ordinals_transfers_data, compute_satpoint_post_transfer,
},
};
/// Parallelize the computation of ordinals numbers for inscriptions present in a block.
@@ -67,7 +67,7 @@ use super::{
pub fn parallelize_inscription_data_computations(
block: &BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_tx: &Transaction,
ordhook_config: &OrdhookConfig,
@@ -86,12 +86,12 @@ pub fn parallelize_inscription_data_computations(
)
});
let (mut transactions_ids, l1_cache_hits) =
let (transactions_ids, l1_cache_hits) =
get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx);
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
let thread_max = ordhook_config.ingestion_thread_max;
let thread_pool_capacity = ordhook_config.resources.get_optimal_thread_pool_capacity();
// Nothing to do? early return
if !has_transactions_to_process {
@@ -104,29 +104,41 @@ pub fn parallelize_inscription_data_computations(
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for thread_index in 0..thread_max {
for thread_index in 0..thread_pool_capacity {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = inner_ctx.clone();
let moved_ordhook_db_path = ordhook_config.db_path.clone();
let ulimit = ordhook_config.resources.ulimit;
let memory_available = ordhook_config.resources.memory_available;
let local_cache = cache_l2.clone();
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
rx.recv()
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
inscription_pointer,
prioritary,
))) = rx.recv()
{
let traversal: Result<(TraversalResult, _), String> = compute_satoshi_number(
&moved_ordhook_db_path,
&block_identifier,
&transaction_id,
input_index,
&local_cache,
false,
&moved_ctx,
);
let traversal: Result<(TraversalResult, u64, _), String> =
compute_satoshi_number(
&moved_ordhook_db_path,
&block_identifier,
&transaction_id,
input_index,
inscription_pointer,
&local_cache,
ulimit,
memory_available,
false,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
@@ -135,12 +147,16 @@ pub fn parallelize_inscription_data_computations(
}
// Consume L1 cache: if the traversal was performed in a previous round
// retrieve it and use it.
let mut thread_index = 0;
// retrieve it and inject it to the "reduce" worker (by-passing the "map" thread pool)
let mut round_robin_thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.get(key) {
let _ = traversal_tx.send((Ok((entry.clone(), vec![])), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
let _ = traversal_tx.send((
Ok((entry.clone(), key.2, vec![])),
true,
round_robin_thread_index,
));
round_robin_thread_index = (round_robin_thread_index + 1) % thread_pool_capacity;
}
}
@@ -162,25 +178,24 @@ pub fn parallelize_inscription_data_computations(
)
});
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let mut priority_queue = VecDeque::new();
let mut warmup_queue = VecDeque::new();
for (transaction_id, input_index) in transactions_ids.into_iter() {
for (transaction_id, input_index, inscription_pointer) in transactions_ids.into_iter() {
priority_queue.push_back((
transaction_id,
block.block_identifier.clone(),
input_index,
inscription_pointer,
true,
));
}
// Feed each workers with 2 workitems each
for thread_index in 0..thread_max {
// Feed each worker from the thread pool with 2 workitems each
for thread_index in 0..thread_pool_capacity {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..thread_max {
for thread_index in 0..thread_pool_capacity {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
@@ -191,13 +206,14 @@ pub fn parallelize_inscription_data_computations(
traversals_received += 1;
}
match traversal_result {
Ok((traversal, _)) => {
Ok((traversal, inscription_pointer, _)) => {
inner_ctx.try_log(|logger| {
info!(
logger,
"Completed ordinal number retrieval for Satpoint {}:{}:0 (block: #{}:{}, transfers: {}, progress: {traversals_received}/{expected_traversals}, priority queue: {prioritary}, thread: {thread_index})",
"Completed ordinal number retrieval for Satpoint {}:{}:{} (block: #{}:{}, transfers: {}, progress: {traversals_received}/{expected_traversals}, priority queue: {prioritary}, thread: {thread_index})",
traversal.transaction_identifier_inscription.hash,
traversal.inscription_input_index,
inscription_pointer,
traversal.get_ordinal_coinbase_height(),
traversal.get_ordinal_coinbase_offset(),
traversal.transfers
@@ -207,6 +223,7 @@ pub fn parallelize_inscription_data_computations(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
inscription_pointer,
),
traversal,
);
@@ -229,7 +246,7 @@ pub fn parallelize_inscription_data_computations(
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (mut transactions_ids, _) =
let (transactions_ids, _) =
get_transactions_to_process(next_block, cache_l1, inscriptions_db_tx, ctx);
inner_ctx.try_log(|logger| {
@@ -241,12 +258,14 @@ pub fn parallelize_inscription_data_computations(
)
});
transactions_ids.shuffle(&mut rng);
for (transaction_id, input_index) in transactions_ids.into_iter() {
for (transaction_id, input_index, inscription_pointer) in
transactions_ids.into_iter()
{
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
inscription_pointer,
false,
));
}
@@ -266,13 +285,14 @@ pub fn parallelize_inscription_data_computations(
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, _prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok((traversal, _)) = traversal_result {
if let Ok((traversal, inscription_pointer, _)) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
"Completed ordinal number retrieval for Satpoint {}:{}:0 (block: #{}:{}, transfers: {}, pre-retrieval, thread: {thread_index})",
"Completed ordinal number retrieval for Satpoint {}:{}:{} (block: #{}:{}, transfers: {}, pre-retrieval, thread: {thread_index})",
traversal.transaction_identifier_inscription.hash,
traversal.inscription_input_index,
inscription_pointer,
traversal.get_ordinal_coinbase_height(),
traversal.get_ordinal_coinbase_offset(),
traversal.transfers
@@ -282,6 +302,7 @@ pub fn parallelize_inscription_data_computations(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
inscription_pointer,
),
traversal,
);
@@ -292,12 +313,9 @@ pub fn parallelize_inscription_data_computations(
let ctx_moved = inner_ctx.clone();
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation started",));
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation ended",));
});
inner_ctx.try_log(|logger| {
@@ -326,20 +344,27 @@ pub fn parallelize_inscription_data_computations(
///
fn get_transactions_to_process(
block: &BitcoinBlockData,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
inscriptions_db_tx: &Transaction,
ctx: &Context,
) -> (
Vec<(TransactionIdentifier, usize)>,
Vec<(TransactionIdentifier, usize)>,
HashSet<(TransactionIdentifier, usize, u64)>,
Vec<(TransactionIdentifier, usize, u64)>,
) {
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
let mut transactions_ids = HashSet::new();
let mut l1_cache_hits = vec![];
let known_transactions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx);
for tx in block.transactions.iter().skip(1) {
let inputs = tx
.metadata
.inputs
.iter()
.map(|i| i.previous_output.value)
.collect::<Vec<u64>>();
// Have a new inscription been revealed, if so, are looking at a re-inscription
for ordinal_event in tx.metadata.ordinal_operations.iter() {
let inscription_data = match ordinal_event {
@@ -348,9 +373,16 @@ fn get_transactions_to_process(
continue;
}
};
let (input_index, relative_offset) = match inscription_data.inscription_pointer {
Some(pointer) => resolve_absolute_pointer(&inputs, pointer),
None => (inscription_data.inscription_input_index, 0),
};
let key = (
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
input_index,
relative_offset,
);
if cache_l1.contains_key(&key) {
l1_cache_hits.push(key);
@@ -361,11 +393,12 @@ fn get_transactions_to_process(
continue;
}
if transactions_ids.contains(&key) {
continue;
}
// Enqueue for traversals
transactions_ids.push((
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
));
transactions_ids.insert(key);
}
}
(transactions_ids, l1_cache_hits)
@@ -420,14 +453,8 @@ impl<'a> SequenceCursor<'a> {
true => self.pick_next_neg_classic(ctx),
false => self.pick_next_pos_classic(ctx),
};
let jubilee_height = match network {
Network::Bitcoin => 824544,
Network::Regtest => 110,
Network::Signet => 175392,
Network::Testnet => 2544192,
_ => unreachable!(),
};
let jubilee = if block_height >= jubilee_height {
let jubilee = if block_height >= get_jubilee_block_height(&network) {
self.pick_next_jubilee_number(ctx)
} else {
classic
@@ -505,6 +532,25 @@ impl<'a> SequenceCursor<'a> {
}
}
pub fn get_jubilee_block_height(network: &Network) -> u64 {
match network {
Network::Bitcoin => 824544,
Network::Regtest => 110,
Network::Signet => 175392,
Network::Testnet => 2544192,
_ => unreachable!(),
}
}
pub fn get_bitcoin_network(network: &BitcoinNetwork) -> Network {
match network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
BitcoinNetwork::Signet => Network::Signet,
}
}
/// Given a `BitcoinBlockData` that have been augmented with the functions `parse_inscriptions_in_raw_tx`, `parse_inscriptions_in_standardized_tx`
/// or `parse_inscriptions_and_standardize_block`, mutate the ordinals drafted informations with actual, consensus data.
///
@@ -514,7 +560,7 @@ impl<'a> SequenceCursor<'a> {
pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
block: &mut BitcoinBlockData,
sequence_cursor: &mut SequenceCursor,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
inscriptions_db_tx: &Transaction,
ctx: &Context,
) -> bool {
@@ -541,7 +587,7 @@ pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
);
// Store inscriptions
update_inscriptions_with_block(block, inscriptions_db_tx, ctx);
update_ordinals_db_with_block(block, inscriptions_db_tx, ctx);
update_sequence_metadata_with_block(block, inscriptions_db_tx, ctx);
any_events
}
@@ -557,7 +603,7 @@ pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
pub fn augment_block_with_ordinals_inscriptions_data(
block: &mut BitcoinBlockData,
sequence_cursor: &mut SequenceCursor,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
reinscriptions_data: &mut HashMap<u64, String>,
ctx: &Context,
) -> bool {
@@ -565,12 +611,10 @@ pub fn augment_block_with_ordinals_inscriptions_data(
let mut sats_overflows = VecDeque::new();
let mut any_event = false;
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
BitcoinNetwork::Signet => Network::Signet,
};
let network = get_bitcoin_network(&block.metadata.network);
let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
let mut cumulated_fees = 0u64;
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
any_event |= augment_transaction_with_ordinals_inscriptions_data(
@@ -580,6 +624,9 @@ pub fn augment_block_with_ordinals_inscriptions_data(
sequence_cursor,
&network,
inscriptions_data,
coinbase_txid,
coinbase_subsidy,
&mut cumulated_fees,
&mut sats_overflows,
reinscriptions_data,
ctx,
@@ -598,6 +645,7 @@ pub fn augment_block_with_ordinals_inscriptions_data(
sequence_cursor.pick_next(is_curse, block.block_identifier.index, &network, &ctx);
inscription_data.inscription_number = inscription_number;
sequence_cursor.increment_jubilee_number(ctx);
if is_curse {
sequence_cursor.increment_neg_classic(ctx);
} else {
@@ -631,14 +679,26 @@ fn augment_transaction_with_ordinals_inscriptions_data(
block_identifier: &BlockIdentifier,
sequence_cursor: &mut SequenceCursor,
network: &Network,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
coinbase_txid: &TransactionIdentifier,
coinbase_subsidy: u64,
cumulated_fees: &mut u64,
sats_overflows: &mut VecDeque<(usize, usize)>,
reinscriptions_data: &mut HashMap<u64, String>,
ctx: &Context,
) -> bool {
let inputs = tx
.metadata
.inputs
.iter()
.map(|i| i.previous_output.value)
.collect::<Vec<u64>>();
let any_event = tx.metadata.ordinal_operations.is_empty() == false;
let mut mutated_operations = vec![];
mutated_operations.append(&mut tx.metadata.ordinal_operations);
let mut inscription_subindex = 0;
for (op_index, op) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
for (op_index, op) in mutated_operations.iter_mut().enumerate() {
let (mut is_cursed, inscription) = match op {
OrdinalOperation::InscriptionRevealed(inscription) => {
(inscription.curse_type.as_ref().is_some(), inscription)
@@ -646,23 +706,27 @@ fn augment_transaction_with_ordinals_inscriptions_data(
OrdinalOperation::InscriptionTransferred(_) => continue,
};
let (input_index, relative_offset) = match inscription.inscription_pointer {
Some(pointer) => resolve_absolute_pointer(&inputs, pointer),
None => (inscription.inscription_input_index, 0),
};
let transaction_identifier = tx.transaction_identifier.clone();
let inscription_id = format_inscription_id(&transaction_identifier, inscription_subindex);
let traversal = match inscriptions_data
.get(&(transaction_identifier, inscription.inscription_input_index))
{
Some(traversal) => traversal,
None => {
let err_msg = format!(
"Unable to retrieve backward traversal result for inscription {}",
tx.transaction_identifier.hash
);
ctx.try_log(|logger| {
error!(logger, "{}", err_msg);
});
std::process::exit(1);
}
};
let traversal =
match inscriptions_data.get(&(transaction_identifier, input_index, relative_offset)) {
Some(traversal) => traversal,
None => {
let err_msg = format!(
"Unable to retrieve backward traversal result for inscription {}",
tx.transaction_identifier.hash
);
ctx.try_log(|logger| {
error!(logger, "{}", err_msg);
});
std::process::exit(1);
}
};
// Do we need to curse the inscription?
let mut inscription_number =
@@ -692,7 +756,6 @@ fn augment_transaction_with_ordinals_inscriptions_data(
inscription.inscription_id = inscription_id;
inscription.inscription_number = inscription_number;
let outputs = &tx.metadata.outputs;
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
inscription.ordinal_number = traversal.ordinal_number;
@@ -703,41 +766,41 @@ fn augment_transaction_with_ordinals_inscriptions_data(
Some(curse_type) => Some(curse_type),
None => inscription.curse_type.take(),
};
inscription.satpoint_post_inscription = format_satpoint_to_watch(
&traversal.transfer_data.transaction_identifier_location,
traversal.transfer_data.output_index,
traversal.transfer_data.inscription_offset_intra_output,
);
if let Some(output) = outputs.get(traversal.transfer_data.output_index) {
inscription.inscription_output_value = output.value;
inscription.inscriber_address = {
let script_pub_key = output.get_script_pubkey_hex();
match ScriptBuf::from_hex(&script_pub_key) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(a) => Some(a.to_string()),
_ => None,
},
_ => None,
}
};
} else {
ctx.try_log(|logger| {
warn!(
logger,
"Database corrupted, skipping cursed inscription => {:?} / {:?}",
traversal,
outputs
);
});
}
if traversal.ordinal_number == 0 {
// If the satoshi inscribed correspond to a sat overflow, we will store the inscription
// and assign an inscription number after the other inscriptions, to mimick the
// bug in ord.
sats_overflows.push_back((tx_index, op_index));
continue;
}
let (destination, satpoint_post_transfer, output_value) = compute_satpoint_post_transfer(
&&*tx,
tx_index,
input_index,
relative_offset,
network,
coinbase_txid,
coinbase_subsidy,
cumulated_fees,
ctx,
);
// Compute satpoint_post_inscription
inscription.satpoint_post_inscription = satpoint_post_transfer;
inscription_subindex += 1;
match destination {
OrdinalInscriptionTransferDestination::SpentInFees => {
// Inscriptions are assigned inscription numbers starting at zero, first by the
// order reveal transactions appear in blocks, and the order that reveal envelopes
// appear in those transactions.
// Due to a historical bug in `ord` which cannot be fixed without changing a great
// many inscription numbers, inscriptions which are revealed and then immediately
// spent to fees are numbered as if they appear last in the block in which they
// are revealed.
sats_overflows.push_back((tx_index, op_index));
continue;
}
OrdinalInscriptionTransferDestination::Burnt(_) => {}
OrdinalInscriptionTransferDestination::Transferred(address) => {
inscription.inscription_output_value = output_value.unwrap_or(0);
inscription.inscriber_address = Some(address);
}
};
// The reinscriptions_data needs to be augmented as we go, to handle transaction chaining.
if !is_cursed {
@@ -762,8 +825,11 @@ fn augment_transaction_with_ordinals_inscriptions_data(
} else {
sequence_cursor.increment_pos_classic(ctx);
}
inscription_subindex += 1;
}
tx.metadata
.ordinal_operations
.append(&mut mutated_operations);
any_event
}
@@ -773,12 +839,24 @@ fn consolidate_transaction_with_pre_computed_inscription_data(
tx: &mut BitcoinTransactionData,
tx_index: usize,
coinbase_txid: &TransactionIdentifier,
coinbase_subsidy: u64,
cumulated_fees: &mut u64,
network: &Network,
inscriptions_data: &mut BTreeMap<String, TraversalResult>,
_ctx: &Context,
ctx: &Context,
) {
let mut subindex = 0;
for operation in tx.metadata.ordinal_operations.iter_mut() {
let mut mutated_operations = vec![];
mutated_operations.append(&mut tx.metadata.ordinal_operations);
let inputs = tx
.metadata
.inputs
.iter()
.map(|i| i.previous_output.value)
.collect::<Vec<u64>>();
for operation in mutated_operations.iter_mut() {
let inscription = match operation {
OrdinalOperation::InscriptionRevealed(ref mut inscription) => inscription,
OrdinalOperation::InscriptionTransferred(_) => continue,
@@ -799,42 +877,42 @@ fn consolidate_transaction_with_pre_computed_inscription_data(
inscription.transfers_pre_inscription = traversal.transfers;
inscription.inscription_fee = tx.metadata.fee;
inscription.tx_index = tx_index;
inscription.satpoint_post_inscription = format_satpoint_to_watch(
&traversal.transfer_data.transaction_identifier_location,
traversal.transfer_data.output_index,
traversal.transfer_data.inscription_offset_intra_output,
let (input_index, relative_offset) = match inscription.inscription_pointer {
Some(pointer) => resolve_absolute_pointer(&inputs, pointer),
None => (traversal.inscription_input_index, 0),
};
// Compute satpoint_post_inscription
let (destination, satpoint_post_transfer, output_value) = compute_satpoint_post_transfer(
tx,
tx_index,
input_index,
relative_offset,
network,
coinbase_txid,
coinbase_subsidy,
cumulated_fees,
ctx,
);
inscription.satpoint_post_inscription = satpoint_post_transfer;
if inscription.inscription_number.classic < 0 {
inscription.curse_type = Some(OrdinalInscriptionCurseType::Generic);
}
if traversal
.transfer_data
.transaction_identifier_location
.eq(coinbase_txid)
{
continue;
}
if let Some(output) = tx
.metadata
.outputs
.get(traversal.transfer_data.output_index)
{
inscription.inscription_output_value = output.value;
inscription.inscriber_address = {
let script_pub_key = output.get_script_pubkey_hex();
match ScriptBuf::from_hex(&script_pub_key) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(a) => Some(a.to_string()),
_ => None,
},
_ => None,
}
};
match destination {
OrdinalInscriptionTransferDestination::SpentInFees => continue,
OrdinalInscriptionTransferDestination::Burnt(_) => continue,
OrdinalInscriptionTransferDestination::Transferred(address) => {
inscription.inscription_output_value = output_value.unwrap_or(0);
inscription.inscriber_address = Some(address);
}
}
}
tx.metadata
.ordinal_operations
.append(&mut mutated_operations);
}
/// Best effort to re-augment a `BitcoinBlockData` with data coming from `inscriptions` and `locations` tables.
@@ -845,13 +923,7 @@ pub fn consolidate_block_with_pre_computed_ordinals_data(
include_transfers: bool,
ctx: &Context,
) {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
BitcoinNetwork::Signet => Network::Signet,
};
let network = get_bitcoin_network(&block.metadata.network);
let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
let mut cumulated_fees = 0;
@@ -878,6 +950,8 @@ pub fn consolidate_block_with_pre_computed_ordinals_data(
tx,
tx_index,
coinbase_txid,
coinbase_subsidy,
&mut cumulated_fees,
&network,
&mut inscriptions_data,
ctx,
@@ -888,7 +962,6 @@ pub fn consolidate_block_with_pre_computed_ordinals_data(
let _ = augment_transaction_with_ordinals_transfers_data(
tx,
tx_index,
&block.block_identifier,
&network,
&coinbase_txid,
coinbase_subsidy,

View File

@@ -1,215 +0,0 @@
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{Address, Network, ScriptBuf},
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
OrdinalInscriptionTransferData, OrdinalInscriptionTransferDestination, OrdinalOperation,
TransactionIdentifier,
},
utils::Context,
};
use crate::{
core::{compute_next_satpoint_data, SatPosition},
db::{
find_inscriptions_at_wached_outpoint, format_outpoint_to_watch,
insert_transfer_in_locations_tx,
},
ord::height::Height,
};
use rusqlite::Transaction;
/// Scans every transaction of `block` for inscriptions leaving watched
/// outpoints, attaching the resulting `InscriptionTransferred` events to each
/// transaction's ordinal operations.
///
/// When `update_db_tx` is set, each discovered transfer is also persisted in
/// the `locations` table through `inscriptions_db_tx` as soon as it is found,
/// so later transactions of the same block observe up-to-date locations.
///
/// Returns `true` when at least one transfer event was produced.
pub fn augment_block_with_ordinals_transfer_data(
    block: &mut BitcoinBlockData,
    inscriptions_db_tx: &Transaction,
    update_db_tx: bool,
    ctx: &Context,
) -> bool {
    // Map the chainhook network enum onto the bitcoin crate's equivalent.
    let network = match block.metadata.network {
        BitcoinNetwork::Mainnet => Network::Bitcoin,
        BitcoinNetwork::Testnet => Network::Testnet,
        BitcoinNetwork::Regtest => Network::Regtest,
        BitcoinNetwork::Signet => Network::Signet,
    };
    // Fee-bound inscriptions end up in the coinbase transaction: keep its id
    // and the block subsidy at hand so coinbase offsets can be computed.
    let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
    let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
    let mut cumulated_fees = 0;
    let mut found_transfers = false;
    for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
        let transfers = augment_transaction_with_ordinals_transfers_data(
            tx,
            tx_index,
            &block.block_identifier,
            &network,
            &coinbase_txid,
            coinbase_subsidy,
            &mut cumulated_fees,
            inscriptions_db_tx,
            ctx,
        );
        if !transfers.is_empty() {
            found_transfers = true;
        }
        if !update_db_tx {
            continue;
        }
        // Store transfers between each iteration.
        for transfer_data in transfers.into_iter() {
            insert_transfer_in_locations_tx(
                &transfer_data,
                &block.block_identifier,
                &inscriptions_db_tx,
                &ctx,
            );
        }
    }
    found_transfers
}
/// Detects ordinal inscription transfers performed by `tx` and appends the
/// corresponding `InscriptionTransferred` events to its ordinal operations.
///
/// For every input of the transaction, the `locations` table is queried (via
/// `find_inscriptions_at_wached_outpoint`) for inscriptions sitting on the
/// consumed outpoint. Each match is projected to its post-transaction
/// satpoint: either an output of `tx` (transferred when an address can be
/// derived from the output script, burnt otherwise) or — when the offset
/// falls past the last output — into the fees collected by the coinbase
/// transaction.
///
/// `cumulated_fees` is a running total of the fees of the block's previous
/// transactions; it is incremented with `tx`'s fee before returning so that
/// fee-bound offsets of subsequent transactions stay correct.
///
/// Returns the list of transfers discovered (the same data is also attached
/// to `tx.metadata.ordinal_operations`).
pub fn augment_transaction_with_ordinals_transfers_data(
    tx: &mut BitcoinTransactionData,
    tx_index: usize,
    block_identifier: &BlockIdentifier,
    network: &Network,
    coinbase_txid: &TransactionIdentifier,
    coinbase_subsidy: u64,
    cumulated_fees: &mut u64,
    inscriptions_db_tx: &Transaction,
    ctx: &Context,
) -> Vec<OrdinalInscriptionTransferData> {
    let mut transfers = vec![];
    for (input_index, input) in tx.metadata.inputs.iter().enumerate() {
        // Outpoint consumed by this input, formatted the way it is keyed in
        // the locations table.
        let outpoint_pre_transfer = format_outpoint_to_watch(
            &input.previous_output.txid,
            input.previous_output.vout as usize,
        );
        let entries =
            find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx, ctx);
        // For each satpoint inscribed retrieved, we need to compute the next
        // outpoint to watch
        for watched_satpoint in entries.into_iter() {
            let satpoint_pre_transfer =
                format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset);
            // Question is: are inscriptions moving to a new output,
            // burnt or lost in fees and transfered to the miner?
            let inputs = tx
                .metadata
                .inputs
                .iter()
                .map(|o| o.previous_output.value)
                .collect::<_>();
            let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>();
            let post_transfer_data =
                compute_next_satpoint_data(input_index, watched_satpoint.offset, &inputs, &outputs);
            let (
                outpoint_post_transfer,
                offset_post_transfer,
                destination,
                post_transfer_output_value,
            ) = match post_transfer_data {
                SatPosition::Output((output_index, offset)) => {
                    let outpoint =
                        format_outpoint_to_watch(&tx.transaction_identifier, output_index);
                    let script_pub_key_hex =
                        tx.metadata.outputs[output_index].get_script_pubkey_hex();
                    // A script that parses but yields no address (e.g. not a
                    // standard output type) is reported as Burnt, carrying
                    // the script itself.
                    let updated_address = match ScriptBuf::from_hex(&script_pub_key_hex) {
                        Ok(script) => match Address::from_script(&script, network.clone()) {
                            Ok(address) => OrdinalInscriptionTransferDestination::Transferred(
                                address.to_string(),
                            ),
                            Err(e) => {
                                ctx.try_log(|logger| {
                                    info!(
                                        logger,
                                        "unable to retrieve address from {script_pub_key_hex}: {}",
                                        e.to_string()
                                    )
                                });
                                OrdinalInscriptionTransferDestination::Burnt(script.to_string())
                            }
                        },
                        Err(e) => {
                            ctx.try_log(|logger| {
                                info!(
                                    logger,
                                    "unable to retrieve address from {script_pub_key_hex}: {}",
                                    e.to_string()
                                )
                            });
                            OrdinalInscriptionTransferDestination::Burnt(
                                script_pub_key_hex.to_string(),
                            )
                        }
                    };
                    // At this point we know that inscriptions are being moved.
                    ctx.try_log(|logger| {
                        info!(
                            logger,
                            "Inscription {} moved from {} to {} (block: {})",
                            watched_satpoint.inscription_id,
                            satpoint_pre_transfer,
                            outpoint,
                            block_identifier.index,
                        )
                    });
                    (
                        outpoint,
                        offset,
                        updated_address,
                        Some(tx.metadata.outputs[output_index].value),
                    )
                }
                SatPosition::Fee(offset) => {
                    // Get Coinbase TX
                    // Offset within the coinbase: subsidy, plus fees of the
                    // transactions preceding this one, plus the local offset.
                    let total_offset = coinbase_subsidy + *cumulated_fees + offset;
                    let outpoint = format_outpoint_to_watch(&coinbase_txid, 0);
                    ctx.try_log(|logger| {
                        info!(
                            logger,
                            "Inscription {} spent in fees ({}+{}+{})",
                            watched_satpoint.inscription_id,
                            coinbase_subsidy,
                            cumulated_fees,
                            offset
                        )
                    });
                    (
                        outpoint,
                        total_offset,
                        OrdinalInscriptionTransferDestination::SpentInFees,
                        None,
                    )
                }
            };
            let satpoint_post_transfer =
                format!("{}:{}", outpoint_post_transfer, offset_post_transfer);
            let transfer_data = OrdinalInscriptionTransferData {
                inscription_id: watched_satpoint.inscription_id.clone(),
                destination,
                tx_index,
                satpoint_pre_transfer,
                satpoint_post_transfer,
                post_transfer_output_value,
            };
            transfers.push(transfer_data.clone());
            // Attach transfer event
            tx.metadata
                .ordinal_operations
                .push(OrdinalOperation::InscriptionTransferred(transfer_data));
        }
    }
    *cumulated_fees += tx.metadata.fee;
    transfers
}

View File

@@ -1,4 +1,4 @@
pub mod inscription_parsing;
pub mod inscription_sequencing;
pub mod inscription_tracking;
pub mod satoshi_numbering;
pub mod satoshi_tracking;

View File

@@ -8,7 +8,6 @@ use std::sync::Arc;
use crate::db::{
find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop, BlockBytesCursor,
TransferData,
};
use crate::db::{TransactionBytesCursor, TraversalResult};
@@ -19,28 +18,33 @@ pub fn compute_satoshi_number(
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_input_index: usize,
inscription_pointer: u64,
traversals_cache: &Arc<
DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>,
>,
ulimit: usize,
memory_available: usize,
_back_tracking: bool,
ctx: &Context,
) -> Result<(TraversalResult, Vec<(u32, [u8; 8])>), String> {
let mut inscription_offset_intra_output = 0;
let mut inscription_output_index: usize = 0;
let mut ordinal_offset = 0;
let mut ordinal_block_number = block_identifier.index as u32;
) -> Result<(TraversalResult, u64, Vec<(u32, [u8; 8], usize)>), String> {
let mut ordinal_offset = inscription_pointer;
let ordinal_block_number = block_identifier.index as u32;
let txid = transaction_identifier.get_8_hash_bytes();
let mut back_track = vec![];
let blocks_db = open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, &ctx);
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, ulimit, memory_available, &ctx);
let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache
let (mut tx_cursor, mut ordinal_block_number) = match traversals_cache
.get(&(block_identifier.index as u32, txid.clone()))
{
Some(entry) => {
let tx = entry.value();
(
tx.get_sat_ranges(),
tx.get_cumulated_sats_in_until_input_index(inscription_input_index),
(
tx.inputs[inscription_input_index].txin.clone(),
tx.inputs[inscription_input_index].vout.into(),
),
tx.inputs[inscription_input_index].block_height,
)
}
None => loop {
@@ -53,12 +57,13 @@ pub fn compute_satoshi_number(
let cursor = BlockBytesCursor::new(&block_bytes.as_ref());
match cursor.find_and_serialize_transaction_with_txid(&txid) {
Some(tx) => {
let sats_ranges = tx.get_sat_ranges();
let inscription_offset_cross_outputs =
tx.get_cumulated_sats_in_until_input_index(inscription_input_index);
traversals_cache.insert((ordinal_block_number, txid.clone()), tx);
back_track.push((ordinal_block_number, txid.clone()));
break (sats_ranges, inscription_offset_cross_outputs);
break (
(
tx.inputs[inscription_input_index].txin.clone(),
tx.inputs[inscription_input_index].vout.into(),
),
tx.inputs[inscription_input_index].block_height,
);
}
None => return Err(format!("txid not in block #{ordinal_block_number}")),
}
@@ -67,23 +72,6 @@ pub fn compute_satoshi_number(
},
};
for (i, (min, max)) in sats_ranges.into_iter().enumerate() {
if inscription_offset_cross_outputs >= min && inscription_offset_cross_outputs < max {
inscription_output_index = i;
inscription_offset_intra_output = inscription_offset_cross_outputs - min;
}
}
ctx.try_log(|logger| {
debug!(
logger,
"Start ordinal number retrieval for Satpoint {}:{}:0 (block #{})",
transaction_identifier.hash,
inscription_input_index,
block_identifier.index
)
});
let mut tx_cursor: ([u8; 8], usize) = (txid, inscription_input_index);
let mut hops: u32 = 0;
loop {
@@ -140,13 +128,8 @@ pub fn compute_satoshi_number(
transfers: 0,
inscription_input_index,
transaction_identifier_inscription: transaction_identifier.clone(),
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
},
inscription_pointer,
back_track,
));
}
@@ -228,7 +211,8 @@ pub fn compute_satoshi_number(
}
} else {
// isolate the target transaction
let lazy_tx = match block_cursor.find_and_serialize_transaction_with_txid(&txid) {
let tx_bytes_cursor = match block_cursor.find_and_serialize_transaction_with_txid(&txid)
{
Some(entry) => entry,
None => {
ctx.try_log(|logger| {
@@ -244,7 +228,7 @@ pub fn compute_satoshi_number(
};
let mut sats_out = 0;
for (index, output_value) in lazy_tx.outputs.iter().enumerate() {
for (index, output_value) in tx_bytes_cursor.outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
@@ -253,12 +237,13 @@ pub fn compute_satoshi_number(
sats_out += ordinal_offset;
let mut sats_in = 0;
for input in lazy_tx.inputs.iter() {
for input in tx_bytes_cursor.inputs.iter() {
sats_in += input.txin_value;
if sats_out < sats_in {
back_track.push((ordinal_block_number, tx_cursor.0.clone()));
traversals_cache.insert((ordinal_block_number, tx_cursor.0), lazy_tx.clone());
back_track.push((ordinal_block_number, tx_cursor.0.clone(), tx_cursor.1));
traversals_cache
.insert((ordinal_block_number, tx_cursor.0), tx_bytes_cursor.clone());
ordinal_offset = sats_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
@@ -281,13 +266,8 @@ pub fn compute_satoshi_number(
transfers: 0,
inscription_input_index,
transaction_identifier_inscription: transaction_identifier.clone(),
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
},
inscription_pointer,
back_track,
));
}
@@ -295,7 +275,7 @@ pub fn compute_satoshi_number(
}
let height = Height(ordinal_block_number.into());
let ordinal_number = height.starting_sat().0 + ordinal_offset + inscription_offset_intra_output;
let ordinal_number = height.starting_sat().0 + ordinal_offset;
Ok((
TraversalResult {
@@ -304,13 +284,8 @@ pub fn compute_satoshi_number(
transfers: hops,
inscription_input_index,
transaction_identifier_inscription: transaction_identifier.clone(),
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
},
inscription_pointer,
back_track,
))
}

View File

@@ -0,0 +1,236 @@
use std::collections::HashSet;
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{Address, Network, ScriptBuf},
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, OrdinalInscriptionTransferData,
OrdinalInscriptionTransferDestination, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
use crate::{
core::{compute_next_satpoint_data, SatPosition},
db::{
find_inscribed_ordinals_at_wached_outpoint, format_outpoint_to_watch,
insert_ordinal_transfer_in_locations_tx, parse_satpoint_to_watch, OrdinalLocation,
},
ord::height::Height,
};
use rusqlite::Transaction;
use super::inscription_sequencing::get_bitcoin_network;
/// Walks the block's transactions, appending inscription transfer events to
/// each of them, and optionally persisting the resulting ordinal locations.
///
/// When `update_db_tx` is set, every detected transfer is written to the
/// `locations` table through `inscriptions_db_tx` right away, so subsequent
/// transactions in the same block observe up-to-date locations.
///
/// Returns `true` when at least one transfer was detected in the block.
pub fn augment_block_with_ordinals_transfer_data(
    block: &mut BitcoinBlockData,
    inscriptions_db_tx: &Transaction,
    update_db_tx: bool,
    ctx: &Context,
) -> bool {
    let network = get_bitcoin_network(&block.metadata.network);
    // Fee-bound inscriptions land in the coinbase transaction: keep its id
    // and the block subsidy at hand so coinbase offsets can be computed.
    let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
    let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
    let mut cumulated_fees = 0;
    let mut found_transfers = false;
    for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
        let transfers = augment_transaction_with_ordinals_transfers_data(
            tx,
            tx_index,
            &network,
            &coinbase_txid,
            coinbase_subsidy,
            &mut cumulated_fees,
            inscriptions_db_tx,
            ctx,
        );
        if !transfers.is_empty() {
            found_transfers = true;
        }
        if !update_db_tx {
            continue;
        }
        // Store transfers between each iteration.
        for transfer_data in transfers.into_iter() {
            let (txid, output_index, offset) =
                parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
            let outpoint_to_watch = format_outpoint_to_watch(&txid, output_index);
            let data = OrdinalLocation {
                offset,
                block_height: block.block_identifier.index,
                tx_index: transfer_data.tx_index,
            };
            insert_ordinal_transfer_in_locations_tx(
                transfer_data.ordinal_number,
                &outpoint_to_watch,
                data,
                inscriptions_db_tx,
                &ctx,
            );
        }
    }
    found_transfers
}
/// Projects a satoshi sitting at `relative_pointer_value` within
/// `tx`'s input at `input_index` to its position after the transaction.
///
/// The sat lands either on one of `tx`'s outputs (destination `Transferred`
/// when an address can be derived from the output script, `Burnt` otherwise)
/// or — when the pointer falls past the last output — in the miner fees, in
/// which case the post-transfer satpoint points into the coinbase
/// transaction at `coinbase_subsidy + *cumulated_fees + offset`.
///
/// `cumulated_fees` is only read here; it is the running total of fees of
/// the block's previous transactions, maintained by the caller.
///
/// Returns `(destination, satpoint_post_transfer, post_transfer_output_value)`
/// where the output value is `None` for sats spent in fees.
pub fn compute_satpoint_post_transfer(
    tx: &BitcoinTransactionData,
    tx_index: usize,
    input_index: usize,
    relative_pointer_value: u64,
    network: &Network,
    coinbase_txid: &TransactionIdentifier,
    coinbase_subsidy: u64,
    cumulated_fees: &mut u64,
    ctx: &Context,
) -> (OrdinalInscriptionTransferDestination, String, Option<u64>) {
    // Input/output values drive the sat-position arithmetic below.
    let inputs: Vec<u64> = tx
        .metadata
        .inputs
        .iter()
        .map(|o| o.previous_output.value)
        .collect::<_>();
    let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>();
    let post_transfer_data = compute_next_satpoint_data(
        tx_index,
        input_index,
        &inputs,
        &outputs,
        relative_pointer_value,
        Some(ctx),
    );
    let (outpoint_post_transfer, offset_post_transfer, destination, post_transfer_output_value) =
        match post_transfer_data {
            SatPosition::Output((output_index, offset)) => {
                let outpoint = format_outpoint_to_watch(&tx.transaction_identifier, output_index);
                let script_pub_key_hex = tx.metadata.outputs[output_index].get_script_pubkey_hex();
                // A script that parses but yields no address (e.g. not a
                // standard output type) is reported as Burnt, carrying the
                // script itself; an unparsable script carries the raw hex.
                let updated_address = match ScriptBuf::from_hex(&script_pub_key_hex) {
                    Ok(script) => match Address::from_script(&script, network.clone()) {
                        Ok(address) => {
                            OrdinalInscriptionTransferDestination::Transferred(address.to_string())
                        }
                        Err(e) => {
                            ctx.try_log(|logger| {
                                info!(
                                    logger,
                                    "unable to retrieve address from {script_pub_key_hex}: {}",
                                    e.to_string()
                                )
                            });
                            OrdinalInscriptionTransferDestination::Burnt(script.to_string())
                        }
                    },
                    Err(e) => {
                        ctx.try_log(|logger| {
                            info!(
                                logger,
                                "unable to retrieve address from {script_pub_key_hex}: {}",
                                e.to_string()
                            )
                        });
                        OrdinalInscriptionTransferDestination::Burnt(script_pub_key_hex.to_string())
                    }
                };
                (
                    outpoint,
                    offset,
                    updated_address,
                    Some(tx.metadata.outputs[output_index].value),
                )
            }
            SatPosition::Fee(offset) => {
                // Get Coinbase TX
                // Offset within the coinbase: subsidy, plus fees of the
                // transactions preceding this one, plus the local offset.
                let total_offset = coinbase_subsidy + *cumulated_fees + offset;
                let outpoint = format_outpoint_to_watch(&coinbase_txid, 0);
                (
                    outpoint,
                    total_offset,
                    OrdinalInscriptionTransferDestination::SpentInFees,
                    None,
                )
            }
        };
    let satpoint_post_transfer = format!("{}:{}", outpoint_post_transfer, offset_post_transfer);
    (
        destination,
        satpoint_post_transfer,
        post_transfer_output_value,
    )
}
/// Detects ordinal transfers performed by `tx` and appends the corresponding
/// `InscriptionTransferred` events to its ordinal operations.
///
/// For every input, the `locations` table is queried for inscribed ordinals
/// sitting on the consumed outpoint; each one is projected to its
/// post-transaction satpoint via `compute_satpoint_post_transfer`. Ordinals
/// revealed by this very transaction are skipped: the transfers are inserted
/// in storage after the inscriptions, and a unicity constraint allows only
/// one ordinal per satpoint.
///
/// `cumulated_fees` is a running total of the fees of the block's previous
/// transactions; it is incremented with `tx`'s fee before returning so that
/// fee-bound offsets of subsequent transactions stay correct.
///
/// Returns the list of transfers discovered (the same data is also attached
/// to `tx.metadata.ordinal_operations`).
pub fn augment_transaction_with_ordinals_transfers_data(
    tx: &mut BitcoinTransactionData,
    tx_index: usize,
    network: &Network,
    coinbase_txid: &TransactionIdentifier,
    coinbase_subsidy: u64,
    cumulated_fees: &mut u64,
    inscriptions_db_tx: &Transaction,
    ctx: &Context,
) -> Vec<OrdinalInscriptionTransferData> {
    let mut transfers = vec![];

    // The transfers are inserted in storage after the inscriptions.
    // We have a unicity constraint, and can only have one ordinal per
    // satpoint: collect the sats revealed in this transaction so they are
    // not double-processed as transfers below.
    let mut updated_sats = HashSet::new();
    for op in tx.metadata.ordinal_operations.iter() {
        if let OrdinalOperation::InscriptionRevealed(data) = op {
            updated_sats.insert(data.ordinal_number);
        }
    }

    for (input_index, input) in tx.metadata.inputs.iter().enumerate() {
        // Outpoint consumed by this input, formatted the way it is keyed in
        // the locations table.
        let outpoint_pre_transfer = format_outpoint_to_watch(
            &input.previous_output.txid,
            input.previous_output.vout as usize,
        );
        let entries = find_inscribed_ordinals_at_wached_outpoint(
            &outpoint_pre_transfer,
            &inscriptions_db_tx,
            ctx,
        );
        // For each satpoint inscribed retrieved, we need to compute the next
        // outpoint to watch
        for watched_satpoint in entries.into_iter() {
            if updated_sats.contains(&watched_satpoint.ordinal_number) {
                continue;
            }
            let satpoint_pre_transfer =
                format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset);
            let (destination, satpoint_post_transfer, post_transfer_output_value) =
                compute_satpoint_post_transfer(
                    // Reborrow the &mut as a shared reference (was `&&*tx`,
                    // which only compiled through deref coercion).
                    &*tx,
                    tx_index,
                    input_index,
                    watched_satpoint.offset,
                    network,
                    coinbase_txid,
                    coinbase_subsidy,
                    cumulated_fees,
                    ctx,
                );
            let transfer_data = OrdinalInscriptionTransferData {
                ordinal_number: watched_satpoint.ordinal_number,
                destination,
                tx_index,
                satpoint_pre_transfer,
                satpoint_post_transfer,
                post_transfer_output_value,
            };
            transfers.push(transfer_data.clone());
            // Attach transfer event
            tx.metadata
                .ordinal_operations
                .push(OrdinalOperation::InscriptionTransferred(transfer_data));
        }
    }
    *cumulated_fees += tx.metadata.fee;
    transfers
}

View File

@@ -1,5 +1,5 @@
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet, HashMap},
io::{Read, Write},
path::PathBuf,
thread::sleep,
@@ -63,7 +63,10 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection {
block_height INTEGER NOT NULL,
ordinal_number INTEGER NOT NULL,
jubilee_inscription_number INTEGER NOT NULL,
classic_inscription_number INTEGER NOT NULL
classic_inscription_number INTEGER NOT NULL,
CONSTRAINT inscription_id_uniqueness UNIQUE (inscription_id),
CONSTRAINT jubilee_inscription_number_uniqueness UNIQUE (inscription_id),
CONSTRAINT classic_inscription_number_uniqueness UNIQUE (inscription_id)
)",
[],
) {
@@ -79,36 +82,37 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection {
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_jubilee_inscription_number ON inscriptions(jubilee_inscription_number);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_classic_inscription_number ON inscriptions(classic_inscription_number);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_block_height ON inscriptions(block_height);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS locations (
inscription_id TEXT NOT NULL,
ordinal_number INTEGER NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
outpoint_to_watch TEXT NOT NULL,
offset INTEGER NOT NULL
offset INTEGER NOT NULL,
CONSTRAINT ordinal_number_outpoint_to_watch_offset_uniqueness UNIQUE (ordinal_number, outpoint_to_watch)
)",
[],
) {
@@ -124,19 +128,19 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection {
"CREATE INDEX IF NOT EXISTS locations_indexed_on_block_height ON locations(block_height);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS locations_indexed_on_outpoint_to_watch ON locations(outpoint_to_watch);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS locations_indexed_on_inscription_id ON locations(inscription_id);",
"CREATE INDEX IF NOT EXISTS locations_indexed_on_ordinal_number ON locations(ordinal_number);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
}
@@ -161,7 +165,7 @@ pub fn initialize_ordhook_db(base_dir: &PathBuf, ctx: &Context) -> Connection {
"CREATE INDEX IF NOT EXISTS sequence_metadata_indexed_on_block_height ON sequence_metadata(block_height);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to create hord.sqlite: {}", e.to_string()));
}
}
@@ -248,7 +252,7 @@ fn get_default_ordhook_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
destination_path
}
fn rocks_db_default_options() -> rocksdb::Options {
fn rocks_db_default_options(ulimit: usize, memory_available: usize) -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
// Per rocksdb's documentation:
// If cache_index_and_filter_blocks is false (which is default),
@@ -262,8 +266,12 @@ fn rocks_db_default_options() -> rocksdb::Options {
// opts.set_write_buffer_size(64 * 1024 * 1024);
// opts.set_blob_file_size(1 * 1024 * 1024 * 1024);
// opts.set_target_file_size_base(64 * 1024 * 1024);
opts.set_max_open_files(2048);
opts.set_max_open_files(ulimit as i32);
opts.create_if_missing(true);
// opts.set_allow_mmap_reads(true);
// set_arena_block_size
// opts.optimize_for_point_lookup(1 * 1024 * 1024 * 1024);
// opts.set_level_zero_stop_writes_trigger(64);
// opts.set_level_zero_slowdown_writes_trigger(20);
@@ -279,10 +287,12 @@ fn rocks_db_default_options() -> rocksdb::Options {
pub fn open_readonly_ordhook_db_conn_rocks_db(
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
_ctx: &Context,
) -> Result<DB, String> {
let path = get_default_ordhook_db_file_path_rocks_db(&base_dir);
let mut opts = rocks_db_default_options();
let mut opts = rocks_db_default_options(ulimit, memory_available);
opts.set_disable_auto_compactions(true);
opts.set_max_background_jobs(0);
let db = DB::open_for_read_only(&opts, path, false)
@@ -293,14 +303,16 @@ pub fn open_readonly_ordhook_db_conn_rocks_db(
pub fn open_ordhook_db_conn_rocks_db_loop(
readwrite: bool,
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
ctx: &Context,
) -> DB {
let mut retries = 0;
let blocks_db = loop {
let res = if readwrite {
open_readwrite_ordhook_db_conn_rocks_db(&base_dir, &ctx)
open_readwrite_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx)
} else {
open_readonly_ordhook_db_conn_rocks_db(&base_dir, &ctx)
open_readonly_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx)
};
match res {
Ok(db) => break db,
@@ -323,19 +335,24 @@ pub fn open_ordhook_db_conn_rocks_db_loop(
pub fn open_readwrite_ordhook_dbs(
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
ctx: &Context,
) -> Result<(DB, Connection), String> {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &base_dir, &ctx);
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(true, &base_dir, ulimit, memory_available, &ctx);
let inscriptions_db = open_readwrite_ordhook_db_conn(&base_dir, &ctx)?;
Ok((blocks_db, inscriptions_db))
}
fn open_readwrite_ordhook_db_conn_rocks_db(
base_dir: &PathBuf,
ulimit: usize,
memory_available: usize,
_ctx: &Context,
) -> Result<DB, String> {
let path = get_default_ordhook_db_file_path_rocks_db(&base_dir);
let opts = rocks_db_default_options();
let opts = rocks_db_default_options(ulimit, memory_available);
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to read-write hord.rocksdb: {}", e.to_string()))?;
Ok(db)
@@ -396,7 +413,6 @@ pub fn find_pinned_block_bytes_at_block_height<'a>(
// read_options.set_verify_checksums(false);
let mut backoff: f64 = 1.0;
let mut rng = thread_rng();
loop {
match blocks_db.get_pinned(block_height.to_be_bytes()) {
Ok(Some(res)) => return Some(res),
@@ -500,30 +516,19 @@ pub fn insert_entry_in_inscriptions(
"INSERT INTO inscriptions (inscription_id, ordinal_number, jubilee_inscription_number, classic_inscription_number, block_height, input_index) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![&inscription_data.inscription_id, &inscription_data.ordinal_number, &inscription_data.inscription_number.jubilee, &inscription_data.inscription_number.classic, &block_identifier.index, &inscription_data.inscription_input_index],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to insert inscription in hord.sqlite: {} - {:?}", e.to_string(), inscription_data));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn insert_inscription_in_locations(
inscription_data: &OrdinalInscriptionRevealData,
block_identifier: &BlockIdentifier,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
let (tx, output_index, offset) =
parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &inscription_data.tx_index],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
/// Location of an ordinal at a given point in the chain, as stored in the
/// `locations` table (keyed alongside an `outpoint_to_watch`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OrdinalLocation {
    /// Offset of the ordinal within its outpoint (the satpoint offset).
    pub offset: u64,
    /// Height of the block in which this location was recorded.
    pub block_height: u64,
    /// Index of the transaction within that block.
    pub tx_index: usize,
}
pub fn update_inscriptions_with_block(
pub fn insert_entries_from_block_in_inscriptions(
block: &BitcoinBlockData,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
@@ -535,24 +540,75 @@ pub fn update_inscriptions_with_block(
inscriptions_db_conn_rw,
&ctx,
);
insert_inscription_in_locations(
&inscription_data,
&block.block_identifier,
&inscriptions_db_conn_rw,
ctx,
);
}
}
pub fn update_locations_with_block(
pub fn update_ordinals_db_with_block(
block: &BitcoinBlockData,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
for transfer_data in get_inscriptions_transferred_in_block(&block).iter() {
insert_transfer_in_locations(
&transfer_data,
let mut locations_to_insert = HashMap::new();
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
insert_entry_in_inscriptions(
inscription_data,
&block.block_identifier,
inscriptions_db_conn_rw,
&ctx,
);
let (tx, output_index, offset) =
parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
let insertion_res = locations_to_insert.insert(
(inscription_data.ordinal_number, outpoint_to_watch),
OrdinalLocation {
offset,
block_height: block.block_identifier.index,
tx_index: inscription_data.tx_index,
},
);
if let Some(prev_location) = insertion_res {
ctx.try_log(|logger| {
warn!(
logger,
"Ignoring location insertion from inscriptions: {}, {:?}",
inscription_data.ordinal_number,
prev_location
)
});
}
}
for transfer_data in get_inscriptions_transferred_in_block(&block).iter() {
let (tx, output_index, offset) =
parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
let insertion_res = locations_to_insert.insert(
(transfer_data.ordinal_number, outpoint_to_watch),
OrdinalLocation {
offset,
block_height: block.block_identifier.index,
tx_index: transfer_data.tx_index,
},
);
if let Some(prev_location) = insertion_res {
ctx.try_log(|logger| {
warn!(
logger,
"Ignoring location insertion from transfers: {}, {:?}",
transfer_data.ordinal_number,
prev_location
)
});
}
}
for ((ordinal_number, outpoint_to_watch), location_data) in locations_to_insert {
insert_ordinal_transfer_in_locations_tx(
ordinal_number,
&outpoint_to_watch,
location_data,
&inscriptions_db_conn_rw,
ctx,
);
@@ -598,52 +654,25 @@ pub fn update_sequence_metadata_with_block(
}
}
pub fn insert_new_inscriptions_from_block_in_locations(
block: &BitcoinBlockData,
pub fn insert_ordinal_transfer_in_locations_tx(
ordinal_number: u64,
outpoint_to_watch: &str,
data: OrdinalLocation,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
insert_inscription_in_locations(
inscription_data,
&block.block_identifier,
inscriptions_db_conn_rw,
&ctx,
);
}
}
pub fn insert_transfer_in_locations_tx(
transfer_data: &OrdinalInscriptionTransferData,
block_identifier: &BlockIdentifier,
inscriptions_db_conn_rw: &Transaction,
ctx: &Context,
) {
let (tx, output_index, offset) = parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
let mut retry = 0;
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&transfer_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &transfer_data.tx_index],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn insert_transfer_in_locations(
transfer_data: &OrdinalInscriptionTransferData,
block_identifier: &BlockIdentifier,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
let (tx, output_index, offset) = parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&transfer_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &transfer_data.tx_index],
"INSERT INTO locations (ordinal_number, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&ordinal_number, &outpoint_to_watch, data.offset, data.block_height, &data.tx_index],
) {
retry += 1;
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
if retry > 2 {
ctx.try_log(|logger| error!(logger, "unable to insert inscription in location in hord.sqlite: {}", e.to_string()));
return
}
}
}
@@ -777,12 +806,12 @@ pub fn find_latest_inscription_block_height(
}
pub fn find_initial_inscription_transfer_data(
inscription_id: &str,
ordinal_number: &u64,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<TransferData>, String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1";
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE ordinal_number = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1";
let entry = perform_query_one(query, args, db_conn, ctx, |row| {
let outpoint_to_watch: String = row.get(0).unwrap();
let (transaction_identifier_location, output_index) =
@@ -800,12 +829,12 @@ pub fn find_initial_inscription_transfer_data(
}
pub fn find_latest_inscription_transfer_data(
inscription_id: &str,
ordinal_number: &u64,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<TransferData>, String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height DESC, tx_index DESC LIMIT 1";
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE ordinal_number = ? ORDER BY block_height DESC, tx_index DESC LIMIT 1";
let entry = perform_query_one(query, args, db_conn, ctx, |row| {
let outpoint_to_watch: String = row.get(0).unwrap();
let (transaction_identifier_location, output_index) =
@@ -844,11 +873,11 @@ pub fn find_all_transfers_in_block(
block_height: &u64,
db_conn: &Connection,
ctx: &Context,
) -> BTreeMap<String, Vec<TransferData>> {
) -> BTreeMap<u64, Vec<TransferData>> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = loop {
match db_conn.prepare("SELECT inscription_id, offset, outpoint_to_watch, tx_index FROM locations WHERE block_height = ? ORDER BY tx_index ASC")
match db_conn.prepare("SELECT ordinal_number, offset, outpoint_to_watch, tx_index FROM locations WHERE block_height = ? ORDER BY tx_index ASC")
{
Ok(stmt) => break stmt,
Err(e) => {
@@ -860,7 +889,7 @@ pub fn find_all_transfers_in_block(
}
};
let mut results: BTreeMap<String, Vec<TransferData>> = BTreeMap::new();
let mut results: BTreeMap<u64, Vec<TransferData>> = BTreeMap::new();
let mut rows = loop {
match stmt.query(args) {
Ok(rows) => break rows,
@@ -875,7 +904,7 @@ pub fn find_all_transfers_in_block(
loop {
match rows.next() {
Ok(Some(row)) => {
let inscription_id: String = row.get(0).unwrap();
let ordinal_number: u64 = row.get(0).unwrap();
let inscription_offset_intra_output: u64 = row.get(1).unwrap();
let outpoint_to_watch: String = row.get(2).unwrap();
let tx_index: u64 = row.get(3).unwrap();
@@ -888,7 +917,7 @@ pub fn find_all_transfers_in_block(
tx_index,
};
results
.entry(inscription_id)
.entry(ordinal_number)
.and_modify(|v| v.push(transfer.clone()))
.or_insert(vec![transfer]);
}
@@ -1046,10 +1075,6 @@ pub fn find_inscription_with_id(
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<(TraversalResult, u64)>, String> {
let Some(transfer_data) = find_initial_inscription_transfer_data(inscription_id, db_conn, ctx)?
else {
return Err(format!("unable to retrieve location for {inscription_id}"));
};
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT classic_inscription_number, jubilee_inscription_number, ordinal_number, block_height, input_index FROM inscriptions WHERE inscription_id = ?";
let entry = perform_query_one(query, args, db_conn, ctx, move |row| {
@@ -1069,27 +1094,30 @@ pub fn find_inscription_with_id(
block_height,
)
});
Ok(entry.map(
|(
let Some((
inscription_number,
ordinal_number,
inscription_input_index,
transaction_identifier_inscription,
block_height,
)) = entry
else {
return Err(format!(
"unable to retrieve inscription for {inscription_id}"
));
};
Ok(Some((
TraversalResult {
inscription_number,
ordinal_number,
inscription_input_index,
transaction_identifier_inscription,
block_height,
)| {
(
TraversalResult {
inscription_number,
ordinal_number,
inscription_input_index,
transaction_identifier_inscription,
transfers: 0,
transfer_data,
},
block_height,
)
transfers: 0,
},
))
block_height,
)))
}
pub fn find_all_inscriptions_in_block(
@@ -1097,8 +1125,6 @@ pub fn find_all_inscriptions_in_block(
inscriptions_db_tx: &Connection,
ctx: &Context,
) -> BTreeMap<String, TraversalResult> {
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_tx, ctx);
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = loop {
@@ -1138,26 +1164,12 @@ pub fn find_all_inscriptions_in_block(
let inscription_input_index: usize = row.get(4).unwrap();
let (transaction_identifier_inscription, _) =
{ parse_inscription_id(&inscription_id) };
let Some(transfer_data) = transfers_data
.get(&inscription_id)
.and_then(|entries| entries.first())
else {
ctx.try_log(|logger| {
error!(
logger,
"unable to retrieve inscription genesis transfer data: {}",
inscription_id,
)
});
continue;
};
let traversal = TraversalResult {
inscription_number,
ordinal_number,
inscription_input_index,
transfers: 0,
transaction_identifier_inscription: transaction_identifier_inscription.clone(),
transfer_data: transfer_data.clone(),
};
results.insert(inscription_id, traversal);
}
@@ -1173,45 +1185,24 @@ pub fn find_all_inscriptions_in_block(
return results;
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Ord, PartialOrd, PartialEq, Eq)]
pub struct WatchedSatpoint {
pub inscription_id: String,
pub ordinal_number: u64,
pub offset: u64,
}
/// Looks up a single tracked satpoint for the given inscription id.
///
/// Queries the `locations` table for rows matching `inscription_id`,
/// ordered by `offset` ascending, and returns the first match as a
/// `(block_height, WatchedSatpoint)` pair — i.e. the location with the
/// lowest intra-output offset. Returns `None` when no row matches
/// (behavior delegated to `perform_query_one`).
pub fn find_watched_satpoint_for_inscription(
inscription_id: &str,
db_conn: &Connection,
ctx: &Context,
) -> Option<(u64, WatchedSatpoint)> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT inscription_id, offset, block_height FROM locations WHERE inscription_id = ? ORDER BY offset ASC";
perform_query_one(query, args, db_conn, ctx, |row| {
// Column order mirrors the SELECT list above.
let inscription_id: String = row.get(0).unwrap();
let offset: u64 = row.get(1).unwrap();
let block_height: u64 = row.get(2).unwrap();
(
block_height,
WatchedSatpoint {
inscription_id,
offset,
},
)
})
}
pub fn find_inscriptions_at_wached_outpoint(
pub fn find_inscribed_ordinals_at_wached_outpoint(
outpoint: &str,
db_conn: &Connection,
ctx: &Context,
) -> Vec<WatchedSatpoint> {
let args: &[&dyn ToSql] = &[&outpoint.to_sql().unwrap()];
let query = "SELECT inscription_id, offset FROM locations WHERE outpoint_to_watch = ? ORDER BY offset ASC";
let query = "SELECT ordinal_number, offset FROM locations WHERE outpoint_to_watch = ? ORDER BY offset ASC";
perform_query_set(query, args, db_conn, ctx, |row| {
let inscription_id: String = row.get(0).unwrap();
let ordinal_number: u64 = row.get(0).unwrap();
let offset: u64 = row.get(1).unwrap();
WatchedSatpoint {
inscription_id,
ordinal_number,
offset,
}
})
@@ -1281,26 +1272,6 @@ pub fn remove_entries_from_locations_at_block_height(
}
}
/// Inserts one row into the `locations` table recording where an
/// inscription currently sits: the outpoint to watch, its intra-output
/// offset, the block height, and the transaction index.
///
/// The outpoint string is derived from the transfer's transaction
/// identifier and output index via `format_outpoint_to_watch`.
///
/// NOTE(review): on failure this retries forever, sleeping 1s between
/// attempts — unlike the sibling insert helpers in this file that bail
/// out after a few retries. Presumably intentional (the write must not
/// be lost), but worth confirming.
pub fn insert_entry_in_locations(
inscription_id: &str,
block_height: u64,
transfer_data: &TransferData,
inscriptions_db_rw_conn: &Transaction,
ctx: &Context,
) {
let outpoint_to_watch = format_outpoint_to_watch(
&transfer_data.transaction_identifier_location,
transfer_data.output_index,
);
while let Err(e) = inscriptions_db_rw_conn.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_id, &outpoint_to_watch, &transfer_data.inscription_offset_intra_output, &block_height, &transfer_data.tx_index],
) {
// Log and back off; the loop condition re-attempts the INSERT.
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn delete_data_in_ordhook_db(
start_block: u64,
end_block: u64,
@@ -1337,7 +1308,6 @@ pub struct TraversalResult {
pub transaction_identifier_inscription: TransactionIdentifier,
pub ordinal_number: u64,
pub transfers: u32,
pub transfer_data: TransferData,
}
impl TraversalResult {

View File

@@ -25,6 +25,7 @@ pub const PARENT_TAG: [u8; 1] = [3];
pub const METADATA_TAG: [u8; 1] = [5];
pub const METAPROTOCOL_TAG: [u8; 1] = [7];
pub const CONTENT_ENCODING_TAG: [u8; 1] = [9];
pub const DELEGATE_TAG: [u8; 1] = [11];
type Result<T> = std::result::Result<T, script::Error>;
pub type RawEnvelope = Envelope<Vec<Vec<u8>>>;
@@ -91,6 +92,7 @@ impl From<RawEnvelope> for ParsedEnvelope {
let content_encoding = remove_field(&mut fields, &CONTENT_ENCODING_TAG);
let content_type = remove_field(&mut fields, &CONTENT_TYPE_TAG);
let delegate = remove_field(&mut fields, &DELEGATE_TAG);
let metadata = remove_and_concatenate_field(&mut fields, &METADATA_TAG);
let metaprotocol = remove_field(&mut fields, &METAPROTOCOL_TAG);
let parent = remove_field(&mut fields, &PARENT_TAG);
@@ -109,13 +111,14 @@ impl From<RawEnvelope> for ParsedEnvelope {
.cloned()
.collect()
}),
metaprotocol,
parent,
delegate,
content_encoding,
content_type,
duplicate_field,
incomplete_field,
metadata,
metaprotocol,
parent,
pointer,
unrecognized_even_field,
},
@@ -475,25 +478,6 @@ mod tests {
);
}
#[test]
// An envelope carrying a tag the parser does not recognize ([11], with
// payload b"bar") still parses: the asserted result is the plain
// content-type + body inscription, i.e. the unknown field contributes
// nothing to the payload.
fn with_unknown_tag() {
assert_eq!(
parse(&[envelope(&[
b"ord",
&[1],
b"text/plain;charset=utf-8",
&[11],
b"bar",
&[],
b"ord",
])]),
vec![ParsedEnvelope {
payload: inscription("text/plain;charset=utf-8", "ord"),
..Default::default()
}]
);
}
#[test]
fn no_body() {
assert_eq!(
@@ -809,17 +793,6 @@ mod tests {
);
}
#[test]
// An envelope containing only an unrecognized odd tag ([11]) parses to
// a default `Inscription`: the asserted payload is untouched, so odd
// unknown fields are silently ignored rather than flagged.
fn unknown_odd_fields_are_ignored() {
assert_eq!(
parse(&[envelope(&[b"ord", &[11], &[0]])]),
vec![ParsedEnvelope {
payload: Inscription::default(),
..Default::default()
}],
);
}
#[test]
fn unknown_even_fields() {
assert_eq!(

View File

@@ -1,3 +1,5 @@
use std::io::Cursor;
use chainhook_sdk::bitcoin::{hashes::Hash, Txid};
use super::{inscription_id::InscriptionId, media::Media};
@@ -26,6 +28,7 @@ pub struct Inscription {
pub parent: Option<Vec<u8>>,
pub pointer: Option<Vec<u8>>,
pub unrecognized_even_field: bool,
pub delegate: Option<Vec<u8>>,
}
impl Inscription {
@@ -159,6 +162,49 @@ impl Inscription {
str::from_utf8(self.metaprotocol.as_ref()?).ok()
}
/// Decodes an inscription-id field value — `Txid::LEN` txid bytes
/// followed by up to 4 little-endian output-index bytes — into an
/// `InscriptionId`.
///
/// Returns `None` when the field is absent, too short to hold a txid,
/// longer than txid + 4 index bytes, or when a variable-length index
/// encoding carries a trailing zero byte (only the fixed 4-byte form
/// may end in zeroes).
fn inscription_id_field(field: &Option<Vec<u8>>) -> Option<InscriptionId> {
let bytes = field.as_ref()?;
if bytes.len() < Txid::LEN || bytes.len() > Txid::LEN + 4 {
return None;
}
let (txid_bytes, index_bytes) = bytes.split_at(Txid::LEN);
// Accept fixed length encoding with 4 bytes (with potential trailing
// zeroes) or variable length (no trailing zeroes).
if let Some(&last) = index_bytes.last() {
if last == 0 && index_bytes.len() != 4 {
return None;
}
}
// Zero-extend the (possibly shorter-than-4-byte) index to a full
// little-endian u32.
let mut le = [0u8; 4];
le[..index_bytes.len()].copy_from_slice(index_bytes);
Some(InscriptionId {
txid: Txid::from_slice(txid_bytes).unwrap(),
index: u32::from_le_bytes(le),
})
}
/// Returns the delegate inscription id, when the raw `delegate` field
/// decodes as a valid inscription-id encoding (see
/// `Self::inscription_id_field`).
pub(crate) fn delegate(&self) -> Option<InscriptionId> {
Self::inscription_id_field(&self.delegate)
}
/// Decodes the raw `metadata` field as CBOR; `None` when the field is
/// absent or fails to deserialize.
pub(crate) fn metadata(&self) -> Option<ciborium::Value> {
ciborium::from_reader(Cursor::new(self.metadata.as_ref()?)).ok()
}
pub(crate) fn parent(&self) -> Option<InscriptionId> {
use chainhook_sdk::bitcoin::hash_types::Txid as TXID_LEN;
let value = self.parent.as_ref()?;

View File

@@ -178,16 +178,25 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
)
}
if block_heights_to_scan.is_empty() && floating_end_block {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
for entry in (current_block_height + 1)..=result.blocks {
block_heights_to_scan.push_back(entry);
let new_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
result.blocks
} else {
end_block
}
}
}
None => result.blocks,
},
Err(_e) => {
continue;
}
};
for entry in (current_block_height + 1)..new_tip {
block_heights_to_scan.push_back(entry);
}
}
}
info!(

View File

@@ -16,8 +16,8 @@ use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
use crate::db::{
delete_data_in_ordhook_db, insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop,
open_readwrite_ordhook_db_conn, open_readwrite_ordhook_dbs, update_inscriptions_with_block,
update_locations_with_block, BlockBytesCursor, TransactionBytesCursor,
open_readwrite_ordhook_db_conn, open_readwrite_ordhook_dbs, update_ordinals_db_with_block,
BlockBytesCursor, TransactionBytesCursor,
};
use crate::db::{
find_last_block_inserted, find_missing_blocks, run_compaction,
@@ -33,7 +33,8 @@ use crate::service::observers::{
use crate::service::runloops::start_bitcoin_scan_runloop;
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload;
use chainhook_sdk::chainhooks::types::{
BitcoinChainhookSpecification, ChainhookFullSpecification, ChainhookSpecification,
BitcoinChainhookSpecification, ChainhookConfig, ChainhookFullSpecification,
ChainhookSpecification,
};
use chainhook_sdk::observer::{
start_event_observer, BitcoinBlockDataCached, DataHandlerEvent, EventObserverConfig,
@@ -63,17 +64,33 @@ impl Service {
pub async fn run(
&mut self,
predicates: Vec<BitcoinChainhookSpecification>,
observer_specs: Vec<BitcoinChainhookSpecification>,
predicate_activity_relayer: Option<
crossbeam_channel::Sender<BitcoinChainhookOccurrencePayload>,
>,
check_blocks_integrity: bool,
stream_indexing_to_observers: bool,
) -> Result<(), String> {
let mut event_observer_config = self.config.get_event_observer_config();
let block_post_processor = if stream_indexing_to_observers && !observer_specs.is_empty() {
let mut chainhook_config: ChainhookConfig = ChainhookConfig::new();
let specs = observer_specs.clone();
for mut observer_spec in specs.into_iter() {
observer_spec.enabled = true;
let spec = ChainhookSpecification::Bitcoin(observer_spec);
chainhook_config.register_specification(spec)?;
}
event_observer_config.chainhook_config = Some(chainhook_config);
let block_tx = start_observer_forwarding(&event_observer_config, &self.ctx);
Some(block_tx)
} else {
None
};
// Catch-up with chain tip
let chain_tip_height = self
.catch_up_with_chain_tip(false, check_blocks_integrity)
.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor)
.await?;
info!(
self.ctx.expect_logger(),
@@ -98,7 +115,7 @@ impl Service {
// 2) catch-up outdated observers by dispatching replays
let (chainhook_config, outdated_observers) =
create_and_consolidate_chainhook_config_with_predicates(
predicates,
observer_specs,
chain_tip_height,
predicate_activity_relayer.is_some(),
&self.config,
@@ -413,9 +430,7 @@ impl Service {
bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)),
bitcoin_chain_event_notifier: Some(chain_event_notifier_tx),
};
let cache_l2 = Arc::new(new_traversals_lazy_cache(
self.config.limits.max_caching_memory_size_mb,
));
let cache_l2 = Arc::new(new_traversals_lazy_cache(100_000));
let ctx = self.ctx.clone();
let config = self.config.clone();
@@ -448,6 +463,7 @@ impl Service {
&mut self,
rebuild_from_scratch: bool,
compact_and_check_rocksdb_integrity: bool,
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<u64, String> {
{
if compact_and_check_rocksdb_integrity {
@@ -455,6 +471,8 @@ impl Service {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
false,
&self.config.expected_cache_path(),
self.config.resources.ulimit,
self.config.resources.memory_available,
&self.ctx,
);
let tip = find_last_block_inserted(&blocks_db);
@@ -486,6 +504,8 @@ impl Service {
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
false,
&self.config.expected_cache_path(),
self.config.resources.ulimit,
self.config.resources.memory_available,
&self.ctx,
);
info!(self.ctx.expect_logger(), "Running database compaction",);
@@ -496,6 +516,8 @@ impl Service {
let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
false,
&self.config.expected_cache_path(),
self.config.resources.ulimit,
self.config.resources.memory_available,
&self.ctx,
);
@@ -511,7 +533,7 @@ impl Service {
)?;
}
}
self.update_state(None).await
self.update_state(block_post_processor).await
}
pub async fn update_state(
@@ -616,14 +638,18 @@ impl Service {
}
fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ctx: &Context) {
let (blocks_db_rw, inscriptions_db_conn_rw) =
match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",));
return;
}
};
let (blocks_db_rw, inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs(
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",));
return;
}
};
match command {
HandleBlock::UndoBlock(block) => {
@@ -672,9 +698,7 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct
);
let _ = blocks_db_rw.flush();
update_inscriptions_with_block(&block, &inscriptions_db_conn_rw, &ctx);
update_locations_with_block(&block, &inscriptions_db_conn_rw, &ctx);
update_ordinals_db_with_block(&block, &inscriptions_db_conn_rw, ctx);
update_sequence_metadata_with_block(&block, &inscriptions_db_conn_rw, &ctx);
}
@@ -725,14 +749,18 @@ pub fn chainhook_sidecar_mutate_blocks(
) {
let mut updated_blocks_ids = vec![];
let (blocks_db_rw, mut inscriptions_db_conn_rw) =
match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",));
return;
}
};
let (blocks_db_rw, mut inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs(
&config.expected_cache_path(),
config.resources.ulimit,
config.resources.memory_available,
&ctx,
) {
Ok(dbs) => dbs,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",));
return;
}
};
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();
@@ -781,8 +809,7 @@ pub fn chainhook_sidecar_mutate_blocks(
let _ = blocks_db_rw.flush();
if cache.processed_by_sidecar {
update_inscriptions_with_block(&cache.block, &inscriptions_db_tx, &ctx);
update_locations_with_block(&cache.block, &inscriptions_db_tx, &ctx);
update_ordinals_db_with_block(&cache.block, &inscriptions_db_tx, &ctx);
update_sequence_metadata_with_block(&cache.block, &inscriptions_db_tx, &ctx);
} else {
updated_blocks_ids.push(format!("{}", cache.block.block_identifier.index));

View File

@@ -35,7 +35,13 @@ pub fn update_observer_progress(
"UPDATE observers SET last_block_height_update = ? WHERE uuid = ?",
rusqlite::params![last_block_height_update, uuid],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| {
warn!(
logger,
"unable to query observers.sqlite: {}",
e.to_string()
)
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -50,7 +56,13 @@ pub fn update_observer_streaming_enabled(
"UPDATE observers SET streaming_enabled = ? WHERE uuid = ?",
rusqlite::params![streaming_enabled, uuid],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| {
warn!(
logger,
"unable to query observers.sqlite: {}",
e.to_string()
)
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -66,7 +78,7 @@ pub fn insert_entry_in_observers(
"INSERT INTO observers (uuid, spec, streaming_enabled, last_block_height_update) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![&spec.uuid(), json!(spec).to_string(), report.streaming_enabled, report.last_block_height_update],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query observers.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -177,7 +189,13 @@ pub fn remove_entry_from_observers(uuid: &str, db_conn: &Connection, ctx: &Conte
"DELETE FROM observers WHERE uuid = ?1",
rusqlite::params![&uuid],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
ctx.try_log(|logger| {
warn!(
logger,
"unable to query observers.sqlite: {}",
e.to_string()
)
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}

View File

@@ -21,7 +21,7 @@ pub fn start_bitcoin_scan_runloop(
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans);
let bitcoin_scan_pool = ThreadPool::new(config.resources.expected_observers_count);
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();

View File

@@ -159,7 +159,7 @@ impl OrdinalsIndexingRunloop {
match cmd {
IndexerCommand::StreamBlocks => {
// We start the service as soon as the start() method is being called.
let future = service.catch_up_with_chain_tip(false, true);
let future = service.catch_up_with_chain_tip(false, true, None);
let _ = hiro_system_kit::nestable_block_on(future).expect("unable to start indexer");
let future = service.start_event_observer(observer_sidecar);
let (command_tx, event_rx) =

View File

@@ -1,8 +1,12 @@
FROM rust:bullseye as build
ARG GIT_COMMIT='0000000'
ENV GIT_COMMIT=${GIT_COMMIT}
WORKDIR /src
RUN apt-get update && apt-get install -y ca-certificates pkg-config libssl-dev libclang-11-dev curl gnupg
RUN apt-get update && apt-get install -y ca-certificates pkg-config libssl-dev libclang-11-dev libunwind-dev libunwind8 curl gnupg
RUN rustup update 1.72.0 && rustup default 1.72.0
@@ -50,7 +54,7 @@ FROM debian:bullseye-slim
WORKDIR /ordhook-sdk-js
RUN apt-get update && apt-get install -y ca-certificates libssl-dev
RUN apt-get update && apt-get install -y ca-certificates libssl-dev libclang-11-dev libunwind-dev libunwind8 sqlite3
# COPY --from=build /out/*.node /ordhook-sdk-js/