feat: improve ordinal scan efficiency

This commit is contained in:
Ludo Galabru
2023-03-03 18:49:04 -05:00
parent 3b01a48f1e
commit e510d4bd09
9 changed files with 452 additions and 79 deletions

View File

@@ -1,9 +1,24 @@
use crate::config::Config;
use chainhook_event_observer::chainhooks::types::ChainhookConfig;
use crate::node::ordinals::inscription_id::InscriptionId;
use chainhook_event_observer::bitcoincore_rpc::bitcoin::BlockHash;
use chainhook_event_observer::bitcoincore_rpc::jsonrpc;
use chainhook_event_observer::chainhooks::bitcoin::{
handle_bitcoin_hook_action, BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
};
use chainhook_event_observer::chainhooks::types::{
BitcoinPredicateType, ChainhookConfig, OrdinalOperations, Protocols,
};
use chainhook_event_observer::indexer::ordinals::indexing::entry::Entry;
use chainhook_event_observer::indexer::ordinals::indexing::{
HEIGHT_TO_BLOCK_HASH, INSCRIPTION_NUMBER_TO_INSCRIPTION_ID,
};
use chainhook_event_observer::indexer::ordinals::{self, initialize_ordinal_index};
use chainhook_event_observer::indexer::{self, BitcoinChainContext};
use chainhook_event_observer::observer::{
start_event_observer, EventObserverConfig, ObserverEvent,
};
use chainhook_event_observer::utils::Context;
use chainhook_event_observer::redb::ReadableTable;
use chainhook_event_observer::utils::{file_append, send_request, Context};
use chainhook_event_observer::{
chainhooks::stacks::{
evaluate_stacks_predicate_on_transaction, handle_stacks_hook_action,
@@ -15,8 +30,10 @@ use chainhook_types::{
BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent, StacksTransactionData,
};
use redis::{Commands, Connection};
use std::collections::{HashMap, HashSet};
use reqwest::Client as HttpClient;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::mpsc::channel;
use std::time::Duration;
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
@@ -31,7 +48,7 @@ impl Node {
Self { config, ctx }
}
pub fn run(&mut self) {
pub async fn run(&mut self) -> Result<(), String> {
let mut chainhook_config = ChainhookConfig::new();
{
@@ -102,6 +119,14 @@ impl Node {
"Listening for chainhook predicate registrations on port {}", DEFAULT_CONTROL_PORT
);
let ordinal_index = match initialize_ordinal_index(&event_observer_config, &self.ctx) {
Ok(index) => index,
Err(e) => {
panic!()
}
};
let mut bitcoin_context = BitcoinChainContext::new(Some(ordinal_index));
let context_cloned = self.ctx.clone();
let _ = std::thread::spawn(move || {
let future = start_event_observer(
@@ -281,12 +306,214 @@ impl Node {
}
info!(self.ctx.expect_logger(), "Stacks chainhook {} scan completed: action triggered by {} transactions", stacks_hook.uuid, total_hits);
}
ChainhookSpecification::Bitcoin(_bitcoin_hook) => {
// ordinal_index
warn!(
self.ctx.expect_logger(),
"Bitcoin chainhook evaluation unavailable for historical data"
);
ChainhookSpecification::Bitcoin(predicate_spec) => {
let mut inscriptions_hints = BTreeMap::new();
let mut use_hinting = false;
if let BitcoinPredicateType::Protocol(Protocols::Ordinal(
OrdinalOperations::InscriptionRevealed,
)) = &predicate_spec.predicate
{
if let Some(ref ordinal_index) = bitcoin_context.ordinal_index {
for (inscription_number, inscription_id) in ordinal_index
.database
.begin_read()
.unwrap()
.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)
.unwrap()
.iter()
.unwrap()
{
let inscription =
InscriptionId::load(*inscription_id.value());
println!(
"{} -> {}",
inscription_number.value(),
inscription
);
let entry = ordinal_index
.get_inscription_entry(inscription)
.unwrap()
.unwrap();
println!("{:?}", entry);
let blockhash = ordinal_index
.database
.begin_read()
.unwrap()
.open_table(HEIGHT_TO_BLOCK_HASH)
.unwrap()
.get(&entry.height)
.unwrap()
.map(|k| BlockHash::load(*k.value()))
.unwrap();
inscriptions_hints.insert(entry.height, blockhash);
use_hinting = true;
}
}
}
let start_block = match predicate_spec.start_block {
Some(n) => n,
None => 0,
};
let end_block = match predicate_spec.end_block {
Some(n) => n,
None => {
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblockcount",
"params": json!([])
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
http_client
.post(&self.config.network.bitcoin_node_rpc_url)
.basic_auth(
&self.config.network.bitcoin_node_rpc_username,
Some(&self.config.network.bitcoin_node_rpc_password),
)
.header("Content-Type", "application/json")
.header(
"Host",
&self.config.network.bitcoin_node_rpc_url[7..],
)
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<u64>()
.map_err(|e| format!("unable to parse response ({})", e))?
}
};
let mut total_hits = vec![];
for cursor in start_block..=end_block {
let block_hash = if use_hinting {
match inscriptions_hints.remove(&cursor) {
Some(block_hash) => block_hash.to_string(),
None => continue,
}
} else {
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblockhash",
"params": [cursor]
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
http_client
.post(&self.config.network.bitcoin_node_rpc_url)
.basic_auth(
&self.config.network.bitcoin_node_rpc_username,
Some(&self.config.network.bitcoin_node_rpc_password),
)
.header("Content-Type", "application/json")
.header(
"Host",
&self.config.network.bitcoin_node_rpc_url[7..],
)
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<String>()
.map_err(|e| format!("unable to parse response ({})", e))?
};
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblock",
"params": [block_hash, 2]
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
let raw_block = http_client
.post(&self.config.network.bitcoin_node_rpc_url)
.basic_auth(
&self.config.network.bitcoin_node_rpc_username,
Some(&self.config.network.bitcoin_node_rpc_password),
)
.header("Content-Type", "application/json")
.header("Host", &self.config.network.bitcoin_node_rpc_url[7..])
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<indexer::bitcoin::Block>()
.map_err(|e| format!("unable to parse response ({})", e))?;
let block = indexer::bitcoin::standardize_bitcoin_block(
&self.config.network,
cursor,
raw_block,
&mut bitcoin_context,
&self.ctx,
)?;
let mut hits = vec![];
for tx in block.transactions.iter() {
if predicate_spec.predicate.evaluate_transaction_predicate(&tx)
{
info!(
self.ctx.expect_logger(),
"Action #{} triggered by transaction {} (block #{})",
predicate_spec.uuid,
tx.transaction_identifier.hash,
cursor
);
hits.push(tx);
total_hits.push(tx.transaction_identifier.hash.to_string());
}
}
if hits.len() > 0 {
let trigger = BitcoinTriggerChainhook {
chainhook: &predicate_spec,
apply: vec![(hits, &block)],
rollback: vec![],
};
let proofs = HashMap::new();
match handle_bitcoin_hook_action(trigger, &proofs) {
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to handle action {}", e
);
}
Ok(BitcoinChainhookOccurrence::Http(request)) => {
send_request(request, &self.ctx).await;
}
Ok(BitcoinChainhookOccurrence::File(path, bytes)) => {
file_append(path, bytes, &self.ctx)
}
Ok(BitcoinChainhookOccurrence::Data(_payload)) => {
unreachable!()
}
}
}
}
}
}
}
@@ -323,6 +550,7 @@ impl Node {
_ => {}
}
}
Ok(())
}
}

