diff --git a/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs b/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs index e925427..394c03e 100644 --- a/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs +++ b/components/chainhook-event-observer/src/indexer/bitcoin/mod.rs @@ -9,26 +9,29 @@ use crate::chainhooks::types::{ use crate::indexer::IndexerConfig; use crate::observer::BitcoinConfig; use crate::utils::Context; -use bitcoincore_rpc::bitcoin::{self, Script}; +use bitcoincore_rpc::bitcoin::{ + self, Address, BlockHeader, OutPoint as OutPointS, PackedLockTime, Script, Transaction, Txid, + Witness, +}; use bitcoincore_rpc_json::{GetRawTransactionResult, GetRawTransactionResultVout}; pub use blocks_pool::BitcoinBlockPool; use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut}; use chainhook_types::{ BitcoinBlockData, BitcoinBlockMetadata, BitcoinTransactionData, BitcoinTransactionMetadata, BlockCommitmentData, BlockIdentifier, KeyRegistrationData, LockSTXData, - OrdinalInscriptionRevealData, OrdinalInscriptionRevealInscriptionData, - OrdinalInscriptionRevealOrdinalData, OrdinalOperation, PobBlockCommitmentData, - PoxBlockCommitmentData, PoxReward, StacksBaseChainOperation, TransactionIdentifier, - TransferSTXData, + OrdinalInscriptionRevealData, OrdinalOperation, PobBlockCommitmentData, PoxBlockCommitmentData, + PoxReward, StacksBaseChainOperation, TransactionIdentifier, TransferSTXData, }; use clarity_repl::clarity::util::hash::to_hex; use hiro_system_kit::slog; use rocket::serde::json::Value as JsonValue; -use super::ordinals::indexing::updater::OrdinalIndexUpdater; +use super::ordinals::indexing::entry::InscriptionEntry; +use super::ordinals::indexing::updater::{BlockData, OrdinalIndexUpdater}; use super::ordinals::indexing::OrdinalIndex; use super::ordinals::inscription::InscriptionParser; use super::ordinals::inscription_id::InscriptionId; +use super::ordinals::sat::Sat; use super::BitcoinChainContext; #[derive(Clone, PartialEq, Debug, Deserialize, Serialize)] @@ -115,25 +118,111 @@ pub fn standardize_bitcoin_block( ctx: &Context, ) -> Result { let mut transactions = vec![]; - ctx.try_log(|logger| slog::info!(logger, "Updating ordinal index",)); + ctx.try_log(|logger| slog::info!(logger, "Updating ordinal index")); let ordinal_index = bitcoin_context.ordinal_index.take(); let ctx_ = ctx.clone(); + + let block_data = if let Some(ref ordinal_index) = ordinal_index { + if let Ok(count) = ordinal_index.block_count() { + ctx.try_log(|logger| slog::info!(logger, "Blocks: {} / {}", count, block.height)); + if count as usize == block.height + 10 { + ctx.try_log(|logger| slog::info!(logger, "Will use cached block")); + + // let bits = hex::decode(block.bits).unwrap(); + // let block_data = BlockData { + // header: BlockHeader { + // version: block.version, + // prev_blockhash: block.previousblockhash, + // merkle_root: block.merkleroot, + // time: block.time as u32, + // bits: u32::from_be_bytes([bits[0], bits[1], bits[2], bits[3]]), + // nonce: block.nonce, + // }, + // txdata: block + // .tx + // .iter() + // .map(|tx| { + // ( + // Transaction { + // version: tx.version as i32, + // lock_time: PackedLockTime(tx.locktime), + // input: tx + // .vin + // .iter() + // .map(|i| bitcoin::TxIn { + // previous_output: match (i.txid, i.vout) { + // (Some(txid), Some(vout)) => { + // OutPointS { txid, vout } + // } + // _ => OutPointS::null(), + // }, + // script_sig: match i.script_sig { + // Some(ref script_sig) => { + // script_sig.script().unwrap() + // } + // None => Script::default(), + // }, + // sequence: bitcoincore_rpc::bitcoin::Sequence( + // i.sequence, + // ), + // witness: Witness::from_vec( + // i.txinwitness.clone().unwrap_or(vec![]), + // ), + // }) + // .collect(), + // output: tx + // .vout + // .iter() + // .map(|o| bitcoin::TxOut { + // value: o.value.to_sat(), + // script_pubkey: o.script_pub_key.script().unwrap(), + // }) + // .collect(), + // }, + // tx.txid, + // ) + // }) + // .collect::>(), + // }; + // ctx.try_log(|logger| slog::info!(logger, "BlockData: {:?}", block_data)); + // Some(block_data) + None + } else { + None + } + } else { + None + } + } else { + None + }; + let handle: JoinHandle> = hiro_system_kit::thread_named("Ordinal index update") .spawn(move || { if let Some(ref ordinal_index) = ordinal_index { match hiro_system_kit::nestable_block_on(OrdinalIndexUpdater::update( &ordinal_index, + block_data, + &ctx_, )) { Ok(_) => { ctx_.try_log(|logger| { - slog::info!(logger, "Ordinal index successfully updated",) + slog::info!( + logger, + "Ordinal index successfully updated {:?}", + ordinal_index.block_count() + ) }); } Err(e) => { ctx_.try_log(|logger| { - slog::error!(logger, "Error updating ordinal index",) + slog::error!( + logger, + "Error updating ordinal index: {}", + e.to_string() + ) }); } }; @@ -276,31 +365,39 @@ fn try_parse_ordinal_operation( Ok(Some(entry)) => entry, _ => { ctx.try_log(|logger| slog::info!(logger, "No inscriptions entry found in index, despite inscription detected in transaction")); - return None; + InscriptionEntry { + fee: 0, + height: 0, + number: 0, + sat: Some(Sat(0)), + timestamp: 0, + } } }; + let authors = tx.vout[0] + .script_pub_key + .addresses + .clone() + .unwrap_or(vec![]); + let sat = &inscription_entry.sat.unwrap(); let no_content_bytes = vec![]; let inscription_content_bytes = inscription.body().unwrap_or(&no_content_bytes); + return Some(OrdinalOperation::InscriptionRevealed( OrdinalInscriptionRevealData { - inscription: OrdinalInscriptionRevealInscriptionData { - content_type: inscription - .content_type() - .unwrap_or("unknown") - .to_string(), - content_bytes: format!("0x{}", to_hex(&inscription_content_bytes)), - content_length: inscription_content_bytes.len(), - inscription_id: inscription_id.to_string(), - inscription_number: inscription_entry.number, - inscription_author: "".into(), - inscription_fee: inscription_entry.fee, - }, - ordinal: Some(OrdinalInscriptionRevealOrdinalData { - ordinal_number: inscription_entry.sat.unwrap().n(), - ordinal_block_height: 0, - ordinal_offset: 0, - }), + content_type: inscription.content_type().unwrap_or("unknown").to_string(), + content_bytes: format!("0x{}", to_hex(&inscription_content_bytes)), + content_length: inscription_content_bytes.len(), + inscription_id: inscription_id.to_string(), + inscription_number: inscription_entry.number, + inscription_authors: authors + .into_iter() + .map(|a| a.to_string()) + .collect::>(), + inscription_fee: inscription_entry.fee, + ordinal_number: sat.n(), + ordinal_block_height: sat.height().n(), }, )); } diff --git a/components/chainhook-event-observer/src/indexer/ordinals/indexing/mod.rs b/components/chainhook-event-observer/src/indexer/ordinals/indexing/mod.rs index 0de9421..2e35d1a 100644 --- a/components/chainhook-event-observer/src/indexer/ordinals/indexing/mod.rs +++ b/components/chainhook-event-observer/src/indexer/ordinals/indexing/mod.rs @@ -1,10 +1,12 @@ use std::{collections::BTreeMap, path::PathBuf}; -use anyhow::Context; +use anyhow::Context as Ctx; use bitcoincore_rpc::bitcoin::{Amount, Block, BlockHash, OutPoint, Transaction, Txid}; use bitcoincore_rpc::RpcApi; use chrono::{DateTime, TimeZone, Utc}; +use crate::utils::Context; + use super::blocktime::Blocktime; use super::chain::Chain; use super::height::Height; @@ -12,7 +14,7 @@ use super::sat_point::SatPoint; use super::{inscription_id::InscriptionId, sat::Sat}; use std::cmp; -mod entry; +pub mod entry; mod fetcher; use { @@ -370,10 +372,6 @@ impl OrdinalIndex { Ok(info) } - pub async fn update(&self) -> Result { - OrdinalIndexUpdater::update(self).await - } - pub fn is_reorged(&self) -> bool { self.reorged.load(atomic::Ordering::Relaxed) } diff --git a/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater.rs b/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater.rs index e5f33f9..825cd23 100644 --- a/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater.rs +++ b/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater.rs @@ -1,6 +1,15 @@ -use crate::indexer::ordinals::{height::Height, sat::Sat, sat_point::SatPoint}; -use anyhow::Context; -use bitcoincore_rpc::bitcoin::{Block, OutPoint, Transaction, Txid}; +use crate::{ + indexer::{ + bitcoin::Block, + ordinals::{height::Height, sat::Sat, sat_point::SatPoint}, + }, + utils::Context, +}; +use anyhow::Context as Ctx; +use bitcoincore_rpc::bitcoin::{ + OutPoint, PackedLockTime, Script, Transaction, TxIn, TxOut, Txid, Witness, +}; +use hiro_system_kit::slog; use std::{ collections::{HashSet, VecDeque}, @@ -18,25 +27,10 @@ use { mod inscription_updater; -struct BlockData { - header: BlockHeader, - txdata: Vec<(Transaction, Txid)>, -} - -impl From for BlockData { - fn from(block: Block) -> Self { - BlockData { - header: block.header, - txdata: block - .txdata - .into_iter() - .map(|transaction| { - let txid = transaction.txid(); - (transaction, txid) - }) - .collect(), - } - } +#[derive(Debug)] +pub struct BlockData { + pub header: BlockHeader, + pub txdata: Vec<(Transaction, Txid)>, } pub struct OrdinalIndexUpdater { @@ -49,7 +43,11 @@ pub struct OrdinalIndexUpdater { } impl OrdinalIndexUpdater { - pub async fn update(index: &OrdinalIndex) -> Result { + pub async fn update( + index: &OrdinalIndex, + block_opt: Option, + ctx: &Context, + ) -> Result { let wtx = index.begin_write()?; let height = wtx @@ -78,17 +76,49 @@ impl OrdinalIndexUpdater { outputs_traversed: 0, }; - updater.update_index(index, wtx).await + if let Some(block) = block_opt { + updater + .update_index_with_block(index, wtx, block, ctx) + .await + } else { + updater.update_index(index, wtx, ctx).await + } + } + + async fn update_index_with_block<'index>( + &mut self, + index: &'index OrdinalIndex, + mut wtx: WriteTransaction<'index>, + block: BlockData, + ctx: &Context, + ) -> Result { + let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?; + + let mut value_cache = HashMap::new(); + self.index_block( + index, + &mut outpoint_sender, + &mut value_receiver, + &mut wtx, + block, + &mut value_cache, + ctx, + ) + .await?; + + self.commit(wtx, value_cache, ctx)?; + Ok(()) } async fn update_index<'index>( &mut self, index: &'index OrdinalIndex, mut wtx: WriteTransaction<'index>, + ctx: &Context, ) -> Result { let starting_height = index.client.get_block_count()? + 1; - let rx = Self::fetch_blocks_from(index, self.height, true)?; + let rx = Self::fetch_blocks_from(index, self.height, ctx)?; let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?; @@ -107,13 +137,14 @@ impl OrdinalIndexUpdater { &mut wtx, block, &mut value_cache, + ctx, ) .await?; uncommitted += 1; if uncommitted == 5000 { - self.commit(wtx, value_cache)?; + self.commit(wtx, value_cache, ctx)?; value_cache = HashMap::new(); uncommitted = 0; wtx = index.begin_write()?; @@ -141,7 +172,7 @@ impl OrdinalIndexUpdater { } if uncommitted > 0 { - self.commit(wtx, value_cache)?; + self.commit(wtx, value_cache, ctx)?; } Ok(()) @@ -150,7 +181,7 @@ impl OrdinalIndexUpdater { fn fetch_blocks_from( index: &OrdinalIndex, mut height: u64, - index_sats: bool, + ctx: &Context, ) -> Result> { let (tx, rx) = mpsc::sync_channel(32); @@ -161,6 +192,7 @@ impl OrdinalIndexUpdater { let first_inscription_height = index.first_inscription_height; + let ctx_ = ctx.clone(); std::thread::spawn(move || loop { if let Some(height_limit) = height_limit { if height >= height_limit { @@ -168,15 +200,12 @@ impl OrdinalIndexUpdater { } } - match Self::get_block_with_retries( - &client, - height, - index_sats, - first_inscription_height, - ) { + match Self::get_block_with_retries(&client, height, first_inscription_height) { Ok(Some(block)) => { if let Err(err) = tx.send(block.into()) { - println!("Block receiver disconnected: {err}"); + ctx_.try_log(|logger| { + slog::error!(logger, "Block receiver disconnected: {err}",) + }); break; } height += 1; @@ -195,9 +224,8 @@ impl OrdinalIndexUpdater { fn get_block_with_retries( client: &Client, height: u64, - index_sats: bool, first_inscription_height: u64, - ) -> Result> { + ) -> Result> { let mut errors = 0; loop { match client @@ -206,13 +234,82 @@ impl OrdinalIndexUpdater { .and_then(|option| { option .map(|hash| { - if index_sats || height >= first_inscription_height { - Ok(client.get_block(&hash)?) + if height >= first_inscription_height { + let block: Block = + client.call("getblock", &[json!(hash), json!(2)])?; + let bits = hex::decode(block.bits).unwrap(); + let block_data = BlockData { + header: BlockHeader { + version: block.version, + prev_blockhash: block.previousblockhash, + merkle_root: block.merkleroot, + time: block.time as u32, + bits: u32::from_be_bytes([ + bits[0], bits[1], bits[2], bits[3], + ]), + nonce: block.nonce, + }, + txdata: block + .tx + .iter() + .map(|tx| { + ( + Transaction { + version: tx.version as i32, + lock_time: PackedLockTime(tx.locktime), + input: tx + .vin + .iter() + .map(|i| TxIn { + previous_output: match (i.txid, i.vout) + { + (Some(txid), Some(vout)) => { + OutPoint { txid, vout } + } + _ => OutPoint::null(), + }, + script_sig: match i.script_sig { + Some(ref script_sig) => { + script_sig.script().unwrap() + } + None => Script::default(), + }, + sequence: + bitcoincore_rpc::bitcoin::Sequence( + i.sequence, + ), + witness: Witness::from_vec( + i.txinwitness + .clone() + .unwrap_or(vec![]), + ), + }) + .collect(), + output: tx + .vout + .iter() + .map(|o| TxOut { + value: o.value.to_sat(), + script_pubkey: o + .script_pub_key + .script() + .unwrap(), + }) + .collect(), + }, + tx.txid, + ) + }) + .collect::>(), + }; + Ok(block_data) } else { - Ok(Block { - header: client.get_block_header(&hash)?, + let header = client.get_block_header(&hash)?; + let block_data = BlockData { + header, txdata: Vec::new(), - }) + }; + Ok(block_data) } }) .transpose() @@ -312,6 +409,7 @@ impl OrdinalIndexUpdater { wtx: &mut WriteTransaction<'_>, block: BlockData, value_cache: &mut HashMap, + ctx: &Context, ) -> Result<()> { // If value_receiver still has values something went wrong with the last block // Could be an assert, shouldn't recover from this and commit the last block @@ -329,12 +427,15 @@ impl OrdinalIndexUpdater { let time = timestamp(block.header.time); - println!( - "Block {} at {} with {} transactions…", - self.height, - time, - block.txdata.len() - ); + ctx.try_log(|logger| { + slog::info!( + logger, + "Block {} at {} with {} transactions…", + self.height, + time, + block.txdata.len() + ) + }); if let Some(prev_height) = self.height.checked_sub(1) { let prev_hash = height_to_block_hash.get(&prev_height)?.unwrap(); @@ -421,6 +522,7 @@ impl OrdinalIndexUpdater { &mut sat_ranges_written, &mut outputs_in_block, &mut inscription_updater, + &ctx, ) .await?; @@ -436,6 +538,7 @@ impl OrdinalIndexUpdater { &mut sat_ranges_written, &mut outputs_in_block, &mut inscription_updater, + &ctx, ) .await?; } @@ -473,11 +576,6 @@ impl OrdinalIndexUpdater { self.height += 1; self.outputs_traversed += outputs_in_block; - // println!( - // "Wrote {sat_ranges_written} sat ranges from {outputs_in_block} outputs in {} ms", - // (Instant::now() - start).as_millis(), - // ); - Ok(()) } @@ -490,9 +588,10 @@ impl OrdinalIndexUpdater { sat_ranges_written: &mut u64, outputs_traversed: &mut u64, inscription_updater: &mut InscriptionUpdater<'_, '_, '_>, + ctx: &Context, ) -> Result { inscription_updater - .index_transaction_inscriptions(tx, txid, Some(input_sat_ranges)) + .index_transaction_inscriptions(tx, txid, Some(input_sat_ranges), ctx) .await?; for (vout, output) in tx.output.iter().enumerate() { @@ -546,21 +645,32 @@ impl OrdinalIndexUpdater { Ok(()) } - fn commit(&mut self, wtx: WriteTransaction, value_cache: HashMap) -> Result { - println!( - "Committing at block height {}, {} outputs traversed, {} in map, {} cached", - self.height, - self.outputs_traversed, - self.range_cache.len(), - self.outputs_cached - ); + fn commit( + &mut self, + wtx: WriteTransaction, + value_cache: HashMap, + ctx: &Context, + ) -> Result { + ctx.try_log(|logger| { + slog::info!( + logger, + "Committing at block height {}, {} outputs traversed, {} in map, {} cached", + self.height, + self.outputs_traversed, + self.range_cache.len(), + self.outputs_cached + ) + }); - println!( - "Flushing {} entries ({:.1}% resulting from {} insertions) from memory to database", - self.range_cache.len(), - self.range_cache.len() as f64 / self.outputs_inserted_since_flush as f64 * 100., - self.outputs_inserted_since_flush, - ); + ctx.try_log(|logger| { + slog::info!( + logger, + "Flushing {} entries ({:.1}% resulting from {} insertions) from memory to database", + self.range_cache.len(), + self.range_cache.len() as f64 / self.outputs_inserted_since_flush as f64 * 100., + self.outputs_inserted_since_flush, + ) + }); { let mut outpoint_to_sat_ranges = wtx.open_table(OUTPOINT_TO_SAT_RANGES)?; diff --git a/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater/inscription_updater.rs b/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater/inscription_updater.rs index 8e96c56..0bc5ce4 100644 --- a/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater/inscription_updater.rs +++ b/components/chainhook-event-observer/src/indexer/ordinals/indexing/updater/inscription_updater.rs @@ -1,5 +1,8 @@ use crate::indexer::ordinals::{ - inscription::Inscription, inscription_id::InscriptionId, sat::Sat, sat_point::SatPoint, + inscription::{Inscription, InscriptionParser}, + inscription_id::InscriptionId, + sat::Sat, + sat_point::SatPoint, }; use super::*; @@ -88,6 +91,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> { tx: &Transaction, txid: Txid, input_sat_ranges: Option<&VecDeque<(u64, u64)>>, + ctx: &Context, ) -> Result { let mut inscriptions = Vec::new(); @@ -126,9 +130,23 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> { } } - if inscriptions.iter().all(|flotsam| flotsam.offset != 0) - && Inscription::from_transaction(tx).is_some() - { + let is_first_sat_pristine = if inscriptions.is_empty() { + true + } else { + inscriptions.iter().all(|flotsam| flotsam.offset != 0) + }; + + ctx.try_log(|logger| { + slog::info!( + logger, + "Decision: {}/{:?}/{:?}", + is_first_sat_pristine, + InscriptionParser::parse(&tx.input.get(0).unwrap().witness), + tx + ) + }); + + if is_first_sat_pristine && Inscription::from_transaction(tx).is_some() { let floatsam = Flotsam { inscription_id: txid.into(), offset: 0, diff --git a/components/chainhook-event-observer/src/indexer/ordinals/mod.rs b/components/chainhook-event-observer/src/indexer/ordinals/mod.rs index 42f3a2e..dbbf4d6 100644 --- a/components/chainhook-event-observer/src/indexer/ordinals/mod.rs +++ b/components/chainhook-event-observer/src/indexer/ordinals/mod.rs @@ -6,7 +6,7 @@ mod height; pub mod indexing; pub mod inscription; pub mod inscription_id; -mod sat; +pub mod sat; mod sat_point; use std::time::Duration; @@ -15,7 +15,7 @@ type Result = std::result::Result; use chainhook_types::BitcoinNetwork; -use crate::observer::EventObserverConfig; +use crate::{observer::EventObserverConfig, utils::Context}; const DIFFCHANGE_INTERVAL: u64 = bitcoincore_rpc::bitcoin::blockdata::constants::DIFFCHANGE_INTERVAL as u64; @@ -25,6 +25,7 @@ const CYCLE_EPOCHS: u64 = 6; pub fn initialize_ordinal_index( config: &EventObserverConfig, + ctx: &Context, ) -> Result { let chain = match &config.bitcoin_network { BitcoinNetwork::Mainnet => chain::Chain::Mainnet,