feat: ordinal inscription_transfer code complete

This commit is contained in:
Ludo Galabru
2023-03-21 17:24:29 -04:00
parent 5a001358e8
commit f55a5ee167
34 changed files with 1204 additions and 865 deletions

8
Cargo.lock generated
View File

@@ -425,7 +425,6 @@ dependencies = [
"atty",
"chainhook-event-observer",
"chainhook-types 1.0.3",
"ciborium",
"clap 3.2.23",
"clap_generate",
"clarinet-files",
@@ -443,13 +442,11 @@ dependencies = [
"rand 0.8.5",
"redis",
"reqwest",
"rusqlite",
"serde",
"serde-redis",
"serde_derive",
"serde_json",
"tar",
"threadpool",
"tokio",
"toml",
]
@@ -465,6 +462,7 @@ dependencies = [
"bitcoincore-rpc-json",
"chainhook-types 1.0.3",
"chrono",
"ciborium",
"clap 3.2.23",
"clap_generate",
"clarinet-utils",
@@ -475,16 +473,19 @@ dependencies = [
"hex",
"hiro-system-kit",
"hyper",
"rand 0.8.5",
"redb",
"reqwest",
"rocket",
"rocket_okapi",
"rusqlite",
"schemars",
"serde",
"serde-hex",
"serde_derive",
"serde_json",
"stacks-rpc-client",
"threadpool",
"tokio",
"toml",
]
@@ -506,6 +507,7 @@ dependencies = [
name = "chainhook-types"
version = "1.0.3"
dependencies = [
"hex",
"schemars",
"serde",
"serde_derive",

View File

@@ -14,7 +14,6 @@ serde_derive = "1"
redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
ciborium = "0.2.0"
rand = "0.8.5"
# tikv-client = { git = "https://github.com/tikv/client-rust.git", rev = "8f54e6114227718e256027df2577bbacdf425f86" }
# raft-proto = { git = "https://github.com/tikv/raft-rs", rev="f73766712a538c2f6eb135b455297ad6c03fc58d", version = "0.7.0"}
@@ -37,8 +36,6 @@ flume = "0.10.14"
ansi_term = "0.12.1"
atty = "0.2.14"
crossbeam-channel = "0.5.6"
rusqlite = { version = "0.27.0", features = ["bundled"] }
threadpool = "1.8.1"
[dev-dependencies]
criterion = "0.3"

View File

@@ -1,13 +1,18 @@
use crate::block::DigestingCommand;
use crate::config::Config;
use crate::node::Node;
use crate::scan::bitcoin::{
build_bitcoin_traversal_local_storage, scan_bitcoin_chain_with_predicate,
};
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate;
use crate::scan::stacks::scan_stacks_chain_with_predicate;
use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification;
use chainhook_event_observer::indexer::ordinals::db::{
build_bitcoin_traversal_local_storage, open_readonly_ordinals_db_conn,
retrieve_satoshi_point_using_local_storage,
};
use chainhook_event_observer::indexer::ordinals::ord::height::Height;
use chainhook_event_observer::observer::BitcoinConfig;
use chainhook_event_observer::utils::Context;
use chainhook_types::{BlockIdentifier, TransactionIdentifier};
use clap::{Parser, Subcommand};
use ctrlc;
use hiro_system_kit;
@@ -142,6 +147,8 @@ struct BuildOrdinalsTraversalsCommand {
pub start_block: u64,
/// Starting block
pub end_block: u64,
/// # of Networking thread
pub network_threads: usize,
/// Target Devnet network
#[clap(
long = "devnet",
@@ -175,6 +182,8 @@ struct BuildOrdinalsTraversalsCommand {
#[derive(Parser, PartialEq, Clone, Debug)]
struct GetSatoshiCommand {
/// Block height
pub block_height: u64,
/// Txid
pub txid: String,
/// Output index
@@ -276,19 +285,46 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
},
Command::Protocols(ProtocolsCommand::Ordinals(subcmd)) => match subcmd {
OrdinalsCommand::Satoshi(cmd) => {
let _config =
let config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let transaction_identifier = TransactionIdentifier {
hash: cmd.txid.clone(),
};
let block_identifier = BlockIdentifier {
index: cmd.block_height,
hash: "".into(),
};
let storage_conn =
open_readonly_ordinals_db_conn(&config.expected_cache_path()).unwrap();
let (block_height, offset) = retrieve_satoshi_point_using_local_storage(
&storage_conn,
&block_identifier,
&transaction_identifier,
&ctx,
)?;
let satoshi_id = Height(block_height).starting_sat().0 + offset;
info!(
ctx.expect_logger(),
"Block: {block_height}, Offset {offset}:, Satoshi ID: {satoshi_id}",
);
}
OrdinalsCommand::Traversals(cmd) => {
let config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
build_bitcoin_traversal_local_storage(
config,
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoin_node_rpc_username.clone(),
password: config.network.bitcoin_node_rpc_password.clone(),
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
};
let _ = build_bitcoin_traversal_local_storage(
&bitcoin_config,
&config.expected_cache_path(),
cmd.start_block,
cmd.end_block,
&ctx,
6,
cmd.network_threads,
)
.await;
}

View File

@@ -10,6 +10,8 @@ use std::path::PathBuf;
const DEFAULT_MAINNET_TSV_ARCHIVE: &str = "https://storage.googleapis.com/hirosystems-archive/mainnet/api/mainnet-blockchain-api-latest.tar.gz";
const DEFAULT_TESTNET_TSV_ARCHIVE: &str = "https://storage.googleapis.com/hirosystems-archive/testnet/api/testnet-blockchain-api-latest.tar.gz";
// const DEFAULT_MAINNET_TSV_ARCHIVE: &str = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz";
// const DEFAULT_TESTNET_TSV_ARCHIVE: &str = "https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest.gz";
#[derive(Clone, Debug)]
pub struct Config {
@@ -181,13 +183,6 @@ impl Config {
destination_path
}
pub fn get_bitcoin_block_traversal_db_path(&self) -> PathBuf {
let mut destination_path = PathBuf::new();
destination_path.push(&self.storage.cache_path);
destination_path.push("bitcoin_block_traversal.sqlite");
destination_path
}
pub fn expected_stacks_node_event_source(&self) -> &String {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksNode(config) = source {

View File

@@ -1,23 +1,16 @@
use crate::config::Config;
use crate::node::ordinals::inscription_id::InscriptionId;
use chainhook_event_observer::bitcoincore_rpc::bitcoin::BlockHash;
use chainhook_event_observer::bitcoincore_rpc::jsonrpc;
use chainhook_event_observer::chainhooks::bitcoin::{
handle_bitcoin_hook_action, BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
};
use chainhook_event_observer::chainhooks::types::{
BitcoinPredicateType, ChainhookConfig, OrdinalOperations, Protocols,
BitcoinPredicateType, ChainhookConfig, ChainhookFullSpecification, OrdinalOperations, Protocols,
};
use chainhook_event_observer::indexer::ordinals::indexing::entry::Entry;
use chainhook_event_observer::indexer::ordinals::indexing::{
HEIGHT_TO_BLOCK_HASH, INSCRIPTION_NUMBER_TO_INSCRIPTION_ID,
};
use chainhook_event_observer::indexer::ordinals::{self, initialize_ordinal_index};
use chainhook_event_observer::indexer::ordinals::{self, ord::initialize_ordinal_index};
use chainhook_event_observer::indexer::{self, BitcoinChainContext};
use chainhook_event_observer::observer::{
start_event_observer, EventObserverConfig, ObserverEvent,
start_event_observer, ApiKey, EventObserverConfig, ObserverEvent,
};
use chainhook_event_observer::redb::ReadableTable;
use chainhook_event_observer::utils::{file_append, send_request, Context};
use chainhook_event_observer::{
chainhooks::stacks::{
@@ -71,7 +64,7 @@ impl Node {
for key in chainhooks_to_load.iter() {
let chainhook = match redis_con.hget::<_, _, String>(key, "specification") {
Ok(spec) => {
ChainhookSpecification::deserialize_specification(&spec, key).unwrap()
ChainhookFullSpecification::deserialize_specification(&spec, key).unwrap()
// todo
}
Err(e) => {
@@ -84,8 +77,30 @@ impl Node {
continue;
}
};
// TODO
// chainhook_config.register_hook(chainhook);
match chainhook_config.register_hook(
(
&self.config.network.bitcoin_network,
&self.config.network.stacks_network,
),
chainhook,
&ApiKey(None),
) {
Ok(spec) => {
info!(
self.ctx.expect_logger(),
"Predicate {} retrieved from storage and loaded",
spec.uuid(),
);
}
Err(e) => {
error!(
self.ctx.expect_logger(),
"Failed loading predicate from storage: {}",
e.to_string()
);
}
}
}
}
@@ -128,7 +143,6 @@ impl Node {
panic!()
}
};
let mut bitcoin_context = BitcoinChainContext::new(Some(ordinal_index));
let context_cloned = self.ctx.clone();
let _ = std::thread::spawn(move || {
@@ -311,50 +325,51 @@ impl Node {
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let mut inscriptions_hints = BTreeMap::new();
let mut use_hinting = false;
let use_hinting = false;
if let BitcoinPredicateType::Protocol(Protocols::Ordinal(
OrdinalOperations::InscriptionRevealed,
)) = &predicate_spec.predicate
{
if let Some(ref ordinal_index) = bitcoin_context.ordinal_index {
for (inscription_number, inscription_id) in ordinal_index
.database
.begin_read()
.unwrap()
.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)
.unwrap()
.iter()
.unwrap()
{
let inscription =
InscriptionId::load(*inscription_id.value());
println!(
"{} -> {}",
inscription_number.value(),
inscription
);
inscriptions_hints.insert(1, 1);
// if let Some(ref ordinal_index) = bitcoin_context.ordinal_index {
// for (inscription_number, inscription_id) in ordinal_index
// .database
// .begin_read()
// .unwrap()
// .open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)
// .unwrap()
// .iter()
// .unwrap()
// {
// let inscription =
// InscriptionId::load(*inscription_id.value());
// println!(
// "{} -> {}",
// inscription_number.value(),
// inscription
// );
let entry = ordinal_index
.get_inscription_entry(inscription)
.unwrap()
.unwrap();
println!("{:?}", entry);
// let entry = ordinal_index
// .get_inscription_entry(inscription)
// .unwrap()
// .unwrap();
// println!("{:?}", entry);
let blockhash = ordinal_index
.database
.begin_read()
.unwrap()
.open_table(HEIGHT_TO_BLOCK_HASH)
.unwrap()
.get(&entry.height)
.unwrap()
.map(|k| BlockHash::load(*k.value()))
.unwrap();
// let blockhash = ordinal_index
// .database
// .begin_read()
// .unwrap()
// .open_table(HEIGHT_TO_BLOCK_HASH)
// .unwrap()
// .get(&entry.height)
// .unwrap()
// .map(|k| BlockHash::load(*k.value()))
// .unwrap();
inscriptions_hints.insert(entry.height, blockhash);
use_hinting = true;
}
}
// inscriptions_hints.insert(entry.height, blockhash);
// use_hinting = true;
// }
// }
}
let start_block = match predicate_spec.start_block {
@@ -469,7 +484,6 @@ impl Node {
let block = indexer::bitcoin::standardize_bitcoin_block(
&self.config.network,
raw_block,
&mut bitcoin_context,
&self.ctx,
)?;

View File

@@ -1,10 +1,6 @@
use crate::config::Config;
use chainhook_event_observer::bitcoincore_rpc;
use chainhook_event_observer::bitcoincore_rpc::bitcoin::{BlockHash, OutPoint};
use chainhook_event_observer::bitcoincore_rpc::bitcoincore_rpc_json::{
GetBlockchainInfoResult, GetRawTransactionResult,
};
use chainhook_event_observer::bitcoincore_rpc::{jsonrpc, RpcApi};
use chainhook_event_observer::bitcoincore_rpc::bitcoin::BlockHash;
use chainhook_event_observer::bitcoincore_rpc::RpcApi;
use chainhook_event_observer::bitcoincore_rpc::{Auth, Client};
use chainhook_event_observer::chainhooks::bitcoin::{
handle_bitcoin_hook_action, BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
@@ -14,223 +10,28 @@ use chainhook_event_observer::chainhooks::types::{
Protocols,
};
use chainhook_event_observer::indexer::bitcoin::{
retrieve_full_block_breakdown_with_retry, BitcoinBlockFullBreakdown,
BitcoinTransactionOutputFullBreakdown,
retrieve_block_hash, retrieve_full_block_breakdown_with_retry,
};
use chainhook_event_observer::indexer::ordinals::indexing::entry::Entry;
use chainhook_event_observer::indexer::ordinals::indexing::{
HEIGHT_TO_BLOCK_HASH, INSCRIPTION_NUMBER_TO_INSCRIPTION_ID, OUTPOINT_TO_SAT_RANGES,
SAT_TO_SATPOINT,
use chainhook_event_observer::indexer::ordinals::db::{
get_default_ordinals_db_file_path, initialize_ordinal_state_storage,
open_readonly_ordinals_db_conn, retrieve_satoshi_point_using_local_storage,
write_compacted_block_to_index, CompactedBlock,
};
use chainhook_event_observer::indexer::ordinals::initialize_ordinal_index;
use chainhook_event_observer::indexer::ordinals::inscription_id::InscriptionId;
use chainhook_event_observer::indexer::ordinals::sat_point::SatPoint;
use chainhook_event_observer::indexer::ordinals::ord::indexing::entry::Entry;
use chainhook_event_observer::indexer::ordinals::ord::indexing::{
HEIGHT_TO_BLOCK_HASH, INSCRIPTION_NUMBER_TO_INSCRIPTION_ID,
};
use chainhook_event_observer::indexer::ordinals::ord::initialize_ordinal_index;
use chainhook_event_observer::indexer::ordinals::ord::inscription_id::InscriptionId;
use chainhook_event_observer::indexer::{self, BitcoinChainContext};
use chainhook_event_observer::observer::{
BitcoinConfig, EventObserverConfig, DEFAULT_CONTROL_PORT, DEFAULT_INGESTION_PORT,
};
use chainhook_event_observer::redb::ReadableTable;
use chainhook_event_observer::utils::{file_append, send_request, Context};
use chainhook_types::{
BitcoinTransactionData, BlockIdentifier, OrdinalInscriptionRevealData, OrdinalOperation,
TransactionIdentifier,
};
use rand::prelude::*;
use reqwest::Client as HttpClient;
use rusqlite::{Connection, OpenFlags, Result, ToSql};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::path::{Path, PathBuf};
use std::time::Duration;
use threadpool::ThreadPool;
pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connection {
let conn = create_or_open_readwrite_db(path);
if let Err(e) = conn.execute(
"CREATE TABLE blocks (
id INTEGER NOT NULL PRIMARY KEY,
compacted_bytes TEXT NOT NULL
)",
[],
) {
error!(ctx.expect_logger(), "{}", e.to_string());
}
if let Err(e) = conn.execute(
"CREATE TABLE inscriptions (
inscription_id TEXT NOT NULL PRIMARY KEY,
outpoint_to_watch TEXT NOT NULL,
ancestors TEXT NOT NULL,
descendants TEXT
)",
[],
) {
error!(ctx.expect_logger(), "{}", e.to_string());
}
conn
}
fn create_or_open_readwrite_db(path: &PathBuf) -> Connection {
let open_flags = match std::fs::metadata(path) {
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// need to create
if let Some(dirp) = PathBuf::from(path).parent() {
std::fs::create_dir_all(dirp).unwrap_or_else(|e| {
eprintln!("Failed to create {:?}: {:?}", dirp, &e);
});
}
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
} else {
panic!("FATAL: could not stat {}", path.display());
}
}
Ok(_md) => {
// can just open
OpenFlags::SQLITE_OPEN_READ_WRITE
}
};
let conn = Connection::open_with_flags(path, open_flags).unwrap();
// db.profile(Some(trace_profile));
// db.busy_handler(Some(tx_busy_handler))?;
conn.pragma_update(None, "journal_mode", &"WAL").unwrap();
conn.pragma_update(None, "synchronous", &"NORMAL").unwrap();
conn
}
fn open_existing_readonly_db(path: &PathBuf) -> Connection {
let open_flags = match std::fs::metadata(path) {
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
panic!("FATAL: could not find {}", path.display());
} else {
panic!("FATAL: could not stat {}", path.display());
}
}
Ok(_md) => {
// can just open
OpenFlags::SQLITE_OPEN_READ_ONLY
}
};
let conn = Connection::open_with_flags(path, open_flags).unwrap();
// db.profile(Some(trace_profile));
// db.busy_handler(Some(tx_busy_handler))?;
conn
}
#[derive(Debug, Serialize, Deserialize)]
// pub struct CompactedBlock(Vec<(Vec<(u32, u16, u64)>, Vec<u64>)>);
pub struct CompactedBlock(
(
([u8; 4], u64),
Vec<([u8; 4], Vec<([u8; 4], u32, u16, u64)>, Vec<u64>)>,
),
);
impl CompactedBlock {
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> CompactedBlock {
let mut txs = vec![];
let mut coinbase_value = 0;
let coinbase_txid = {
let txid = hex::decode(block.tx[0].txid.to_string()).unwrap();
[txid[0], txid[1], txid[2], txid[3]]
};
for coinbase_output in block.tx[0].vout.iter() {
coinbase_value += coinbase_output.value.to_sat();
}
for tx in block.tx.iter().skip(1) {
let mut inputs = vec![];
for input in tx.vin.iter().skip(0) {
let txin = hex::decode(input.txid.unwrap().to_string()).unwrap();
inputs.push((
[txin[0], txin[1], txin[2], txin[3]],
input.prevout.as_ref().unwrap().height as u32,
input.vout.unwrap() as u16,
input.prevout.as_ref().unwrap().value.to_sat(),
));
}
let mut outputs = vec![];
for output in tx.vout.iter().skip(1) {
outputs.push(output.value.to_sat());
}
let txid = hex::decode(tx.txid.to_string()).unwrap();
txs.push(([txid[0], txid[1], txid[2], txid[3]], inputs, outputs));
}
CompactedBlock(((coinbase_txid, coinbase_value), txs))
}
pub fn from_hex_bytes(bytes: &str) -> CompactedBlock {
let bytes = hex::decode(&bytes).unwrap();
let value = ciborium::de::from_reader(&bytes[..]).unwrap();
value
}
pub fn to_hex_bytes(&self) -> String {
use ciborium::cbor;
let value = cbor!(self).unwrap();
let mut bytes = vec![];
let _ = ciborium::ser::into_writer(&value, &mut bytes);
let hex_bytes = hex::encode(bytes);
hex_bytes
}
}
pub fn retrieve_compacted_block_from_index(
block_id: u32,
storage_conn: &Connection,
) -> Option<CompactedBlock> {
let args: &[&dyn ToSql] = &[&block_id.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT compacted_bytes FROM blocks WHERE id = ?1")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let hex_bytes: String = row.get(0).unwrap();
Ok(CompactedBlock::from_hex_bytes(&hex_bytes))
})
.unwrap();
for result in result_iter {
return Some(result.unwrap());
}
return None;
}
pub fn scan_outpoints_to_watch_with_txin(txin: &str, storage_conn: &Connection) -> Option<String> {
let args: &[&dyn ToSql] = &[&txin.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE outpoint_to_watch = ?1")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
Ok(inscription_id)
})
.unwrap();
for result in result_iter {
return Some(result.unwrap());
}
return None;
}
pub fn write_compacted_block_to_index(
block_id: u32,
compacted_block: &CompactedBlock,
storage_conn: &Connection,
ctx: &Context,
) {
let serialized_compacted_block = compacted_block.to_hex_bytes();
if let Err(e) = storage_conn.execute(
"INSERT INTO blocks (id, compacted_bytes) VALUES (?1, ?2)",
rusqlite::params![&block_id, &serialized_compacted_block],
) {
error!(ctx.expect_logger(), "Error: {}", e.to_string());
}
}
use chainhook_types::{BitcoinTransactionData, BlockIdentifier, OrdinalOperation};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::mpsc::channel;
pub async fn scan_bitcoin_chain_with_predicate(
predicate: BitcoinChainhookFullSpecification,
@@ -307,30 +108,30 @@ pub async fn scan_bitcoin_chain_with_predicate(
// Optimization: we will use the ordinal storage to provide a set of hints.
let mut inscriptions_hints = BTreeMap::new();
let mut is_scanning_inscriptions = false;
let event_observer_config = EventObserverConfig {
normalization_enabled: true,
grpc_server_enabled: false,
hooks_enabled: true,
bitcoin_rpc_proxy_enabled: true,
event_handlers: vec![],
chainhook_config: None,
ingestion_port: DEFAULT_INGESTION_PORT,
control_port: DEFAULT_CONTROL_PORT,
bitcoin_node_username: config.network.bitcoin_node_rpc_username.clone(),
bitcoin_node_password: config.network.bitcoin_node_rpc_password.clone(),
bitcoin_node_rpc_url: config.network.bitcoin_node_rpc_url.clone(),
stacks_node_rpc_url: config.network.stacks_node_rpc_url.clone(),
operators: HashSet::new(),
display_logs: false,
cache_path: format!("{}", config.expected_cache_path().display()),
bitcoin_network: config.network.bitcoin_network.clone(),
stacks_network: config.network.stacks_network.clone(),
};
let ordinal_index = if let BitcoinPredicateType::Protocol(Protocols::Ordinal(
OrdinalOperations::InscriptionRevealed,
)) = &predicate_spec.predicate
{
let event_observer_config = EventObserverConfig {
normalization_enabled: true,
grpc_server_enabled: false,
hooks_enabled: true,
bitcoin_rpc_proxy_enabled: true,
event_handlers: vec![],
chainhook_config: None,
ingestion_port: DEFAULT_INGESTION_PORT,
control_port: DEFAULT_CONTROL_PORT,
bitcoin_node_username: config.network.bitcoin_node_rpc_username.clone(),
bitcoin_node_password: config.network.bitcoin_node_rpc_password.clone(),
bitcoin_node_rpc_url: config.network.bitcoin_node_rpc_url.clone(),
stacks_node_rpc_url: config.network.stacks_node_rpc_url.clone(),
operators: HashSet::new(),
display_logs: false,
cache_path: format!("{}", config.expected_cache_path().display()),
bitcoin_network: config.network.bitcoin_network.clone(),
stacks_network: config.network.stacks_network.clone(),
};
let ordinal_index = match initialize_ordinal_index(&event_observer_config, None, &ctx) {
Ok(index) => index,
Err(e) => {
@@ -372,7 +173,6 @@ pub async fn scan_bitcoin_chain_with_predicate(
} else {
None
};
let mut bitcoin_context = BitcoinChainContext::new(ordinal_index);
let mut total_hits = vec![];
@@ -382,13 +182,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
let (process_ordinal_tx, process_ordinal_rx) = channel();
let (cache_block_tx, cache_block_rx) = channel();
// use threadpool::ThreadPool;
// use std::sync::mpsc::channel;
// let n_workers = 4;
// let pool = ThreadPool::new(n_workers);
// let (tx, rx) = channel();
let _config = config.clone();
let cache_path = config.expected_cache_path();
let ctx_ = ctx.clone();
let handle_1 = hiro_system_kit::thread_named("Ordinal retrieval")
.spawn(move || {
@@ -399,13 +193,14 @@ pub async fn scan_bitcoin_chain_with_predicate(
ctx_.expect_logger(),
"Retrieving satoshi point for {}", transaction.transaction_identifier.hash
);
let f = retrieve_satoshi_point_using_local_storage(
&_config,
let storage_conn = open_readonly_ordinals_db_conn(&cache_path).unwrap();
let res = retrieve_satoshi_point_using_local_storage(
&storage_conn,
&block_identifier,
&transaction.transaction_identifier,
&ctx_,
);
let (block_number, block_offset) = match hiro_system_kit::nestable_block_on(f) {
let (block_number, block_offset) = match res {
Ok(res) => res,
Err(err) => {
println!("{err}");
@@ -442,7 +237,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
.expect("unable to detach thread");
let ctx_ = ctx.clone();
let db_file = config.get_bitcoin_block_traversal_db_path();
let db_file = get_default_ordinals_db_file_path(&config.expected_cache_path());
let handle_3 = hiro_system_kit::thread_named("Ordinal ingestion")
.spawn(move || {
let conn = initialize_ordinal_state_storage(&db_file, &ctx_);
@@ -455,16 +250,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
let mut pipeline_started = false;
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
let n_workers = 4;
let pool = ThreadPool::new(n_workers);
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoin_node_rpc_username.clone(),
password: config.network.bitcoin_node_rpc_password.clone(),
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
};
let bitcoin_config = event_observer_config.get_bitcoin_config();
for cursor in start_block..=end_block {
debug!(
@@ -479,7 +265,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
}
} else {
loop {
match retrieve_block_hash(config, &cursor).await {
match retrieve_block_hash(&bitcoin_config, &cursor).await {
Ok(res) => break res,
Err(e) => {
error!(ctx.expect_logger(), "Error retrieving block {}", cursor,);
@@ -497,12 +283,8 @@ pub async fn scan_bitcoin_chain_with_predicate(
CompactedBlock::from_full_block(&block_breakdown),
)));
let block = indexer::bitcoin::standardize_bitcoin_block(
&config.network,
block_breakdown,
&mut bitcoin_context,
ctx,
)?;
let block =
indexer::bitcoin::standardize_bitcoin_block(&config.network, block_breakdown, ctx)?;
let mut hits = vec![];
for tx in block.transactions.iter() {
@@ -571,262 +353,3 @@ pub async fn scan_bitcoin_chain_with_predicate(
Ok(())
}
pub async fn retrieve_block_hash(config: &Config, block_height: &u64) -> Result<String, String> {
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblockhash",
"params": [block_height]
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
let block_hash = http_client
.post(&config.network.bitcoin_node_rpc_url)
.basic_auth(
&config.network.bitcoin_node_rpc_username,
Some(&config.network.bitcoin_node_rpc_password),
)
.header("Content-Type", "application/json")
.header("Host", &config.network.bitcoin_node_rpc_url[7..])
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<String>()
.map_err(|e| format!("unable to parse response ({})", e))?;
Ok(block_hash)
}
pub async fn build_bitcoin_traversal_local_storage(
config: Config,
start_block: u64,
end_block: u64,
ctx: &Context,
network_thread: usize,
) -> Result<(), String> {
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
let (block_hash_tx, block_hash_rx) = crossbeam_channel::unbounded();
let retrieve_block_data_pool = ThreadPool::new(network_thread);
let (block_data_tx, block_data_rx) = crossbeam_channel::unbounded();
let compress_block_data_pool = ThreadPool::new(8);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoin_node_rpc_username.clone(),
password: config.network.bitcoin_node_rpc_password.clone(),
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
};
for block_cursor in start_block..end_block {
let block_height = block_cursor.clone();
let block_hash_tx = block_hash_tx.clone();
let config = config.clone();
retrieve_block_hash_pool.execute(move || {
let mut err_count = 0;
let mut rng = rand::thread_rng();
loop {
let future = retrieve_block_hash(&config, &block_height);
match hiro_system_kit::nestable_block_on(future) {
Ok(block_hash) => {
err_count = 0;
block_hash_tx.send(Some((block_cursor, block_hash)));
break;
}
Err(e) => {
err_count += 1;
let delay = (err_count + (rng.next_u64() % 3)) * 1000;
println!("retry hash:fetch in {delay}");
std::thread::sleep(std::time::Duration::from_millis(delay));
}
}
}
});
}
let db_file = config.get_bitcoin_block_traversal_db_path();
let moved_ctx = ctx.clone();
let handle = hiro_system_kit::thread_named("Block data retrieval").spawn(move || {
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
println!("fetch {block_height}:{block_hash}");
let moved_bitcoin_config = bitcoin_config.clone();
let block_data_tx = block_data_tx.clone();
let moved_ctx = moved_ctx.clone();
retrieve_block_data_pool.execute(move || {
let future = retrieve_full_block_breakdown_with_retry(
&moved_bitcoin_config,
&block_hash,
&moved_ctx,
);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
block_data_tx.send(Some(block_data));
});
retrieve_block_data_pool.join()
}
});
let handle = hiro_system_kit::thread_named("Block data compression").spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
println!("store {}:{}", block_data.height, block_data.hash);
let block_compressed_tx = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
block_compressed_tx.send(Some((block_data.height as u32, compressed_block)));
});
compress_block_data_pool.join()
}
});
let conn = initialize_ordinal_state_storage(&db_file, &ctx);
while let Ok(Some((block_height, compacted_block))) = block_compressed_rx.recv() {
info!(ctx.expect_logger(), "Storing block #{block_height}");
write_compacted_block_to_index(block_height, &compacted_block, &conn, &ctx);
}
retrieve_block_hash_pool.join();
// Pool of threads fetching block hash
// In: block numbers
// Out: block hash
// Pool of threads fetching full blocks
// In: block hash
// Out: full block
// Pool of thread compressing full blocks
// In: full block
// Out: compacted block
// Receive Compacted block, storing on disc
Ok(())
}
pub async fn retrieve_satoshi_point_using_local_storage(
config: &Config,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
ctx: &Context,
) -> Result<(u64, u64), String> {
let path = config.get_bitcoin_block_traversal_db_path();
let storage_conn = open_existing_readonly_db(&path);
let mut ordinal_offset = 0;
let mut ordinal_block_number = block_identifier.index as u32;
let txid = {
let bytes = hex::decode(&transaction_identifier.hash[2..]).unwrap();
[bytes[0], bytes[1], bytes[2], bytes[3]]
};
let mut tx_cursor = (txid, 0);
loop {
let res = retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn).unwrap();
info!(
ctx.expect_logger(),
"{ordinal_block_number}:{:?}:{:?}",
hex::encode(&res.0 .0 .0),
hex::encode(txid)
);
std::thread::sleep(std::time::Duration::from_millis(300));
// evaluate exit condition: did we reach a coinbase transaction?
let coinbase_txid = &res.0 .0 .0;
if coinbase_txid.eq(&tx_cursor.0) {
let coinbase_value = &res.0 .0 .1;
if ordinal_offset.lt(coinbase_value) {
break;
}
// loop over the transaction fees to detect the right range
let cut_off = ordinal_offset - coinbase_value;
let mut accumulated_fees = 0;
for (txid, inputs, outputs) in res.0 .1 {
let mut total_in = 0;
for (_, _, _, input_value) in inputs.iter() {
total_in += input_value;
}
let mut total_out = 0;
for output_value in outputs.iter() {
total_out += output_value;
}
let fee = total_in - total_out;
accumulated_fees += fee;
if accumulated_fees > cut_off {
// We are looking at the right transaction
// Retraverse the inputs to select the index to be picked
let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
if sats_in >= total_out {
ordinal_offset = total_out - (sats_in - txin_value);
ordinal_block_number = block_height;
// println!("{h}: {blockhash} -> {} [in:{} , out: {}] {}/{vout} (input #{in_index}) {compounded_offset}", transaction.txid, transaction.vin.len(), transaction.vout.len(), txid);
tx_cursor = (txin, vout as usize);
break;
}
}
break;
}
}
} else {
// isolate the target transaction
for (txid, inputs, outputs) in res.0 .1 {
// we iterate over the transactions, looking for the transaction target
if !txid.eq(&tx_cursor.0) {
continue;
}
info!(ctx.expect_logger(), "Evaluating {}", hex::encode(&txid));
let mut sats_out = 0;
for (index, output_value) in outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
sats_out += output_value;
}
sats_out += ordinal_offset;
let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
if sats_in >= sats_out {
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = block_height;
info!(
ctx.expect_logger(),
"Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
hex::encode(&txid),
hex::encode(&txin),
);
tx_cursor = (txin, vout as usize);
break;
}
}
}
}
}
Ok((ordinal_block_number.into(), ordinal_offset))
}
pub async fn scan_bitcoin_chain_for_ordinal_inscriptions(
subscribers: Vec<HookAction>,
first_inscription_height: u64,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
Ok(())
}

View File

@@ -103,7 +103,7 @@ pub async fn scan_stacks_chain_with_predicate(
stacks_network: config.network.stacks_network.clone(),
};
let mut indexer = Indexer::new(config.network.clone(), None);
let mut indexer = Indexer::new(config.network.clone());
let mut canonical_fork = {
let mut cursor = BlockIdentifier::default();

View File

@@ -45,6 +45,10 @@ anyhow = { version = "1.0.56", features = ["backtrace"], optional = true }
futures = "0.3.21"
hyper = { version = "0.14.24", features = ["http1", "client"] }
hex = "0.4.3"
rusqlite = { version = "0.27.0", features = ["bundled"] }
ciborium = "0.2.0"
threadpool = "1.8.1"
rand = "0.8.5"
[replace]
"jsonrpc:0.13.0" = { git = 'https://github.com/apoelstra/rust-jsonrpc', rev = "1063671f122a8985c1b7c29030071253da515839" }

View File

@@ -236,16 +236,16 @@ pub fn evaluate_stacks_predicate_on_block<'a>(
_ctx: &Context,
) -> bool {
match &chainhook.predicate {
StacksPredicate::BlockIdentifierIndex(BlockIdentifierIndexRule::Between(a, b)) => {
StacksPredicate::BlockHeight(BlockIdentifierIndexRule::Between(a, b)) => {
block.get_identifier().index.gt(a) && block.get_identifier().index.lt(b)
}
StacksPredicate::BlockIdentifierIndex(BlockIdentifierIndexRule::HigherThan(a)) => {
StacksPredicate::BlockHeight(BlockIdentifierIndexRule::HigherThan(a)) => {
block.get_identifier().index.gt(a)
}
StacksPredicate::BlockIdentifierIndex(BlockIdentifierIndexRule::LowerThan(a)) => {
StacksPredicate::BlockHeight(BlockIdentifierIndexRule::LowerThan(a)) => {
block.get_identifier().index.lt(a)
}
StacksPredicate::BlockIdentifierIndex(BlockIdentifierIndexRule::Equals(a)) => {
StacksPredicate::BlockHeight(BlockIdentifierIndexRule::Equals(a)) => {
block.get_identifier().index.eq(a)
}
StacksPredicate::ContractDeployment(_)
@@ -380,7 +380,7 @@ pub fn evaluate_stacks_predicate_on_transaction<'a>(
false
}
StacksPredicate::Txid(txid) => txid.eq(&transaction.transaction_identifier.hash),
StacksPredicate::BlockIdentifierIndex(_) => unreachable!(),
StacksPredicate::BlockHeight(_) => unreachable!(),
}
}

View File

@@ -668,10 +668,8 @@ pub struct StacksChainhookSpecification {
impl StacksChainhookSpecification {
pub fn is_predicate_targeting_block_header(&self) -> bool {
match &self.predicate {
StacksPredicate::BlockIdentifierIndex(_)
// | StacksPredicate::BlockIdentifierHash(_)
// | StacksPredicate::BitcoinBlockIdentifierHash(_)
// | StacksPredicate::BitcoinBlockIdentifierIndex(_)
StacksPredicate::BlockHeight(_)
// | &StacksPredicate::BitcoinBlockHeight(_)
=> true,
_ => false,
}
@@ -682,10 +680,7 @@ impl StacksChainhookSpecification {
#[serde(rename_all = "snake_case")]
#[serde(tag = "scope")]
pub enum StacksPredicate {
BlockIdentifierIndex(BlockIdentifierIndexRule),
// BlockIdentifierHash(BlockIdentifierHashRule),
// BitcoinBlockIdentifierHash(BlockIdentifierHashRule),
// BitcoinBlockIdentifierIndex(BlockIdentifierHashRule),
BlockHeight(BlockIdentifierIndexRule),
ContractDeployment(StacksContractDeploymentPredicate),
ContractCall(StacksContractCallBasedPredicate),
PrintEvent(StacksPrintEventBasedPredicate),

View File

@@ -31,13 +31,13 @@ use hiro_system_kit::slog;
use rocket::serde::json::Value as JsonValue;
use serde::Deserialize;
use super::ordinals::chain::Chain;
use super::ordinals::indexing::entry::InscriptionEntry;
use super::ordinals::indexing::updater::{BlockData, OrdinalIndexUpdater};
use super::ordinals::indexing::OrdinalIndex;
use super::ordinals::inscription::InscriptionParser;
use super::ordinals::inscription_id::InscriptionId;
use super::ordinals::sat::Sat;
use super::ordinals::ord::chain::Chain;
use super::ordinals::ord::indexing::entry::InscriptionEntry;
use super::ordinals::ord::indexing::updater::{BlockData, OrdinalIndexUpdater};
use super::ordinals::ord::indexing::OrdinalIndex;
use super::ordinals::ord::inscription_id::InscriptionId;
use super::ordinals::ord::sat::Sat;
use super::BitcoinChainContext;
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
@@ -159,7 +159,7 @@ pub async fn retrieve_full_block_breakdown_with_retry(
Ok(block)
}
async fn retrieve_full_block_breakdown(
pub async fn retrieve_full_block_breakdown(
bitcoin_config: &BitcoinConfig,
block_hash: &str,
_ctx: &Context,
@@ -193,10 +193,42 @@ async fn retrieve_full_block_breakdown(
Ok(block)
}
pub async fn retrieve_block_hash(
bitcoin_config: &BitcoinConfig,
block_height: &u64,
) -> Result<String, String> {
use reqwest::Client as HttpClient;
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblockhash",
"params": [block_height]
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
let block_hash = http_client
.post(&bitcoin_config.rpc_url)
.basic_auth(&bitcoin_config.username, Some(&bitcoin_config.password))
.header("Content-Type", "application/json")
.header("Host", &bitcoin_config.rpc_url[7..])
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<bitcoincore_rpc::jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<String>()
.map_err(|e| format!("unable to parse response ({})", e))?;
Ok(block_hash)
}
pub fn standardize_bitcoin_block(
indexer_config: &IndexerConfig,
block: BitcoinBlockFullBreakdown,
bitcoin_context: &mut BitcoinChainContext,
ctx: &Context,
) -> Result<BitcoinBlockData, String> {
let mut transactions = vec![];
@@ -228,10 +260,18 @@ pub fn standardize_bitcoin_block(
}
let mut inputs = vec![];
let mut sats_in = 0;
for input in tx.vin.drain(..) {
if input.is_coinbase() {
continue;
}
let value = input
.prevout
.as_ref()
.expect("not provided for coinbase txs")
.value
.to_sat();
sats_in += value;
inputs.push(TxIn {
previous_output: OutPoint {
txid: input
@@ -239,6 +279,8 @@ pub fn standardize_bitcoin_block(
.expect("not provided for coinbase txs")
.to_string(),
vout: input.vout.expect("not provided for coinbase txs"),
block_height: input.prevout.expect("not provided for coinbase txs").height,
value,
},
script_sig: format!(
"0x{}",
@@ -256,9 +298,12 @@ pub fn standardize_bitcoin_block(
}
let mut outputs = vec![];
let mut sats_out = 0;
for output in tx.vout.drain(..) {
let value = output.value.to_sat();
sats_out += value;
outputs.push(TxOut {
value: output.value.to_sat(),
value,
script_pubkey: format!("0x{}", hex::encode(&output.script_pub_key.hex)),
});
}
@@ -274,6 +319,7 @@ pub fn standardize_bitcoin_block(
stacks_operations,
ordinal_operations,
proof: None,
fee: sats_out - sats_in,
},
};
transactions.push(tx);
@@ -323,18 +369,28 @@ fn try_parse_ordinal_operation(
let no_content_bytes = vec![];
let inscription_content_bytes = inscription.body().unwrap_or(&no_content_bytes);
let inscriber_address = if let Ok(authors) = Address::from_script(
&tx.vout[0].script_pub_key.script().unwrap(),
bitcoin::Network::Bitcoin,
) {
Some(authors.to_string())
} else {
None
};
return Some(OrdinalOperation::InscriptionRevealed(
OrdinalInscriptionRevealData {
content_type: inscription.content_type().unwrap_or("unknown").to_string(),
content_bytes: format!("0x{}", hex::encode(&inscription_content_bytes)),
content_length: inscription_content_bytes.len(),
inscription_id: inscription_id.to_string(),
inscription_authors: vec![],
inscriber_address,
inscription_number: 0,
inscription_fee: 0,
ordinal_number: 0,
ordinal_block_height: 0,
ordinal_offset: 0,
outpoint_post_inscription: format!("{}:0:0", tx.txid.clone()),
},
));
}

View File

@@ -9,11 +9,15 @@ use chainhook_types::{
};
use hiro_system_kit::slog;
use rocket::serde::json::Value as JsonValue;
use rusqlite::Connection;
use stacks::StacksBlockPool;
use stacks_rpc_client::PoxInfo;
use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
};
use self::{bitcoin::BitcoinBlockPool, ordinals::indexing::OrdinalIndex};
use self::{bitcoin::BitcoinBlockPool, ordinals::ord::indexing::OrdinalIndex};
#[derive(Deserialize, Debug, Clone, Default)]
pub struct AssetClassCache {
@@ -35,15 +39,11 @@ impl StacksChainContext {
}
}
pub struct BitcoinChainContext {
pub ordinal_index: Option<OrdinalIndex>,
}
pub struct BitcoinChainContext {}
impl BitcoinChainContext {
pub fn new(ordinal_index: Option<OrdinalIndex>) -> BitcoinChainContext {
BitcoinChainContext {
ordinal_index: ordinal_index,
}
pub fn new() -> BitcoinChainContext {
BitcoinChainContext {}
}
}
@@ -66,11 +66,11 @@ pub struct Indexer {
}
impl Indexer {
pub fn new(config: IndexerConfig, ordinal_index: Option<OrdinalIndex>) -> Indexer {
pub fn new(config: IndexerConfig) -> Indexer {
let stacks_blocks_pool = StacksBlockPool::new();
let bitcoin_blocks_pool = BitcoinBlockPool::new();
let stacks_context = StacksChainContext::new();
let bitcoin_context = BitcoinChainContext::new(ordinal_index);
let bitcoin_context = BitcoinChainContext::new();
Indexer {
config,
@@ -86,12 +86,7 @@ impl Indexer {
block: BitcoinBlockFullBreakdown,
ctx: &Context,
) -> Result<Option<BitcoinChainEvent>, String> {
let block = bitcoin::standardize_bitcoin_block(
&self.config,
block,
&mut self.bitcoin_context,
ctx,
)?;
let block = bitcoin::standardize_bitcoin_block(&self.config, block, ctx)?;
let event = self.bitcoin_blocks_pool.process_block(block, ctx);
event
}

View File

@@ -0,0 +1,537 @@
use std::{path::PathBuf, time::Duration};
use chainhook_types::{BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier};
use hiro_system_kit::slog;
use rand::RngCore;
use rusqlite::{Connection, OpenFlags, ToSql};
use threadpool::ThreadPool;
use crate::{
indexer::bitcoin::{
retrieve_block_hash, retrieve_full_block_breakdown_with_retry, BitcoinBlockFullBreakdown,
},
observer::BitcoinConfig,
utils::Context,
};
pub fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
destination_path.push("bitcoin_block_traversal-readonly.sqlite");
destination_path
}
pub fn open_readonly_ordinals_db_conn(base_dir: &PathBuf) -> Result<Connection, String> {
let path = get_default_ordinals_db_file_path(&base_dir);
let conn = open_existing_readonly_db(&path);
Ok(conn)
}
pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connection {
let conn = create_or_open_readwrite_db(path, ctx);
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS blocks (
id INTEGER NOT NULL PRIMARY KEY,
compacted_bytes TEXT NOT NULL
)",
[],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS inscriptions (
inscription_id TEXT NOT NULL PRIMARY KEY,
outpoint_to_watch TEXT NOT NULL,
satoshi_id TEXT NOT NULL,
inscription_number INTEGER NOT NULL,
offset NOT NULL
)",
[],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_outpoint_to_watch ON inscriptions(outpoint_to_watch);",
[],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_satoshi_id ON inscriptions(satoshi_id);",
[],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
conn
}
fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection {
let open_flags = match std::fs::metadata(path) {
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// need to create
if let Some(dirp) = PathBuf::from(path).parent() {
std::fs::create_dir_all(dirp).unwrap_or_else(|e| {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
});
}
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
} else {
panic!("FATAL: could not stat {}", path.display());
}
}
Ok(_md) => {
// can just open
OpenFlags::SQLITE_OPEN_READ_WRITE
}
};
let conn = Connection::open_with_flags(path, open_flags).unwrap();
// db.profile(Some(trace_profile));
// db.busy_handler(Some(tx_busy_handler))?;
conn.pragma_update(None, "journal_mode", &"WAL").unwrap();
conn.pragma_update(None, "synchronous", &"NORMAL").unwrap();
conn
}
fn open_existing_readonly_db(path: &PathBuf) -> Connection {
let open_flags = match std::fs::metadata(path) {
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
panic!("FATAL: could not find {}", path.display());
} else {
panic!("FATAL: could not stat {}", path.display());
}
}
Ok(_md) => {
// can just open
OpenFlags::SQLITE_OPEN_READ_ONLY
}
};
let conn = Connection::open_with_flags(path, open_flags).unwrap();
conn
}
#[derive(Debug, Serialize, Deserialize)]
// pub struct CompactedBlock(Vec<(Vec<(u32, u16, u64)>, Vec<u64>)>);
pub struct CompactedBlock(
(
([u8; 4], u64),
Vec<([u8; 4], Vec<([u8; 4], u32, u16, u64)>, Vec<u64>)>,
),
);
impl CompactedBlock {
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> CompactedBlock {
let mut txs = vec![];
let mut coinbase_value = 0;
let coinbase_txid = {
let txid = hex::decode(block.tx[0].txid.to_string()).unwrap();
[txid[0], txid[1], txid[2], txid[3]]
};
for coinbase_output in block.tx[0].vout.iter() {
coinbase_value += coinbase_output.value.to_sat();
}
for tx in block.tx.iter().skip(1) {
let mut inputs = vec![];
for input in tx.vin.iter() {
let txin = hex::decode(input.txid.unwrap().to_string()).unwrap();
inputs.push((
[txin[0], txin[1], txin[2], txin[3]],
input.prevout.as_ref().unwrap().height as u32,
input.vout.unwrap() as u16,
input.prevout.as_ref().unwrap().value.to_sat(),
));
}
let mut outputs = vec![];
for output in tx.vout.iter() {
outputs.push(output.value.to_sat());
}
let txid = hex::decode(tx.txid.to_string()).unwrap();
txs.push(([txid[0], txid[1], txid[2], txid[3]], inputs, outputs));
}
CompactedBlock(((coinbase_txid, coinbase_value), txs))
}
pub fn from_hex_bytes(bytes: &str) -> CompactedBlock {
let bytes = hex::decode(&bytes).unwrap();
let value = ciborium::de::from_reader(&bytes[..]).unwrap();
value
}
pub fn to_hex_bytes(&self) -> String {
use ciborium::cbor;
let value = cbor!(self).unwrap();
let mut bytes = vec![];
let _ = ciborium::ser::into_writer(&value, &mut bytes);
let hex_bytes = hex::encode(bytes);
hex_bytes
}
}
pub fn retrieve_compacted_block_from_index(
block_id: u32,
storage_conn: &Connection,
) -> Option<CompactedBlock> {
let args: &[&dyn ToSql] = &[&block_id.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT compacted_bytes FROM blocks WHERE id = ?1")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let hex_bytes: String = row.get(0).unwrap();
Ok(CompactedBlock::from_hex_bytes(&hex_bytes))
})
.unwrap();
for result in result_iter {
return Some(result.unwrap());
}
return None;
}
pub fn scan_existing_inscriptions_id(
inscription_id: &str,
storage_conn: &Connection,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE inscription_id = ?1")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
Ok(inscription_id)
})
.unwrap();
for result in result_iter {
return Some(result.unwrap());
}
return None;
}
pub fn store_new_inscription(
inscription_data: &OrdinalInscriptionRevealData,
storage_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = storage_conn.execute(
"INSERT INTO inscriptions (inscription_id, outpoint_to_watch, satoshi_id, inscription_number) VALUES (?1, ?2)",
rusqlite::params![&inscription_data.inscription_id, &inscription_data.outpoint_post_inscription, &inscription_data.inscription_id, &inscription_data.inscription_id],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn update_transfered_inscription(
inscription_id: &str,
outpoint_post_transfer: &str,
offset: u64,
storage_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = storage_conn.execute(
"UPDATE inscriptions SET outpoint_to_watch = ?1, offset = ?2 WHERE inscription_id = ?3",
rusqlite::params![&outpoint_post_transfer, &offset, &inscription_id],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn find_last_inscription_number(
storage_conn: &Connection,
ctx: &Context,
) -> Result<u64, String> {
let args: &[&dyn ToSql] = &[];
let mut stmt = storage_conn
.prepare(
"SELECT inscription_number FROM inscriptions ORDER BY inscription_number DESC LIMIT 1",
)
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_number: u64 = row.get(0).unwrap();
Ok(inscription_number)
})
.unwrap();
for result in result_iter {
return Ok(result.unwrap());
}
return Ok(0);
}
pub fn find_inscription_with_satoshi_id(
satoshi_id: &str,
storage_conn: &Connection,
ctx: &Context,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&satoshi_id.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE satoshi_id = ?1")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
Ok(inscription_id)
})
.unwrap();
for result in result_iter {
return Some(result.unwrap());
}
return None;
}
pub fn find_inscriptions_at_wached_outpoint(
txin: &str,
storage_conn: &Connection,
) -> Vec<(String, u64, String, u64)> {
let args: &[&dyn ToSql] = &[&txin.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id, inscription_number, satoshi_id, offset FROM inscriptions WHERE outpoint_to_watch = ?1 ORDER BY offset ASC")
.unwrap();
let mut results = vec![];
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
let inscription_number: u64 = row.get(1).unwrap();
let satoshi_id: String = row.get(2).unwrap();
let offset: u64 = row.get(1).unwrap();
results.push((inscription_id, inscription_number, satoshi_id, offset));
Ok(())
})
.unwrap();
return results;
}
pub fn write_compacted_block_to_index(
block_id: u32,
compacted_block: &CompactedBlock,
storage_conn: &Connection,
ctx: &Context,
) {
let serialized_compacted_block = compacted_block.to_hex_bytes();
if let Err(e) = storage_conn.execute(
"INSERT INTO blocks (id, compacted_bytes) VALUES (?1, ?2)",
rusqlite::params![&block_id, &serialized_compacted_block],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub async fn build_bitcoin_traversal_local_storage(
bitcoin_config: &BitcoinConfig,
cache_path: &PathBuf,
start_block: u64,
end_block: u64,
ctx: &Context,
network_thread: usize,
) -> Result<(), String> {
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
let (block_hash_tx, block_hash_rx) = crossbeam_channel::unbounded();
let retrieve_block_data_pool = ThreadPool::new(network_thread);
let (block_data_tx, block_data_rx) = crossbeam_channel::unbounded();
let compress_block_data_pool = ThreadPool::new(8);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
for block_cursor in start_block..end_block {
let block_height = block_cursor.clone();
let block_hash_tx = block_hash_tx.clone();
let config = bitcoin_config.clone();
retrieve_block_hash_pool.execute(move || {
let mut err_count = 0;
let mut rng = rand::thread_rng();
loop {
let future = retrieve_block_hash(&config, &block_height);
match hiro_system_kit::nestable_block_on(future) {
Ok(block_hash) => {
err_count = 0;
block_hash_tx.send(Some((block_cursor, block_hash)));
break;
}
Err(e) => {
err_count += 1;
let delay = (err_count + (rng.next_u64() % 3)) * 1000;
println!("retry hash:fetch in {delay}");
std::thread::sleep(std::time::Duration::from_millis(delay));
}
}
}
});
}
let db_file = get_default_ordinals_db_file_path(&cache_path);
let bitcoin_config = bitcoin_config.clone();
let moved_ctx = ctx.clone();
let handle = hiro_system_kit::thread_named("Block data retrieval").spawn(move || {
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
println!("fetch {block_height}:{block_hash}");
let moved_bitcoin_config = bitcoin_config.clone();
let block_data_tx = block_data_tx.clone();
let moved_ctx = moved_ctx.clone();
retrieve_block_data_pool.execute(move || {
let future = retrieve_full_block_breakdown_with_retry(
&moved_bitcoin_config,
&block_hash,
&moved_ctx,
);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
block_data_tx.send(Some(block_data));
});
retrieve_block_data_pool.join()
}
});
let handle = hiro_system_kit::thread_named("Block data compression").spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
println!("store {}:{}", block_data.height, block_data.hash);
let block_compressed_tx = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
block_compressed_tx.send(Some((block_data.height as u32, compressed_block)));
});
compress_block_data_pool.join()
}
});
let conn = initialize_ordinal_state_storage(&db_file, &ctx);
while let Ok(Some((block_height, compacted_block))) = block_compressed_rx.recv() {
ctx.try_log(|logger| slog::debug!(logger, "Storing block #{block_height}"));
write_compacted_block_to_index(block_height, &compacted_block, &conn, &ctx);
}
retrieve_block_hash_pool.join();
Ok(())
}
pub fn retrieve_satoshi_point_using_local_storage(
storage_conn: &Connection,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
ctx: &Context,
) -> Result<(u64, u64), String> {
let mut ordinal_offset = 0;
let mut ordinal_block_number = block_identifier.index as u32;
let txid = {
let bytes = hex::decode(&transaction_identifier.hash[2..]).unwrap();
[bytes[0], bytes[1], bytes[2], bytes[3]]
};
let mut tx_cursor = (txid, 0);
loop {
let res = retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn).unwrap();
ctx.try_log(|logger| {
slog::debug!(
logger,
"{ordinal_block_number}:{:?}:{:?}",
hex::encode(&res.0 .0 .0),
hex::encode(txid)
)
});
std::thread::sleep(std::time::Duration::from_millis(300));
// evaluate exit condition: did we reach a coinbase transaction?
let coinbase_txid = &res.0 .0 .0;
if coinbase_txid.eq(&tx_cursor.0) {
let coinbase_value = &res.0 .0 .1;
if ordinal_offset.lt(coinbase_value) {
break;
}
// loop over the transaction fees to detect the right range
let cut_off = ordinal_offset - coinbase_value;
let mut accumulated_fees = 0;
for (txid, inputs, outputs) in res.0 .1 {
let mut total_in = 0;
for (_, _, _, input_value) in inputs.iter() {
total_in += input_value;
}
let mut total_out = 0;
for output_value in outputs.iter() {
total_out += output_value;
}
let fee = total_in - total_out;
accumulated_fees += fee;
if accumulated_fees > cut_off {
// We are looking at the right transaction
// Retraverse the inputs to select the index to be picked
let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
if sats_in >= total_out {
ordinal_offset = total_out - (sats_in - txin_value);
ordinal_block_number = block_height;
// println!("{h}: {blockhash} -> {} [in:{} , out: {}] {}/{vout} (input #{in_index}) {compounded_offset}", transaction.txid, transaction.vin.len(), transaction.vout.len(), txid);
tx_cursor = (txin, vout as usize);
break;
}
}
break;
}
}
} else {
// isolate the target transaction
for (txid, inputs, outputs) in res.0 .1 {
// we iterate over the transactions, looking for the transaction target
if !txid.eq(&tx_cursor.0) {
continue;
}
ctx.try_log(|logger| {
slog::debug!(logger, "Evaluating {}: {:?}", hex::encode(&txid), outputs)
});
let mut sats_out = 0;
for (index, output_value) in outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
ctx.try_log(|logger| {
slog::debug!(logger, "Adding {} from output #{}", output_value, index)
});
sats_out += output_value;
}
sats_out += ordinal_offset;
let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
if sats_in >= sats_out {
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = block_height;
ctx.try_log(|logger| slog::debug!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
hex::encode(&txid),
hex::encode(&txin)));
tx_cursor = (txin, vout as usize);
break;
}
}
}
}
}
Ok((ordinal_block_number.into(), ordinal_offset))
}
// pub async fn scan_bitcoin_chain_for_ordinal_inscriptions(
// subscribers: Vec<HookAction>,
// first_inscription_height: u64,
// config: &Config,
// ctx: &Context,
// ) -> Result<(), String> {
// Ok(())
// }

View File

@@ -1,191 +1,3 @@
mod blocktime;
pub mod chain;
mod deserialize_from_str;
mod epoch;
mod height;
pub mod indexing;
pub mod db;
pub mod inscription;
pub mod inscription_id;
pub mod sat;
pub mod sat_point;
use hiro_system_kit::slog;
use std::{path::PathBuf, time::Duration};
type Result<T = (), E = anyhow::Error> = std::result::Result<T, E>;
use chainhook_types::BitcoinNetwork;
use crate::{observer::EventObserverConfig, utils::Context};
const DIFFCHANGE_INTERVAL: u64 =
bitcoincore_rpc::bitcoin::blockdata::constants::DIFFCHANGE_INTERVAL as u64;
const SUBSIDY_HALVING_INTERVAL: u64 =
bitcoincore_rpc::bitcoin::blockdata::constants::SUBSIDY_HALVING_INTERVAL as u64;
const CYCLE_EPOCHS: u64 = 6;
pub fn initialize_ordinal_index(
config: &EventObserverConfig,
index_path: Option<PathBuf>,
ctx: &Context,
) -> Result<self::indexing::OrdinalIndex, String> {
let chain = match &config.bitcoin_network {
BitcoinNetwork::Mainnet => chain::Chain::Mainnet,
BitcoinNetwork::Testnet => chain::Chain::Testnet,
BitcoinNetwork::Regtest => chain::Chain::Regtest,
};
let index_options = self::indexing::Options {
rpc_username: config.bitcoin_node_username.clone(),
rpc_password: config.bitcoin_node_password.clone(),
data_dir: config.cache_path.clone().into(),
chain: chain,
first_inscription_height: Some(chain.first_inscription_height()),
height_limit: None,
index: index_path,
rpc_url: config.bitcoin_node_rpc_url.clone(),
};
let index = match self::indexing::OrdinalIndex::open(&index_options) {
Ok(index) => index,
Err(e) => {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to open ordinal index: {}", e.to_string())
});
std::process::exit(1);
}
};
Ok(index)
}
// 1) Retrieve the block height of the oldest block (coinbase), which will indicates the range
// 2) Look at the following transaction N:
// - Compute SUM of the inputs located before the Coinbase Spend, and remove , remove the outputs (N+1) in the list of
//
// -> 10-20
//
// 10-20 -> 10-15
// -> 15-18
//
// 10-15 -> 10-12
// -> 12-14
//
// 10-12 -> X
// -> 10-20
//
// 10-20 -> 10-15
// -> 15-18
//
// 15-18 -> 15-17
// -> 17-18
//
// 17-18 -> X
//
// Open a transaction:
// Locate output, based on amounts transfered
// pub async fn retrieve_satoshi_point(
// config: &Config,
// origin_txid: &str,
// output_index: usize,
// ) -> Result<(), String> {
// let http_client = HttpClient::builder()
// .timeout(Duration::from_secs(20))
// .build()
// .expect("Unable to build http client");
// let mut transactions_chain = VecDeque::new();
// let mut tx_cursor = (origin_txid.to_string(), 0);
// let mut offset = 0;
// loop {
// println!("{:?}", tx_cursor);
// // Craft RPC request
// let body = json!({
// "jsonrpc": "1.0",
// "id": "chainhook-cli",
// "method": "getrawtransaction",
// "params": vec![json!(tx_cursor.0), json!(true)]
// });
// // Send RPC request
// let transaction = http_client
// .post(&config.network.bitcoin_node_rpc_url)
// .basic_auth(
// &config.network.bitcoin_node_rpc_username,
// Some(&config.network.bitcoin_node_rpc_password),
// )
// .header("Content-Type", "application/json")
// .header("Host", &config.network.bitcoin_node_rpc_url[7..])
// .json(&body)
// .send()
// .await
// .map_err(|e| format!("unable to send request ({})", e))?
// .json::<bitcoincore_rpc::jsonrpc::Response>()
// .await
// .map_err(|e| format!("unable to parse response ({})", e))?
// .result::<GetRawTransactionResult>()
// .map_err(|e| format!("unable to parse response ({})", e))?;
// if transaction.is_coinbase() {
// transactions_chain.push_front(transaction);
// break;
// }
// // Identify the TXIN that we should select, just take the 1st one for now
// let mut sats_out = 0;
// for (index, output) in transaction.vout.iter().enumerate() {
// // should reorder output.n?
// assert_eq!(index as u32, output.n);
// if index == tx_cursor.1 {
// break;
// }
// sats_out += output.value.to_sat();
// }
// let mut sats_in = 0;
// for input in transaction.vin.iter() {
// let txid = input.txid.unwrap().to_string();
// let vout = input.vout.unwrap() as usize;
// let body = json!({
// "jsonrpc": "1.0",
// "id": "chainhook-cli",
// "method": "getrawtransaction",
// "params": vec![json!(txid), json!(true)]
// // "method": "gettxout",
// // "params": vec![json!(&txid), json!(&(vout + 1)), json!(false)]
// });
// let raw_txin = http_client
// .post(&config.network.bitcoin_node_rpc_url)
// .basic_auth(
// &config.network.bitcoin_node_rpc_username,
// Some(&config.network.bitcoin_node_rpc_password),
// )
// .header("Content-Type", "application/json")
// .header("Host", &config.network.bitcoin_node_rpc_url[7..])
// .json(&body)
// .send()
// .await
// .map_err(|e| format!("unable to send request ({})", e))?
// .json::<bitcoincore_rpc::jsonrpc::Response>()
// .await
// .map_err(|e| format!("unable to parse response ({})", e))?
// // .result::<GetTxOutResult>()
// // .map_err(|e| format!("unable to parse response ({})", e))?;
// .result::<GetRawTransactionResult>()
// .map_err(|e| format!("unable to parse response ({})", e))?;
// // println!("{:?}", txout);
// sats_in += raw_txin.vout[vout].value.to_sat();
// if sats_in >= sats_out {
// tx_cursor = (txid, vout as usize);
// break;
// }
// }
// transactions_chain.push_front(transaction);
// }
// Ok(())
// }
pub mod ord;

View File

@@ -152,7 +152,7 @@ impl From<Height> for Epoch {
#[cfg(test)]
mod tests {
use crate::indexer::ordinals::{
use crate::indexer::ordinals::ord::{
epoch::Epoch, height::Height, sat::Sat, SUBSIDY_HALVING_INTERVAL,
};

View File

@@ -4,25 +4,25 @@ use super::{epoch::Epoch, sat::Sat, *};
// use std::fmt::Display;
#[derive(Copy, Clone, Debug, Ord, Eq, PartialEq, PartialOrd)]
pub(crate) struct Height(pub(crate) u64);
pub struct Height(pub u64);
impl Height {
pub(crate) fn n(self) -> u64 {
pub fn n(self) -> u64 {
self.0
}
pub(crate) fn subsidy(self) -> u64 {
pub fn subsidy(self) -> u64 {
Epoch::from(self).subsidy()
}
pub(crate) fn starting_sat(self) -> Sat {
pub fn starting_sat(self) -> Sat {
let epoch = Epoch::from(self);
let epoch_starting_sat = epoch.starting_sat();
let epoch_starting_height = epoch.starting_height();
epoch_starting_sat + (self - epoch_starting_height.n()).n() * epoch.subsidy()
}
pub(crate) fn period_offset(self) -> u64 {
pub fn period_offset(self) -> u64 {
self.0 % DIFFCHANGE_INTERVAL
}
}

View File

@@ -6,7 +6,7 @@ use bitcoincore_rpc::bitcoin::{
BlockHash, OutPoint, Txid,
};
use crate::indexer::ordinals::{inscription_id::InscriptionId, sat::Sat, sat_point::SatPoint};
use crate::indexer::ordinals::ord::{inscription_id::InscriptionId, sat::Sat, sat_point::SatPoint};
pub trait Entry: Sized {
type Value;

View File

@@ -1,4 +1,4 @@
use crate::indexer::ordinals::height::Height;
use crate::indexer::ordinals::ord::height::Height;
use super::*;

View File

@@ -1,5 +1,5 @@
use crate::{
indexer::ordinals::{height::Height, sat::Sat, sat_point::SatPoint},
indexer::ordinals::ord::{height::Height, sat::Sat, sat_point::SatPoint},
utils::Context,
};
use anyhow::Context as Ctx;

View File

@@ -1,9 +1,5 @@
use crate::indexer::ordinals::{
inscription::{Inscription, InscriptionParser},
inscription_id::InscriptionId,
sat::Sat,
sat_point::SatPoint,
};
use crate::indexer::ordinals::inscription::{Inscription, InscriptionParser};
use crate::indexer::ordinals::ord::{inscription_id::InscriptionId, sat::Sat, sat_point::SatPoint};
use super::*;

View File

@@ -0,0 +1,56 @@
use hiro_system_kit::slog;
use std::{path::PathBuf, time::Duration};
type Result<T = (), E = anyhow::Error> = std::result::Result<T, E>;
use chainhook_types::BitcoinNetwork;
use crate::{observer::EventObserverConfig, utils::Context};
pub mod blocktime;
pub mod chain;
pub mod deserialize_from_str;
pub mod epoch;
pub mod height;
pub mod indexing;
pub mod inscription_id;
pub mod sat;
pub mod sat_point;
const DIFFCHANGE_INTERVAL: u64 =
bitcoincore_rpc::bitcoin::blockdata::constants::DIFFCHANGE_INTERVAL as u64;
const SUBSIDY_HALVING_INTERVAL: u64 =
bitcoincore_rpc::bitcoin::blockdata::constants::SUBSIDY_HALVING_INTERVAL as u64;
const CYCLE_EPOCHS: u64 = 6;
pub fn initialize_ordinal_index(
config: &EventObserverConfig,
index_path: Option<PathBuf>,
ctx: &Context,
) -> Result<self::indexing::OrdinalIndex, String> {
let chain = match &config.bitcoin_network {
BitcoinNetwork::Mainnet => chain::Chain::Mainnet,
BitcoinNetwork::Testnet => chain::Chain::Testnet,
BitcoinNetwork::Regtest => chain::Chain::Regtest,
};
let index_options = self::indexing::Options {
rpc_username: config.bitcoin_node_username.clone(),
rpc_password: config.bitcoin_node_password.clone(),
data_dir: config.cache_path.clone().into(),
chain: chain,
first_inscription_height: Some(chain.first_inscription_height()),
height_limit: None,
index: index_path,
rpc_url: config.bitcoin_node_rpc_url.clone(),
};
let index = match self::indexing::OrdinalIndex::open(&index_options) {
Ok(index) => index,
Err(e) => {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to open ordinal index: {}", e.to_string())
});
std::process::exit(1);
}
};
Ok(index)
}

View File

@@ -107,6 +107,7 @@ pub fn generate_test_tx_bitcoin_p2pkh_transfer(
ordinal_operations: vec![],
stacks_operations: vec![],
proof: None,
fee: 0,
},
}
}

View File

@@ -10,14 +10,24 @@ use crate::chainhooks::types::{
ChainhookConfig, ChainhookFullSpecification, ChainhookSpecification,
};
use crate::indexer::bitcoin::{retrieve_full_block_breakdown_with_retry, NewBitcoinBlock};
use crate::indexer::ordinals::{indexing::updater::OrdinalIndexUpdater, initialize_ordinal_index};
use crate::indexer::ordinals::db::{
find_inscription_with_satoshi_id, find_inscriptions_at_wached_outpoint,
find_last_inscription_number, get_default_ordinals_db_file_path,
open_readonly_ordinals_db_conn, retrieve_satoshi_point_using_local_storage,
scan_existing_inscriptions_id, store_new_inscription, update_transfered_inscription,
};
use crate::indexer::ordinals::ord::height::Height;
use crate::indexer::ordinals::ord::{
indexing::updater::OrdinalIndexUpdater, initialize_ordinal_index,
};
use crate::indexer::{self, Indexer, IndexerConfig};
use crate::utils::{send_request, Context};
use bitcoincore_rpc::bitcoin::{BlockHash, Txid};
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
use bitcoincore_rpc::bitcoin::{Address, BlockHash, Network, Script, Txid};
use bitcoincore_rpc::{Auth, Client, RpcApi};
use chainhook_types::{
BitcoinChainEvent, BitcoinNetwork, BlockIdentifier, StacksChainEvent, StacksNetwork,
TransactionIdentifier,
bitcoin, BitcoinChainEvent, BitcoinNetwork, BlockIdentifier, OrdinalInscriptionTransferData,
OrdinalOperation, StacksChainEvent, StacksNetwork, TransactionIdentifier,
};
use clarity_repl::clarity::util::hash::bytes_to_hex;
use hiro_system_kit;
@@ -32,9 +42,10 @@ use rocket::serde::Deserialize;
use rocket::Shutdown;
use rocket::State;
use rocket_okapi::{openapi, openapi_get_routes, request::OpenApiFromRequest};
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr};
use std::path::PathBuf;
use std::str;
use std::str::FromStr;
use std::sync::mpsc::{Receiver, Sender};
@@ -132,6 +143,23 @@ pub struct EventObserverConfig {
pub stacks_network: StacksNetwork,
}
impl EventObserverConfig {
pub fn get_cache_path_buf(&self) -> PathBuf {
let mut path_buf = PathBuf::new();
path_buf.push(&self.cache_path);
path_buf
}
pub fn get_bitcoin_config(&self) -> BitcoinConfig {
let bitcoin_config = BitcoinConfig {
username: self.bitcoin_node_username.clone(),
password: self.bitcoin_node_password.clone(),
rpc_url: self.bitcoin_node_rpc_url.clone(),
};
bitcoin_config
}
}
#[derive(Deserialize, Debug)]
pub struct ContractReadonlyCall {
pub okay: bool,
@@ -252,17 +280,14 @@ pub async fn start_event_observer(
)
});
let indexer = Indexer::new(
IndexerConfig {
stacks_node_rpc_url: config.stacks_node_rpc_url.clone(),
bitcoin_node_rpc_url: config.bitcoin_node_rpc_url.clone(),
bitcoin_node_rpc_username: config.bitcoin_node_username.clone(),
bitcoin_node_rpc_password: config.bitcoin_node_password.clone(),
stacks_network: StacksNetwork::Devnet,
bitcoin_network: BitcoinNetwork::Regtest,
},
Some(ordinal_index),
);
let indexer = Indexer::new(IndexerConfig {
stacks_node_rpc_url: config.stacks_node_rpc_url.clone(),
bitcoin_node_rpc_url: config.bitcoin_node_rpc_url.clone(),
bitcoin_node_rpc_username: config.bitcoin_node_username.clone(),
bitcoin_node_rpc_password: config.bitcoin_node_password.clone(),
stacks_network: StacksNetwork::Devnet,
bitcoin_network: BitcoinNetwork::Regtest,
});
let log_level = if config.display_logs {
if cfg!(feature = "cli") {
@@ -277,11 +302,7 @@ pub async fn start_event_observer(
let ingestion_port = config.ingestion_port;
let control_port = config.control_port;
let bitcoin_rpc_proxy_enabled = config.bitcoin_rpc_proxy_enabled;
let bitcoin_config = BitcoinConfig {
username: config.bitcoin_node_username.clone(),
password: config.bitcoin_node_password.clone(),
rpc_url: config.bitcoin_node_rpc_url.clone(),
};
let bitcoin_config = config.get_bitcoin_config();
let services_config = ServicesConfig {
stacks_node_url: config.bitcoin_node_rpc_url.clone(),
@@ -438,6 +459,12 @@ pub fn get_bitcoin_proof(
}
}
pub fn pre_process_bitcoin_block() {}
pub fn apply_bitcoin_block() {}
pub fn rollback_bitcoin_block() {}
pub async fn start_observer_commands_handler(
config: EventObserverConfig,
chainhook_store: Arc<RwLock<ChainhookStore>>,
@@ -451,6 +478,7 @@ pub async fn start_observer_commands_handler(
let event_handlers = config.event_handlers.clone();
let mut chainhooks_lookup: HashMap<String, ApiKey> = HashMap::new();
let networks = (&config.bitcoin_network, &config.stacks_network);
let ordinals_db_conn = open_readonly_ordinals_db_conn(&config.get_cache_path_buf())?;
loop {
let command = match observer_commands_rx.recv() {
Ok(cmd) => cmd,
@@ -476,10 +504,278 @@ pub async fn start_observer_commands_handler(
}
break;
}
ObserverCommand::PropagateBitcoinChainEvent(chain_event) => {
// ObserverCommand::ProcessBitcoinBlock?
ObserverCommand::PropagateBitcoinChainEvent(mut chain_event) => {
ctx.try_log(|logger| {
slog::info!(logger, "Handling PropagateBitcoinChainEvent command")
});
// Update Chain event before propagation
match chain_event {
BitcoinChainEvent::ChainUpdatedWithBlocks(ref mut new_blocks) => {
// Look for inscription transfered
let storage_conn =
open_readonly_ordinals_db_conn(&config.get_cache_path_buf()).unwrap(); // TODO(lgalabru)
for new_block in new_blocks.new_blocks.iter_mut() {
let mut coinbase_offset = 0;
let coinbase_txid = &new_block.transactions[0]
.transaction_identifier
.hash
.clone();
let coinbase_subsidy =
Height(new_block.block_identifier.index).subsidy();
for new_tx in new_block.transactions.iter_mut().skip(1) {
let mut ordinals_events_indexes_to_discard = VecDeque::new();
// Have a new inscription been revealed, if so, are looking at a re-inscription
for (ordinal_event_index, ordinal_event) in
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
{
if let OrdinalOperation::InscriptionRevealed(inscription) =
ordinal_event
{
// Are we looking at a re-inscription?
let res = retrieve_satoshi_point_using_local_storage(
&storage_conn,
&new_block.block_identifier,
&new_tx.transaction_identifier,
&ctx,
);
let (block_height, offset) = match res {
Ok(res) => res,
Err(e) => {
continue;
}
};
let satoshi_id = format!("{}:{}", block_height, offset);
if let Some(entry) = find_inscription_with_satoshi_id(
&satoshi_id,
&storage_conn,
&ctx,
) {
ctx.try_log(|logger| {
slog::warn!(logger, "Transaction {} in block {} is overriding an existing inscription {}", new_tx.transaction_identifier.hash, new_block.block_identifier.index, satoshi_id);
});
ordinals_events_indexes_to_discard
.push_front(ordinal_event_index);
} else {
inscription.ordinal_block_height = block_height;
inscription.ordinal_offset = offset;
inscription.ordinal_number = 0;
inscription.inscription_number =
match find_last_inscription_number(
&storage_conn,
&ctx,
) {
Ok(inscription_number) => inscription_number,
Err(e) => {
continue;
}
};
ctx.try_log(|logger| {
slog::info!(logger, "Transaction {} in block {} inclues a new inscription {}", new_tx.transaction_identifier.hash, new_block.block_identifier.index, satoshi_id);
});
{
let storage_rw_conn =
open_readonly_ordinals_db_conn(
&config.get_cache_path_buf(),
)
.unwrap(); // TODO(lgalabru)
store_new_inscription(
&inscription,
&storage_rw_conn,
&ctx,
)
}
}
}
}
// Have inscriptions been transfered?
let mut sats_in_offset = 0;
let mut sats_out_offset = 0;
for input in new_tx.metadata.inputs.iter() {
// input.previous_output.txid
let outpoint_pre_transfer = format!(
"{}:{}",
input.previous_output.txid, input.previous_output.vout
);
let mut post_transfer_output_index = 0;
let mut post_transfer_offset = 0;
for (inscription_id, inscription_number, satoshi_id, offset) in
find_inscriptions_at_wached_outpoint(
&outpoint_pre_transfer,
&ordinals_db_conn,
)
.into_iter()
{
// At this point we know that inscriptions are being moved.
// Question is: are inscriptions moving to a new output, burnt or lost in fees and transfered to the miner?
let post_transfer_output = loop {
if sats_out_offset >= sats_in_offset + offset {
break Some(post_transfer_output_index);
}
if post_transfer_output_index
>= new_tx.metadata.outputs.len()
{
break None;
}
sats_out_offset += new_tx.metadata.outputs
[post_transfer_output_index]
.value;
post_transfer_output_index += 1;
};
let (
outpoint_post_transfer,
offset_post_transfer,
updated_address,
) = match post_transfer_output {
Some(index) => {
let outpoint = format!(
"{}:{}",
new_tx.transaction_identifier.hash, index
);
let offset = 0;
let script_pub_key_hex = new_tx.metadata.outputs
[index]
.get_script_pubkey_hex();
let updated_address =
match Script::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(
&script,
Network::Bitcoin,
) {
Ok(address) => {
Some(address.to_string())
}
Err(e) => {
// todo(lgalabru log error)
None
}
},
Err(e) => {
// todo(lgalabru log error)
None
}
};
// let vout = new_tx.metadata.outputs[index];
(outpoint, offset, updated_address)
}
None => {
// Get Coinbase TX
let offset = coinbase_subsidy + coinbase_offset;
let outpoint = coinbase_txid.clone();
(outpoint, offset, None)
}
};
// Compute the offset / new txin:vout to watch
let outpoint_post_transfer = format!(
"{}:{}",
input.previous_output.txid, input.previous_output.vout
);
// Update watched outpoint
{
let storage_rw_conn = open_readonly_ordinals_db_conn(
&config.get_cache_path_buf(),
)
.unwrap(); // TODO(lgalabru)
update_transfered_inscription(
&inscription_id,
&outpoint_post_transfer,
offset_post_transfer,
&storage_rw_conn,
&ctx,
);
}
let event_data = OrdinalInscriptionTransferData {
inscription_id,
inscription_number,
satoshi_id,
updated_address,
outpoint_pre_transfer: outpoint_pre_transfer.clone(),
outpoint_post_transfer,
};
// Attach transfer event
new_tx.metadata.ordinal_operations.push(
OrdinalOperation::InscriptionTransfered(event_data),
);
}
sats_in_offset += input.previous_output.value;
}
// - clean new_tx.metadata.ordinal_operations with ordinals_events_indexes_to_ignore
for index in ordinals_events_indexes_to_discard.into_iter() {
new_tx.metadata.ordinal_operations.remove(index);
}
coinbase_offset += new_tx.metadata.fee;
}
// TODO:
// - persist compacted block in table blocks
}
}
BitcoinChainEvent::ChainUpdatedWithReorg(ref mut reorg) => {
// let path
for block_to_rollback in reorg.blocks_to_rollback.iter() {
for tx_to_rollback in block_to_rollback.transactions.iter() {
// Have a new inscription been revealed?
// Are we looking at a re-inscription?
// Have inscriptions been transfered?
// for input in tx_to_rollback.metadata.inputs.iter() {
// // input.previous_output.txid
// let txin = format!("{}", input.previous_output.txid);
// if let Some(_entry) =
// scan_outpoints_to_watch_with_txin(&txin, &ordinals_db_conn)
// {
// // Compute offset
// // Update watched outpoint
// // Attach transfer event
// }
// }
}
// TODO:
// - clean new_tx.metadata.ordinal_operations with ordinals_events_indexes_to_ignore
// - remove any newly, valid, revealed inscription
// - remove compacted block from table blocks
}
for block_to_apply in reorg.blocks_to_apply.iter() {
for tx_to_apply in block_to_apply.transactions.iter() {
// Have a new inscription been revealed?
// Are we looking at a re-inscription?
// Have inscriptions been transfered?
// for input in tx_to_apply.metadata.inputs.iter() {
// // input.previous_output.txid
// let txin = format!("{}", input.previous_output.txid);
// if let Some(_entry) =
// scan_outpoints_to_watch_with_txin(&txin, &ordinals_db_conn)
// {
// // Compute offset
// // Update watched outpoint
// // Attach transfer event
// }
// }
}
}
}
}
for event_handler in event_handlers.iter() {
event_handler.propagate_bitcoin_event(&chain_event).await;
}

View File

@@ -13,3 +13,4 @@ serde_json = "1"
serde_derive = "1"
strum = { version = "0.23.0", features = ["derive"] }
schemars = { version = "0.8.10" }
hex = "0.4.3"

View File

@@ -35,6 +35,20 @@ pub struct OutPoint {
pub txid: String,
/// The index of the referenced output in its transaction's vout.
pub vout: u32,
/// The value of the referenced.
pub value: u64,
/// The script which must be satisfied for the output to be spent.
pub block_height: u64,
}
impl TxOut {
pub fn get_script_pubkey_bytes(&self) -> Vec<u8> {
hex::decode(&self.get_script_pubkey_hex()).expect("not provided for coinbase txs")
}
pub fn get_script_pubkey_hex(&self) -> &str {
&self.script_pubkey[2..]
}
}
/// The Witness is the data used to unlock bitcoins since the [segwit upgrade](https://github.com/bitcoin/bips/blob/master/bip-0143.mediawiki)

View File

@@ -270,6 +270,7 @@ pub struct BitcoinTransactionMetadata {
pub stacks_operations: Vec<StacksBaseChainOperation>,
pub ordinal_operations: Vec<OrdinalOperation>,
pub proof: Option<String>,
pub fee: u64,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
@@ -280,7 +281,14 @@ pub enum OrdinalOperation {
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct OrdinalInscriptionTransferData {}
pub struct OrdinalInscriptionTransferData {
pub inscription_number: u64,
pub inscription_id: String,
pub satoshi_id: String,
pub updated_address: Option<String>,
pub outpoint_pre_transfer: String,
pub outpoint_post_transfer: String,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct OrdinalInscriptionRevealData {
@@ -290,10 +298,11 @@ pub struct OrdinalInscriptionRevealData {
pub inscription_number: u64,
pub inscription_fee: u64,
pub inscription_id: String,
pub inscription_authors: Vec<String>,
pub inscriber_address: Option<String>,
pub ordinal_number: u64,
pub ordinal_block_height: u64,
pub ordinal_offset: u64,
pub outpoint_post_inscription: String,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]