View File

@@ -1,26 +1,105 @@
use crate::config::Config;
use chainhook_event_observer::bitcoincore_rpc::bitcoin::BlockHash;
use chainhook_event_observer::bitcoincore_rpc::{jsonrpc, RpcApi};
use chainhook_event_observer::bitcoincore_rpc::{Auth, Client};
use chainhook_event_observer::chainhooks::bitcoin::{
handle_bitcoin_hook_action, BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
};
use chainhook_event_observer::chainhooks::types::BitcoinChainhookFullSpecification;
use chainhook_event_observer::indexer::ordinals::indexing::updater::OrdinalIndexUpdater;
use chainhook_event_observer::chainhooks::types::{
BitcoinChainhookFullSpecification, BitcoinPredicateType, OrdinalOperations, Protocols,
};
use chainhook_event_observer::indexer::ordinals::indexing::entry::Entry;
use chainhook_event_observer::indexer::ordinals::indexing::{
HEIGHT_TO_BLOCK_HASH, INSCRIPTION_NUMBER_TO_INSCRIPTION_ID,
};
use chainhook_event_observer::indexer::ordinals::initialize_ordinal_index;
use chainhook_event_observer::indexer::ordinals::inscription_id::InscriptionId;
use chainhook_event_observer::indexer::{self, BitcoinChainContext};
use chainhook_event_observer::observer::{
EventObserverConfig, DEFAULT_CONTROL_PORT, DEFAULT_INGESTION_PORT,
};
use chainhook_event_observer::redb::ReadableTable;
use chainhook_event_observer::utils::{file_append, send_request, Context};
use std::collections::{HashMap, HashSet};
use reqwest::Client as HttpClient;
use rusqlite::{Connection, Result, ToSql};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
#[derive(Debug)]
pub struct Inscription {
inscription_id: String,
fee: u32,
number: u32,
ordinal_number: u64,
}
pub fn initialize_inscription_cache() -> Connection {
let conn = Connection::open_in_memory().unwrap();
conn.execute(
"CREATE TABLE inscriptions (
inscription_id TEXT PRIMARY KEY,
fee INTEGER NOT NULL,
number INTEGER NOT NULL,
ordinal_number INTEGER NOT NULL
)",
[],
)
.unwrap();
conn
}
pub fn retrieve_inscription_from_cache(
inscription_id: &str,
storage_conn: &Connection,
) -> Option<Inscription> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let mut stmt = storage_conn.prepare("SELECT inscription_id, fee, number, ordinal_number FROM inscriptions WHERE inscription_id = ?1").unwrap();
let inscription_iter = stmt
.query_map(args, |row| {
Ok(Inscription {
inscription_id: row.get(0).unwrap(),
fee: row.get(1).unwrap(),
number: row.get(2).unwrap(),
ordinal_number: row.get(3).unwrap(),
})
})
.unwrap();
for inscription in inscription_iter {
return Some(inscription.unwrap());
}
return None;
}
pub fn write_inscription_to_cache(inscription: &Inscription, storage_conn: &Connection) {
storage_conn.execute(
"INSERT INTO inscriptions (inscription_id, fee, number, ordinal_number) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![&inscription.inscription_id, &inscription.fee, &inscription.number, &inscription.ordinal_number],
).unwrap();
}
pub async fn scan_bitcoin_chain_with_predicate(
predicate: BitcoinChainhookFullSpecification,
apply: bool,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
let conn = initialize_inscription_cache();
let insc = Inscription {
inscription_id: "1".into(),
fee: 1,
number: 1,
ordinal_number: 1,
};
write_inscription_to_cache(&insc, &conn);
println!(
"{:?}",
retrieve_inscription_from_cache(&insc.inscription_id, &conn)
);
let auth = Auth::UserPass(
config.network.bitcoin_node_rpc_username.clone(),
config.network.bitcoin_node_rpc_password.clone(),
@@ -73,7 +152,77 @@ pub async fn scan_bitcoin_chain_with_predicate(
end_block,
apply
);
use reqwest::Client as HttpClient;
// Optimization: we will use the ordinal storage to provide a set of hints.
let mut inscriptions_hints = BTreeMap::new();
let mut use_hinting = false;
let ordinal_index = if let BitcoinPredicateType::Protocol(Protocols::Ordinal(
OrdinalOperations::InscriptionRevealed,
)) = &predicate_spec.predicate
{
let event_observer_config = EventObserverConfig {
normalization_enabled: true,
grpc_server_enabled: false,
hooks_enabled: true,
bitcoin_rpc_proxy_enabled: true,
event_handlers: vec![],
chainhook_config: None,
ingestion_port: DEFAULT_INGESTION_PORT,
control_port: DEFAULT_CONTROL_PORT,
bitcoin_node_username: config.network.bitcoin_node_rpc_username.clone(),
bitcoin_node_password: config.network.bitcoin_node_rpc_password.clone(),
bitcoin_node_rpc_url: config.network.bitcoin_node_rpc_url.clone(),
stacks_node_rpc_url: config.network.stacks_node_rpc_url.clone(),
operators: HashSet::new(),
display_logs: false,
cache_path: "cache/tmp".to_string(),
bitcoin_network: config.network.bitcoin_network.clone(),
};
let ordinal_index = match initialize_ordinal_index(&event_observer_config, &ctx) {
Ok(index) => index,
Err(e) => {
panic!()
}
};
for (inscription_number, inscription_id) in ordinal_index
.database
.begin_read()
.unwrap()
.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)
.unwrap()
.iter()
.unwrap()
{
let inscription = InscriptionId::load(*inscription_id.value());
println!("{} -> {}", inscription_number.value(), inscription);
let entry = ordinal_index
.get_inscription_entry(inscription)
.unwrap()
.unwrap();
println!("{:?}", entry);
let blockhash = ordinal_index
.database
.begin_read()
.unwrap()
.open_table(HEIGHT_TO_BLOCK_HASH)
.unwrap()
.get(&entry.height)
.unwrap()
.map(|k| BlockHash::load(*k.value()))
.unwrap();
inscriptions_hints.insert(entry.height, blockhash);
use_hinting = true;
}
Some(ordinal_index)
} else {
None
};
let mut bitcoin_context = BitcoinChainContext::new(ordinal_index);
let mut total_hits = vec![];
for cursor in start_block..=end_block {
@@ -82,33 +231,40 @@ pub async fn scan_bitcoin_chain_with_predicate(
"Evaluating predicate #{} on block #{}", predicate_uuid, cursor
);
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblockhash",
"params": [cursor]
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
let block_hash = http_client
.post(&config.network.bitcoin_node_rpc_url)
.basic_auth(
&config.network.bitcoin_node_rpc_username,
Some(&config.network.bitcoin_node_rpc_password),
)
.header("Content-Type", "application/json")
.header("Host", &config.network.bitcoin_node_rpc_url[7..])
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<String>()
.map_err(|e| format!("unable to parse response ({})", e))?;
let block_hash = if use_hinting {
match inscriptions_hints.remove(&cursor) {
Some(block_hash) => block_hash.to_string(),
None => continue,
}
} else {
let body = json!({
"jsonrpc": "1.0",
"id": "chainhook-cli",
"method": "getblockhash",
"params": [cursor]
});
let http_client = HttpClient::builder()
.timeout(Duration::from_secs(20))
.build()
.expect("Unable to build http client");
http_client
.post(&config.network.bitcoin_node_rpc_url)
.basic_auth(
&config.network.bitcoin_node_rpc_username,
Some(&config.network.bitcoin_node_rpc_password),
)
.header("Content-Type", "application/json")
.header("Host", &config.network.bitcoin_node_rpc_url[7..])
.json(&body)
.send()
.await
.map_err(|e| format!("unable to send request ({})", e))?
.json::<jsonrpc::Response>()
.await
.map_err(|e| format!("unable to parse response ({})", e))?
.result::<String>()
.map_err(|e| format!("unable to parse response ({})", e))?
};
let body = json!({
"jsonrpc": "1.0",
@@ -138,29 +294,6 @@ pub async fn scan_bitcoin_chain_with_predicate(
.result::<indexer::bitcoin::Block>()
.map_err(|e| format!("unable to parse response ({})", e))?;
let event_observer_config = EventObserverConfig {
normalization_enabled: true,
grpc_server_enabled: false,
hooks_enabled: true,
bitcoin_rpc_proxy_enabled: true,
event_handlers: vec![],
chainhook_config: None,
ingestion_port: DEFAULT_INGESTION_PORT,
control_port: DEFAULT_CONTROL_PORT,
bitcoin_node_username: config.network.bitcoin_node_rpc_username.clone(),
bitcoin_node_password: config.network.bitcoin_node_rpc_password.clone(),
bitcoin_node_rpc_url: config.network.bitcoin_node_rpc_url.clone(),
stacks_node_rpc_url: config.network.stacks_node_rpc_url.clone(),
operators: HashSet::new(),
display_logs: false,
cache_path: config.storage.cache_path.clone(),
bitcoin_network: config.network.bitcoin_network.clone(),
};
let ordinal_index = initialize_ordinal_index(&event_observer_config, ctx)?;
let mut bitcoin_context = BitcoinChainContext::new(Some(ordinal_index));
let block = indexer::bitcoin::standardize_bitcoin_block(
&config.network,
cursor,

View File

@@ -36,7 +36,7 @@ impl StacksChainContext {
}
pub struct BitcoinChainContext {
ordinal_index: Option<OrdinalIndex>,
pub ordinal_index: Option<OrdinalIndex>,
}
impl BitcoinChainContext {

View File

@@ -9,7 +9,7 @@ use super::*;
#[derive(Default, Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub(crate) enum Chain {
pub enum Chain {
#[default]
Mainnet,
Testnet,
@@ -18,6 +18,14 @@ pub(crate) enum Chain {
}
impl Chain {
pub fn from_bitcoin_network(network: &BitcoinNetwork) -> Chain {
match network {
BitcoinNetwork::Mainnet => Chain::Mainnet,
BitcoinNetwork::Testnet => Chain::Testnet,
BitcoinNetwork::Regtest => Chain::Regtest,
}
}
pub(crate) fn network(self) -> Network {
match self {
Self::Mainnet => Network::Bitcoin,
@@ -43,7 +51,7 @@ impl Chain {
}
}
pub(crate) fn first_inscription_height(self) -> u64 {
pub fn first_inscription_height(self) -> u64 {
match self {
Self::Mainnet => 767430,
Self::Regtest => 0,
@@ -56,7 +64,7 @@ impl Chain {
bitcoin::blockdata::constants::genesis_block(self.network())
}
pub(crate) fn address_from_script(
pub fn address_from_script(
self,
script: &Script,
) -> Result<Address, bitcoin::util::address::Error> {

View File

@@ -8,7 +8,7 @@ use bitcoincore_rpc::bitcoin::{
use crate::indexer::ordinals::{inscription_id::InscriptionId, sat::Sat, sat_point::SatPoint};
pub(super) trait Entry: Sized {
pub trait Entry: Sized {
type Value;
fn load(value: Self::Value) -> Self;
@@ -16,7 +16,7 @@ pub(super) trait Entry: Sized {
fn store(self) -> Self::Value;
}
pub(super) type BlockHashValue = [u8; 32];
pub type BlockHashValue = [u8; 32];
impl Entry for BlockHash {
type Value = BlockHashValue;
@@ -30,6 +30,7 @@ impl Entry for BlockHash {
}
}
#[derive(Debug)]
pub struct InscriptionEntry {
pub fee: u64,
pub height: u64,
@@ -71,7 +72,7 @@ impl Entry for InscriptionEntry {
}
}
pub(super) type InscriptionIdValue = [u8; 36];
pub type InscriptionIdValue = [u8; 36];
impl Entry for InscriptionId {
type Value = InscriptionIdValue;

View File

@@ -41,7 +41,7 @@ const SCHEMA_VERSION: u64 = 2;
macro_rules! define_table {
($name:ident, $key:ty, $value:ty) => {
const $name: TableDefinition<$key, $value> = TableDefinition::new(stringify!($name));
pub const $name: TableDefinition<$key, $value> = TableDefinition::new(stringify!($name));
};
}
@@ -75,7 +75,7 @@ pub(crate) struct Options {
pub struct OrdinalIndex {
auth: Auth,
client: Client,
database: Database,
pub database: Database,
path: PathBuf,
first_inscription_height: u64,
genesis_block_coinbase_transaction: Transaction,
@@ -376,7 +376,7 @@ impl OrdinalIndex {
self.reorged.load(atomic::Ordering::Relaxed)
}
fn begin_read(&self) -> Result<rtx::Rtx> {
pub fn begin_read(&self) -> Result<rtx::Rtx> {
Ok(rtx::Rtx(self.database.begin_read()?))
}

View File

@@ -2,7 +2,7 @@ use crate::indexer::ordinals::height::Height;
use super::*;
pub(crate) struct Rtx<'a>(pub(crate) redb::ReadTransaction<'a>);
pub struct Rtx<'a>(pub(crate) redb::ReadTransaction<'a>);
impl Rtx<'_> {
pub(crate) fn height(&self) -> anyhow::Result<Option<Height>> {

View File

@@ -1,5 +1,5 @@
mod blocktime;
mod chain;
pub mod chain;
mod deserialize_from_str;
mod epoch;
mod height;
@@ -8,7 +8,7 @@ pub mod inscription;
pub mod inscription_id;
pub mod sat;
mod sat_point;
use hiro_system_kit::slog;
use std::time::Duration;
type Result<T = (), E = anyhow::Error> = std::result::Result<T, E>;
@@ -45,8 +45,10 @@ pub fn initialize_ordinal_index(
let index = match self::indexing::OrdinalIndex::open(&index_options) {
Ok(index) => index,
Err(e) => {
println!("unable to open ordinal index: {}", e.to_string());
panic!()
ctx.try_log(|logger| {
slog::error!(logger, "Unable to open ordinal index: {}", e.to_string())
});
std::process::exit(1);
}
};
Ok(index)

View File

@@ -13,6 +13,7 @@ extern crate serde_derive;
extern crate serde_json;
pub extern crate bitcoincore_rpc;
pub extern crate redb;
pub mod chainhooks;
pub mod indexer;