fix: patch absence of witness data

This commit is contained in:
Ludo Galabru
2023-03-03 01:44:34 -05:00
parent 24c3bc3aec
commit f8fcfcad6d
5 changed files with 331 additions and 107 deletions

View File

@@ -9,26 +9,29 @@ use crate::chainhooks::types::{
use crate::indexer::IndexerConfig;
use crate::observer::BitcoinConfig;
use crate::utils::Context;
use bitcoincore_rpc::bitcoin::{self, Script};
use bitcoincore_rpc::bitcoin::{
self, Address, BlockHeader, OutPoint as OutPointS, PackedLockTime, Script, Transaction, Txid,
Witness,
};
use bitcoincore_rpc_json::{GetRawTransactionResult, GetRawTransactionResultVout};
pub use blocks_pool::BitcoinBlockPool;
use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut};
use chainhook_types::{
BitcoinBlockData, BitcoinBlockMetadata, BitcoinTransactionData, BitcoinTransactionMetadata,
BlockCommitmentData, BlockIdentifier, KeyRegistrationData, LockSTXData,
OrdinalInscriptionRevealData, OrdinalInscriptionRevealInscriptionData,
OrdinalInscriptionRevealOrdinalData, OrdinalOperation, PobBlockCommitmentData,
PoxBlockCommitmentData, PoxReward, StacksBaseChainOperation, TransactionIdentifier,
TransferSTXData,
OrdinalInscriptionRevealData, OrdinalOperation, PobBlockCommitmentData, PoxBlockCommitmentData,
PoxReward, StacksBaseChainOperation, TransactionIdentifier, TransferSTXData,
};
use clarity_repl::clarity::util::hash::to_hex;
use hiro_system_kit::slog;
use rocket::serde::json::Value as JsonValue;
use super::ordinals::indexing::updater::OrdinalIndexUpdater;
use super::ordinals::indexing::entry::InscriptionEntry;
use super::ordinals::indexing::updater::{BlockData, OrdinalIndexUpdater};
use super::ordinals::indexing::OrdinalIndex;
use super::ordinals::inscription::InscriptionParser;
use super::ordinals::inscription_id::InscriptionId;
use super::ordinals::sat::Sat;
use super::BitcoinChainContext;
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
@@ -115,25 +118,111 @@ pub fn standardize_bitcoin_block(
ctx: &Context,
) -> Result<BitcoinBlockData, String> {
let mut transactions = vec![];
ctx.try_log(|logger| slog::info!(logger, "Updating ordinal index",));
ctx.try_log(|logger| slog::info!(logger, "Updating ordinal index"));
let ordinal_index = bitcoin_context.ordinal_index.take();
let ctx_ = ctx.clone();
let block_data = if let Some(ref ordinal_index) = ordinal_index {
if let Ok(count) = ordinal_index.block_count() {
ctx.try_log(|logger| slog::info!(logger, "Blocks: {} / {}", count, block.height));
if count as usize == block.height + 10 {
ctx.try_log(|logger| slog::info!(logger, "Will use cached block"));
// let bits = hex::decode(block.bits).unwrap();
// let block_data = BlockData {
// header: BlockHeader {
// version: block.version,
// prev_blockhash: block.previousblockhash,
// merkle_root: block.merkleroot,
// time: block.time as u32,
// bits: u32::from_be_bytes([bits[0], bits[1], bits[2], bits[3]]),
// nonce: block.nonce,
// },
// txdata: block
// .tx
// .iter()
// .map(|tx| {
// (
// Transaction {
// version: tx.version as i32,
// lock_time: PackedLockTime(tx.locktime),
// input: tx
// .vin
// .iter()
// .map(|i| bitcoin::TxIn {
// previous_output: match (i.txid, i.vout) {
// (Some(txid), Some(vout)) => {
// OutPointS { txid, vout }
// }
// _ => OutPointS::null(),
// },
// script_sig: match i.script_sig {
// Some(ref script_sig) => {
// script_sig.script().unwrap()
// }
// None => Script::default(),
// },
// sequence: bitcoincore_rpc::bitcoin::Sequence(
// i.sequence,
// ),
// witness: Witness::from_vec(
// i.txinwitness.clone().unwrap_or(vec![]),
// ),
// })
// .collect(),
// output: tx
// .vout
// .iter()
// .map(|o| bitcoin::TxOut {
// value: o.value.to_sat(),
// script_pubkey: o.script_pub_key.script().unwrap(),
// })
// .collect(),
// },
// tx.txid,
// )
// })
// .collect::<Vec<(Transaction, Txid)>>(),
// };
// ctx.try_log(|logger| slog::info!(logger, "BlockData: {:?}", block_data));
// Some(block_data)
None
} else {
None
}
} else {
None
}
} else {
None
};
let handle: JoinHandle<Result<_, String>> =
hiro_system_kit::thread_named("Ordinal index update")
.spawn(move || {
if let Some(ref ordinal_index) = ordinal_index {
match hiro_system_kit::nestable_block_on(OrdinalIndexUpdater::update(
&ordinal_index,
block_data,
&ctx_,
)) {
Ok(_) => {
ctx_.try_log(|logger| {
slog::info!(logger, "Ordinal index successfully updated",)
slog::info!(
logger,
"Ordinal index successfully updated {:?}",
ordinal_index.block_count()
)
});
}
Err(e) => {
ctx_.try_log(|logger| {
slog::error!(logger, "Error updating ordinal index",)
slog::error!(
logger,
"Error updating ordinal index: {}",
e.to_string()
)
});
}
};
@@ -276,31 +365,39 @@ fn try_parse_ordinal_operation(
Ok(Some(entry)) => entry,
_ => {
ctx.try_log(|logger| slog::info!(logger, "No inscriptions entry found in index, despite inscription detected in transaction"));
return None;
InscriptionEntry {
fee: 0,
height: 0,
number: 0,
sat: Some(Sat(0)),
timestamp: 0,
}
}
};
let authors = tx.vout[0]
.script_pub_key
.addresses
.clone()
.unwrap_or(vec![]);
let sat = &inscription_entry.sat.unwrap();
let no_content_bytes = vec![];
let inscription_content_bytes = inscription.body().unwrap_or(&no_content_bytes);
return Some(OrdinalOperation::InscriptionRevealed(
OrdinalInscriptionRevealData {
inscription: OrdinalInscriptionRevealInscriptionData {
content_type: inscription
.content_type()
.unwrap_or("unknown")
.to_string(),
content_bytes: format!("0x{}", to_hex(&inscription_content_bytes)),
content_length: inscription_content_bytes.len(),
inscription_id: inscription_id.to_string(),
inscription_number: inscription_entry.number,
inscription_author: "".into(),
inscription_fee: inscription_entry.fee,
},
ordinal: Some(OrdinalInscriptionRevealOrdinalData {
ordinal_number: inscription_entry.sat.unwrap().n(),
ordinal_block_height: 0,
ordinal_offset: 0,
}),
content_type: inscription.content_type().unwrap_or("unknown").to_string(),
content_bytes: format!("0x{}", to_hex(&inscription_content_bytes)),
content_length: inscription_content_bytes.len(),
inscription_id: inscription_id.to_string(),
inscription_number: inscription_entry.number,
inscription_authors: authors
.into_iter()
.map(|a| a.to_string())
.collect::<Vec<String>>(),
inscription_fee: inscription_entry.fee,
ordinal_number: sat.n(),
ordinal_block_height: sat.height().n(),
},
));
}

View File

@@ -1,10 +1,12 @@
use std::{collections::BTreeMap, path::PathBuf};
use anyhow::Context;
use anyhow::Context as Ctx;
use bitcoincore_rpc::bitcoin::{Amount, Block, BlockHash, OutPoint, Transaction, Txid};
use bitcoincore_rpc::RpcApi;
use chrono::{DateTime, TimeZone, Utc};
use crate::utils::Context;
use super::blocktime::Blocktime;
use super::chain::Chain;
use super::height::Height;
@@ -12,7 +14,7 @@ use super::sat_point::SatPoint;
use super::{inscription_id::InscriptionId, sat::Sat};
use std::cmp;
mod entry;
pub mod entry;
mod fetcher;
use {
@@ -370,10 +372,6 @@ impl OrdinalIndex {
Ok(info)
}
pub async fn update(&self) -> Result {
OrdinalIndexUpdater::update(self).await
}
pub fn is_reorged(&self) -> bool {
self.reorged.load(atomic::Ordering::Relaxed)
}

View File

@@ -1,6 +1,15 @@
use crate::indexer::ordinals::{height::Height, sat::Sat, sat_point::SatPoint};
use anyhow::Context;
use bitcoincore_rpc::bitcoin::{Block, OutPoint, Transaction, Txid};
use crate::{
indexer::{
bitcoin::Block,
ordinals::{height::Height, sat::Sat, sat_point::SatPoint},
},
utils::Context,
};
use anyhow::Context as Ctx;
use bitcoincore_rpc::bitcoin::{
OutPoint, PackedLockTime, Script, Transaction, TxIn, TxOut, Txid, Witness,
};
use hiro_system_kit::slog;
use std::{
collections::{HashSet, VecDeque},
@@ -18,25 +27,10 @@ use {
mod inscription_updater;
struct BlockData {
header: BlockHeader,
txdata: Vec<(Transaction, Txid)>,
}
impl From<Block> for BlockData {
fn from(block: Block) -> Self {
BlockData {
header: block.header,
txdata: block
.txdata
.into_iter()
.map(|transaction| {
let txid = transaction.txid();
(transaction, txid)
})
.collect(),
}
}
#[derive(Debug)]
pub struct BlockData {
pub header: BlockHeader,
pub txdata: Vec<(Transaction, Txid)>,
}
pub struct OrdinalIndexUpdater {
@@ -49,7 +43,11 @@ pub struct OrdinalIndexUpdater {
}
impl OrdinalIndexUpdater {
pub async fn update(index: &OrdinalIndex) -> Result {
pub async fn update(
index: &OrdinalIndex,
block_opt: Option<BlockData>,
ctx: &Context,
) -> Result {
let wtx = index.begin_write()?;
let height = wtx
@@ -78,17 +76,49 @@ impl OrdinalIndexUpdater {
outputs_traversed: 0,
};
updater.update_index(index, wtx).await
if let Some(block) = block_opt {
updater
.update_index_with_block(index, wtx, block, ctx)
.await
} else {
updater.update_index(index, wtx, ctx).await
}
}
async fn update_index_with_block<'index>(
&mut self,
index: &'index OrdinalIndex,
mut wtx: WriteTransaction<'index>,
block: BlockData,
ctx: &Context,
) -> Result {
let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?;
let mut value_cache = HashMap::new();
self.index_block(
index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
ctx,
)
.await?;
self.commit(wtx, value_cache, ctx)?;
Ok(())
}
async fn update_index<'index>(
&mut self,
index: &'index OrdinalIndex,
mut wtx: WriteTransaction<'index>,
ctx: &Context,
) -> Result {
let starting_height = index.client.get_block_count()? + 1;
let rx = Self::fetch_blocks_from(index, self.height, true)?;
let rx = Self::fetch_blocks_from(index, self.height, ctx)?;
let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?;
@@ -107,13 +137,14 @@ impl OrdinalIndexUpdater {
&mut wtx,
block,
&mut value_cache,
ctx,
)
.await?;
uncommitted += 1;
if uncommitted == 5000 {
self.commit(wtx, value_cache)?;
self.commit(wtx, value_cache, ctx)?;
value_cache = HashMap::new();
uncommitted = 0;
wtx = index.begin_write()?;
@@ -141,7 +172,7 @@ impl OrdinalIndexUpdater {
}
if uncommitted > 0 {
self.commit(wtx, value_cache)?;
self.commit(wtx, value_cache, ctx)?;
}
Ok(())
@@ -150,7 +181,7 @@ impl OrdinalIndexUpdater {
fn fetch_blocks_from(
index: &OrdinalIndex,
mut height: u64,
index_sats: bool,
ctx: &Context,
) -> Result<mpsc::Receiver<BlockData>> {
let (tx, rx) = mpsc::sync_channel(32);
@@ -161,6 +192,7 @@ impl OrdinalIndexUpdater {
let first_inscription_height = index.first_inscription_height;
let ctx_ = ctx.clone();
std::thread::spawn(move || loop {
if let Some(height_limit) = height_limit {
if height >= height_limit {
@@ -168,15 +200,12 @@ impl OrdinalIndexUpdater {
}
}
match Self::get_block_with_retries(
&client,
height,
index_sats,
first_inscription_height,
) {
match Self::get_block_with_retries(&client, height, first_inscription_height) {
Ok(Some(block)) => {
if let Err(err) = tx.send(block.into()) {
println!("Block receiver disconnected: {err}");
ctx_.try_log(|logger| {
slog::error!(logger, "Block receiver disconnected: {err}",)
});
break;
}
height += 1;
@@ -195,9 +224,8 @@ impl OrdinalIndexUpdater {
fn get_block_with_retries(
client: &Client,
height: u64,
index_sats: bool,
first_inscription_height: u64,
) -> Result<Option<Block>> {
) -> Result<Option<BlockData>> {
let mut errors = 0;
loop {
match client
@@ -206,13 +234,82 @@ impl OrdinalIndexUpdater {
.and_then(|option| {
option
.map(|hash| {
if index_sats || height >= first_inscription_height {
Ok(client.get_block(&hash)?)
if height >= first_inscription_height {
let block: Block =
client.call("getblock", &[json!(hash), json!(2)])?;
let bits = hex::decode(block.bits).unwrap();
let block_data = BlockData {
header: BlockHeader {
version: block.version,
prev_blockhash: block.previousblockhash,
merkle_root: block.merkleroot,
time: block.time as u32,
bits: u32::from_be_bytes([
bits[0], bits[1], bits[2], bits[3],
]),
nonce: block.nonce,
},
txdata: block
.tx
.iter()
.map(|tx| {
(
Transaction {
version: tx.version as i32,
lock_time: PackedLockTime(tx.locktime),
input: tx
.vin
.iter()
.map(|i| TxIn {
previous_output: match (i.txid, i.vout)
{
(Some(txid), Some(vout)) => {
OutPoint { txid, vout }
}
_ => OutPoint::null(),
},
script_sig: match i.script_sig {
Some(ref script_sig) => {
script_sig.script().unwrap()
}
None => Script::default(),
},
sequence:
bitcoincore_rpc::bitcoin::Sequence(
i.sequence,
),
witness: Witness::from_vec(
i.txinwitness
.clone()
.unwrap_or(vec![]),
),
})
.collect(),
output: tx
.vout
.iter()
.map(|o| TxOut {
value: o.value.to_sat(),
script_pubkey: o
.script_pub_key
.script()
.unwrap(),
})
.collect(),
},
tx.txid,
)
})
.collect::<Vec<(Transaction, Txid)>>(),
};
Ok(block_data)
} else {
Ok(Block {
header: client.get_block_header(&hash)?,
let header = client.get_block_header(&hash)?;
let block_data = BlockData {
header,
txdata: Vec::new(),
})
};
Ok(block_data)
}
})
.transpose()
@@ -312,6 +409,7 @@ impl OrdinalIndexUpdater {
wtx: &mut WriteTransaction<'_>,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
ctx: &Context,
) -> Result<()> {
// If value_receiver still has values something went wrong with the last block
// Could be an assert, shouldn't recover from this and commit the last block
@@ -329,12 +427,15 @@ impl OrdinalIndexUpdater {
let time = timestamp(block.header.time);
println!(
"Block {} at {} with {} transactions…",
self.height,
time,
block.txdata.len()
);
ctx.try_log(|logger| {
slog::info!(
logger,
"Block {} at {} with {} transactions…",
self.height,
time,
block.txdata.len()
)
});
if let Some(prev_height) = self.height.checked_sub(1) {
let prev_hash = height_to_block_hash.get(&prev_height)?.unwrap();
@@ -421,6 +522,7 @@ impl OrdinalIndexUpdater {
&mut sat_ranges_written,
&mut outputs_in_block,
&mut inscription_updater,
&ctx,
)
.await?;
@@ -436,6 +538,7 @@ impl OrdinalIndexUpdater {
&mut sat_ranges_written,
&mut outputs_in_block,
&mut inscription_updater,
&ctx,
)
.await?;
}
@@ -473,11 +576,6 @@ impl OrdinalIndexUpdater {
self.height += 1;
self.outputs_traversed += outputs_in_block;
// println!(
// "Wrote {sat_ranges_written} sat ranges from {outputs_in_block} outputs in {} ms",
// (Instant::now() - start).as_millis(),
// );
Ok(())
}
@@ -490,9 +588,10 @@ impl OrdinalIndexUpdater {
sat_ranges_written: &mut u64,
outputs_traversed: &mut u64,
inscription_updater: &mut InscriptionUpdater<'_, '_, '_>,
ctx: &Context,
) -> Result {
inscription_updater
.index_transaction_inscriptions(tx, txid, Some(input_sat_ranges))
.index_transaction_inscriptions(tx, txid, Some(input_sat_ranges), ctx)
.await?;
for (vout, output) in tx.output.iter().enumerate() {
@@ -546,21 +645,32 @@ impl OrdinalIndexUpdater {
Ok(())
}
fn commit(&mut self, wtx: WriteTransaction, value_cache: HashMap<OutPoint, u64>) -> Result {
println!(
"Committing at block height {}, {} outputs traversed, {} in map, {} cached",
self.height,
self.outputs_traversed,
self.range_cache.len(),
self.outputs_cached
);
fn commit(
&mut self,
wtx: WriteTransaction,
value_cache: HashMap<OutPoint, u64>,
ctx: &Context,
) -> Result {
ctx.try_log(|logger| {
slog::info!(
logger,
"Committing at block height {}, {} outputs traversed, {} in map, {} cached",
self.height,
self.outputs_traversed,
self.range_cache.len(),
self.outputs_cached
)
});
println!(
"Flushing {} entries ({:.1}% resulting from {} insertions) from memory to database",
self.range_cache.len(),
self.range_cache.len() as f64 / self.outputs_inserted_since_flush as f64 * 100.,
self.outputs_inserted_since_flush,
);
ctx.try_log(|logger| {
slog::info!(
logger,
"Flushing {} entries ({:.1}% resulting from {} insertions) from memory to database",
self.range_cache.len(),
self.range_cache.len() as f64 / self.outputs_inserted_since_flush as f64 * 100.,
self.outputs_inserted_since_flush,
)
});
{
let mut outpoint_to_sat_ranges = wtx.open_table(OUTPOINT_TO_SAT_RANGES)?;

View File

@@ -1,5 +1,8 @@
use crate::indexer::ordinals::{
inscription::Inscription, inscription_id::InscriptionId, sat::Sat, sat_point::SatPoint,
inscription::{Inscription, InscriptionParser},
inscription_id::InscriptionId,
sat::Sat,
sat_point::SatPoint,
};
use super::*;
@@ -88,6 +91,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
tx: &Transaction,
txid: Txid,
input_sat_ranges: Option<&VecDeque<(u64, u64)>>,
ctx: &Context,
) -> Result<u64> {
let mut inscriptions = Vec::new();
@@ -126,9 +130,23 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
}
}
if inscriptions.iter().all(|flotsam| flotsam.offset != 0)
&& Inscription::from_transaction(tx).is_some()
{
let is_first_sat_pristine = if inscriptions.is_empty() {
true
} else {
inscriptions.iter().all(|flotsam| flotsam.offset != 0)
};
ctx.try_log(|logger| {
slog::info!(
logger,
"Decision: {}/{:?}/{:?}",
is_first_sat_pristine,
InscriptionParser::parse(&tx.input.get(0).unwrap().witness),
tx
)
});
if is_first_sat_pristine && Inscription::from_transaction(tx).is_some() {
let floatsam = Flotsam {
inscription_id: txid.into(),
offset: 0,

View File

@@ -6,7 +6,7 @@ mod height;
pub mod indexing;
pub mod inscription;
pub mod inscription_id;
mod sat;
pub mod sat;
mod sat_point;
use std::time::Duration;
@@ -15,7 +15,7 @@ type Result<T = (), E = anyhow::Error> = std::result::Result<T, E>;
use chainhook_types::BitcoinNetwork;
use crate::observer::EventObserverConfig;
use crate::{observer::EventObserverConfig, utils::Context};
const DIFFCHANGE_INTERVAL: u64 =
bitcoincore_rpc::bitcoin::blockdata::constants::DIFFCHANGE_INTERVAL as u64;
@@ -25,6 +25,7 @@ const CYCLE_EPOCHS: u64 = 6;
pub fn initialize_ordinal_index(
config: &EventObserverConfig,
ctx: &Context,
) -> Result<self::indexing::OrdinalIndex, String> {
let chain = match &config.bitcoin_network {
BitcoinNetwork::Mainnet => chain::Chain::Mainnet,