fix: address remaining issues

This commit is contained in:
Ludo Galabru
2023-08-04 22:30:58 +02:00
parent ddf790f2e2
commit 74b2fa9411
16 changed files with 1208 additions and 1269 deletions

View File

@@ -14,8 +14,8 @@ redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.8.0", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.7.12", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
chainhook-sdk = { version = "=0.8.2", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.8.2", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
hiro-system-kit = "0.1.0"
clap = { version = "3.2.23", features = ["derive"], optional = true }
clap_generate = { version = "3.0.3", optional = true }

View File

@@ -3,7 +3,7 @@ use crate::config::Config;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::{self};
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
use crate::download::download_ordinals_dataset_if_required;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::service::Service;
@@ -11,7 +11,7 @@ use crate::service::Service;
use crate::db::{
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height, initialize_hord_db,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db,
};
@@ -742,7 +742,7 @@ pub async fn fetch_and_standardize_block(
download_and_parse_block_with_retry(http_client, &block_hash, &bitcoin_config, &ctx)
.await?;
core::parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
.map_err(|(e, _)| e)
}

View File

@@ -1,14 +1,11 @@
pub mod pipeline;
pub mod protocol;
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionRevealData, OrdinalOperation,
};
use chainhook_sdk::types::{BitcoinBlockData, OrdinalOperation};
use dashmap::DashMap;
use fxhash::{FxBuildHasher, FxHasher};
use rocksdb::DB;
use rusqlite::Connection;
use std::collections::BTreeMap;
use std::hash::BuildHasherDefault;
use std::ops::Div;
use std::path::PathBuf;
@@ -20,8 +17,6 @@ use chainhook_sdk::{
use crate::config::{Config, LogConfig};
use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown};
use crate::db::{
find_last_block_inserted, find_latest_inscription_block_height, initialize_hord_db,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
@@ -32,10 +27,6 @@ use crate::db::{
LazyBlockTransaction,
};
use self::protocol::inscribing::{
get_inscriptions_from_full_tx, get_inscriptions_from_standardized_tx,
};
#[derive(Clone, Debug)]
pub struct HordConfig {
pub network_thread_max: usize,
@@ -47,52 +38,6 @@ pub struct HordConfig {
pub logs: LogConfig,
}
pub fn parse_ordinals_and_standardize_block(
raw_block: BitcoinBlockFullBreakdown,
network: &BitcoinNetwork,
ctx: &Context,
) -> Result<BitcoinBlockData, (String, bool)> {
let mut ordinal_operations = BTreeMap::new();
for tx in raw_block.tx.iter() {
ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx));
}
let mut block = standardize_bitcoin_block(raw_block, network, ctx)?;
for tx in block.transactions.iter_mut() {
if let Some(ordinal_operations) =
ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str())
{
tx.metadata.ordinal_operations = ordinal_operations;
}
}
Ok(block)
}
pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) {
for tx in block.transactions.iter_mut() {
tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx);
}
}
pub fn get_inscriptions_revealed_in_block(
block: &BitcoinBlockData,
) -> Vec<&OrdinalInscriptionRevealData> {
let mut ops = vec![];
for tx in block.transactions.iter() {
for op in tx.metadata.ordinal_operations.iter() {
if let OrdinalOperation::InscriptionRevealed(op) = op {
ops.push(op);
}
if let OrdinalOperation::CursedInscriptionRevealed(op) = op {
ops.push(op);
}
}
}
ops
}
pub fn revert_hord_db_with_augmented_bitcoin_block(
block: &BitcoinBlockData,
blocks_db_rw: &DB,
@@ -106,8 +51,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
let tx = &block.transactions[block.transactions.len() - tx_index];
for ordinal_event in tx.metadata.ordinal_operations.iter() {
match ordinal_event {
OrdinalOperation::InscriptionRevealed(data)
| OrdinalOperation::CursedInscriptionRevealed(data) => {
OrdinalOperation::InscriptionRevealed(data) => {
// We remove any new inscription created
remove_entry_from_inscriptions(
&data.inscription_id,
@@ -248,13 +192,13 @@ pub fn should_sync_hord_db(
(end_block.min(200_000), 10_000)
} else if start_block < 550_000 {
(end_block.min(550_000), 1_000)
} else {
} else {
(end_block, 100)
};
if start_block < 767430 && end_block > 767430 {
end_block = 767430;
}
}
if start_block <= end_block {
Ok(Some((start_block, end_block, speed)))

View File

@@ -16,7 +16,7 @@ use chainhook_sdk::indexer::bitcoin::{
build_http_client, parse_downloaded_block, try_download_block_bytes_with_retry,
};
use super::parse_ordinals_and_standardize_block;
use super::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
pub enum PostProcessorCommand {
Start,

View File

@@ -6,11 +6,12 @@ use std::{
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;
use rocksdb::DB;
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock},
};
pub fn start_block_ingestion_processor(
@@ -25,18 +26,14 @@ pub fn start_block_ingestion_processor(
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let mut num_writes = 0;
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
let mut empty_cycles = 0;
let mut tip: u64 = 0;
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start block indexing runloop");
}
loop {
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
@@ -62,40 +59,7 @@ pub fn start_block_ingestion_processor(
}
},
};
let batch_size = compacted_blocks.len();
num_writes += batch_size;
for (block_height, compacted_block) in compacted_blocks.into_iter() {
tip = tip.max(block_height);
insert_entry_in_blocks(
block_height as u32,
&compacted_block,
&blocks_db_rw,
&ctx,
);
}
info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})");
// Early return
if num_writes >= 512 {
ctx.try_log(|logger| {
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
});
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
continue;
}
// Write blocks to disk, before traversals
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &ctx);
}
if let Err(e) = blocks_db_rw.flush() {
@@ -112,3 +76,22 @@ pub fn start_block_ingestion_processor(
thread_handle: handle,
}
}
pub fn store_compacted_blocks(
mut compacted_blocks: Vec<(u64, LazyBlock)>,
blocks_db_rw: &DB,
ctx: &Context,
) {
compacted_blocks.sort_by(|(a, _), (b, _)| a.cmp(b));
for (block_height, compacted_block) in compacted_blocks.into_iter() {
insert_entry_in_blocks(block_height as u32, &compacted_block, &blocks_db_rw, &ctx);
info!(ctx.expect_logger(), "Block #{block_height} saved to disk");
}
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
}

View File

@@ -1,28 +1,36 @@
use std::{
collections::BTreeMap,
sync::Arc,
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
},
types::{BitcoinBlockData, TransactionIdentifier},
utils::Context,
};
use crossbeam_channel::{Sender, TryRecvError};
use rusqlite::Transaction;
use dashmap::DashMap;
use fxhash::FxHasher;
use rusqlite::Connection;
use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use crate::{
core::{protocol::sequencing::update_hord_db_and_augment_bitcoin_block_v3, HordConfig},
db::{find_all_inscriptions_in_block, find_all_transfers_in_block, format_satpoint_to_watch},
core::{
pipeline::processors::block_ingestion::store_compacted_blocks,
protocol::{
inscription_parsing::get_inscriptions_revealed_in_block,
inscription_sequencing::{
augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx,
retrieve_inscribed_satoshi_points_from_block_v3, SequenceCursor,
},
inscription_tracking::augment_block_with_ordinals_transfer_data,
},
HordConfig,
},
db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn},
};
use crate::db::{LazyBlockTransaction, TraversalResult};
@@ -33,10 +41,7 @@ use crate::{
new_traversals_lazy_cache,
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
},
db::{
insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
InscriptionHeigthHint,
},
db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db},
};
pub fn start_inscription_indexing_processor(
@@ -58,13 +63,13 @@ pub fn start_inscription_indexing_processor(
let mut inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let hord_config = config.get_hord_config();
let mut num_writes = 0;
let mut tip: u64 = 0;
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
let mut inscription_height_hint = InscriptionHeigthHint::new();
let mut empty_cycles = 0;
let inscriptions_db_conn =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn);
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start inscription indexing runloop");
@@ -94,55 +99,22 @@ pub fn start_inscription_indexing_processor(
},
};
let batch_size = compacted_blocks.len();
num_writes += batch_size;
for (block_height, compacted_block) in compacted_blocks.into_iter() {
tip = tip.max(block_height);
insert_entry_in_blocks(
block_height as u32,
&compacted_block,
&blocks_db_rw,
&ctx,
);
}
info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})");
// Early return
if blocks.is_empty() {
if num_writes >= 512 {
ctx.try_log(|logger| {
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
});
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
}
continue;
}
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &Context::empty());
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
// Write blocks to disk, before traversals
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
garbage_collect_nth_block += blocks.len();
process_blocks(
blocks = process_blocks(
&mut blocks,
&mut sequence_cursor,
&cache_l2,
&mut inscription_height_hint,
&mut inscriptions_db_conn_rw,
&hord_config,
&post_processor,
&ctx,
);
garbage_collect_nth_block += blocks.len();
// Clear L2 cache on a regular basis
if garbage_collect_nth_block > garbage_collect_every_n_blocks {
info!(
@@ -154,12 +126,6 @@ pub fn start_inscription_indexing_processor(
garbage_collect_nth_block = 0;
}
}
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
})
.expect("unable to spawn thread");
@@ -172,29 +138,88 @@ pub fn start_inscription_indexing_processor(
pub fn process_blocks(
next_blocks: &mut Vec<BitcoinBlockData>,
sequence_cursor: &mut SequenceCursor,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscription_height_hint: &mut InscriptionHeigthHint,
inscriptions_db_conn_rw: &mut Connection,
hord_config: &HordConfig,
post_processor: &Option<Sender<BitcoinBlockData>>,
ctx: &Context,
) -> Vec<BitcoinBlockData> {
let mut cache_l1 = HashMap::new();
let mut cache_l1 = BTreeMap::new();
let mut updated_blocks = vec![];
for _cursor in 0..next_blocks.len() {
let inscriptions_db_tx: rusqlite::Transaction<'_> =
inscriptions_db_conn_rw.transaction().unwrap();
let mut block = next_blocks.remove(0);
// We check before hand if some data were pre-existing, before processing
let any_existing_activity = get_any_entry_in_ordinal_activities(
&block.block_identifier.index,
&inscriptions_db_tx,
ctx,
);
let _ = process_block(
&mut block,
&next_blocks,
sequence_cursor,
&mut cache_l1,
cache_l2,
inscription_height_hint,
inscriptions_db_conn_rw,
&inscriptions_db_tx,
hord_config,
ctx,
);
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
ctx.try_log(|logger| {
info!(
logger,
"Block #{} revealed {} inscriptions [{}]",
block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")
)
});
if any_existing_activity {
ctx.try_log(|logger| {
warn!(
logger,
"Dropping updates for block #{}, activities present in database",
block.block_identifier.index,
)
});
let _ = inscriptions_db_tx.rollback();
} else {
match inscriptions_db_tx.commit() {
Ok(_) => {
ctx.try_log(|logger| {
info!(
logger,
"Updates saved for block {}", block.block_identifier.index,
)
});
}
Err(e) => {
ctx.try_log(|logger| {
error!(
logger,
"Unable to update changes in block #{}: {}",
block.block_identifier.index,
e.to_string()
)
});
}
}
}
if let Some(post_processor_tx) = post_processor {
let _ = post_processor_tx.send(block.clone());
}
@@ -206,104 +231,45 @@ pub fn process_blocks(
pub fn process_block(
block: &mut BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
sequence_cursor: &mut SequenceCursor,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscription_height_hint: &mut InscriptionHeigthHint,
inscriptions_db_conn_rw: &mut Connection,
inscriptions_db_tx: &Transaction,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<(), String> {
update_hord_db_and_augment_bitcoin_block_v3(
block,
next_blocks,
let any_processable_transactions = retrieve_inscribed_satoshi_points_from_block_v3(
&block,
&next_blocks,
cache_l1,
cache_l2,
inscription_height_hint,
inscriptions_db_conn_rw,
hord_config,
inscriptions_db_tx,
&hord_config,
ctx,
)
}
)?;
pub fn re_augment_block_with_ordinals_operations(
block: &mut BitcoinBlockData,
inscriptions_db_conn: &Connection,
ctx: &Context,
) {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
if !any_processable_transactions {
return Ok(());
}
// Always discard if we have some existing content at this block height (inscription or transfers)
let inner_ctx = if hord_config.logs.ordinals_internals {
ctx.clone()
} else {
Context::empty()
};
// Restore inscriptions data
let mut inscriptions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
let mut should_become_cursed = vec![];
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
let (inscription, is_cursed) = match operation {
OrdinalOperation::CursedInscriptionRevealed(ref mut inscription) => {
(inscription, true)
}
OrdinalOperation::InscriptionRevealed(ref mut inscription) => (inscription, false),
OrdinalOperation::InscriptionTransferred(_) => continue,
};
let Some(traversal) = inscriptions.remove(&(tx.transaction_identifier.clone(), inscription.inscription_input_index)) else {
continue;
};
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
inscription.ordinal_number = traversal.ordinal_number;
inscription.inscription_number = traversal.inscription_number;
inscription.transfers_pre_inscription = traversal.transfers;
inscription.inscription_fee = tx.metadata.fee;
inscription.tx_index = tx_index;
inscription.satpoint_post_inscription = format_satpoint_to_watch(
&traversal.transfer_data.transaction_identifier_location,
traversal.transfer_data.output_index,
traversal.transfer_data.inscription_offset_intra_output,
);
let Some(output) = tx.metadata.outputs.get(traversal.transfer_data.output_index) else {
continue;
};
inscription.inscription_output_value = output.value;
inscription.inscriber_address = {
let script_pub_key = output.get_script_pubkey_hex();
match Script::from_hex(&script_pub_key) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(a) => Some(a.to_string()),
_ => None,
},
_ => None,
}
};
if !is_cursed && inscription.inscription_number < 0 {
inscription.curse_type = Some(OrdinalInscriptionCurseType::Reinscription);
should_become_cursed.push((tx_index, op_index));
}
}
}
for (tx_index, op_index) in should_become_cursed.into_iter() {
let Some(tx) = block.transactions.get_mut(tx_index) else {
continue;
};
let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else {
continue;
};
tx.metadata.ordinal_operations.insert(
op_index,
OrdinalOperation::CursedInscriptionRevealed(inscription),
);
}
// TODO: Handle transfers
// Handle inscriptions
let _ = augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
block,
sequence_cursor,
cache_l1,
&inscriptions_db_tx,
&inner_ctx,
);
// Handle transfers
let _ = augment_block_with_ordinals_transfer_data(block, inscriptions_db_tx, true, &inner_ctx);
Ok(())
}

View File

@@ -1,28 +1,24 @@
use std::{
collections::BTreeMap,
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation},
utils::Context,
};
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::{Sender, TryRecvError};
use crate::{
core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
db::{
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height,
},
};
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::open_readwrite_hord_db_conn,
core::{
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
protocol::{
inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data,
inscription_tracking::augment_block_with_ordinals_transfer_data,
},
},
db::{
insert_new_inscriptions_from_block_in_locations, open_readwrite_hord_db_conn,
remove_entries_from_locations_at_block_height,
},
};
pub fn start_transfers_recomputing_processor(
@@ -70,123 +66,33 @@ pub fn start_transfers_recomputing_processor(
};
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();
for block in blocks.iter_mut() {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
info!(
ctx.expect_logger(),
"Cleaning transfers from block {}", block.block_identifier.index
);
let inscriptions = find_all_inscriptions_in_block(
&block.block_identifier.index,
&inscriptions_db_conn_rw,
consolidate_block_with_pre_computed_ordinals_data(
block,
&inscriptions_db_tx,
false,
&ctx,
);
info!(
ctx.expect_logger(),
"{} inscriptions retrieved at block {}",
inscriptions.len(),
block.block_identifier.index
);
let mut operations = BTreeMap::new();
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
remove_entries_from_locations_at_block_height(
&block.block_identifier.index,
&transaction,
&inscriptions_db_tx,
&ctx,
);
for (_, entry) in inscriptions.iter() {
let inscription_id = entry.get_inscription_id();
info!(
ctx.expect_logger(),
"Processing inscription {}", inscription_id
);
insert_entry_in_locations(
&inscription_id,
block.block_identifier.index,
&entry.transfer_data,
&transaction,
&ctx,
);
operations.insert(
entry.transaction_identifier_inscription.clone(),
OrdinalInscriptionTransferData {
inscription_id: entry.get_inscription_id(),
updated_address: None,
satpoint_pre_transfer: format_satpoint_to_watch(
&entry.transaction_identifier_inscription,
entry.inscription_input_index,
0,
),
satpoint_post_transfer: format_satpoint_to_watch(
&entry.transfer_data.transaction_identifier_location,
entry.transfer_data.output_index,
entry.transfer_data.inscription_offset_intra_output,
),
post_transfer_output_value: None,
tx_index: 0,
},
);
}
info!(
ctx.expect_logger(),
"Rewriting transfers for block {}", block.block_identifier.index
);
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
tx.metadata.ordinal_operations.clear();
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
let (_, output_index, _) =
parse_satpoint_to_watch(&entry.satpoint_post_transfer);
let script_pub_key_hex =
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => {
match Address::from_script(&script, network.clone()) {
Ok(address) => Some(address.to_string()),
Err(_e) => None,
}
}
Err(_e) => None,
};
entry.updated_address = updated_address;
entry.post_transfer_output_value =
Some(tx.metadata.outputs[output_index].value);
entry.tx_index = tx_index;
tx.metadata
.ordinal_operations
.push(OrdinalOperation::InscriptionTransferred(entry));
}
}
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
insert_new_inscriptions_from_block_in_locations(
block,
&transaction,
&inscriptions_db_tx,
&ctx,
)
.unwrap();
info!(
ctx.expect_logger(),
"Saving supdates for block {}", block.block_identifier.index
);
transaction.commit().unwrap();
info!(
ctx.expect_logger(),
"Transfers in block {} repaired", block.block_identifier.index
augment_block_with_ordinals_transfer_data(
block,
&inscriptions_db_tx,
true,
&ctx,
);
if let Some(ref post_processor) = post_processor {

View File

@@ -3,9 +3,10 @@ use std::str::FromStr;
use chainhook_sdk::bitcoincore_rpc_json::bitcoin::hashes::hex::FromHex;
use chainhook_sdk::bitcoincore_rpc_json::bitcoin::Txid;
use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown};
use chainhook_sdk::types::{
BitcoinTransactionData, OrdinalInscriptionCurseType, OrdinalInscriptionRevealData,
OrdinalOperation,
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, OrdinalInscriptionCurseType,
OrdinalInscriptionRevealData, OrdinalOperation,
};
use chainhook_sdk::utils::Context;
use chainhook_sdk::{
@@ -310,10 +311,7 @@ pub fn get_inscriptions_from_witness(
curse_type: inscription.curse.take(),
};
match &payload.curse_type {
Some(_) => Some(OrdinalOperation::CursedInscriptionRevealed(payload)),
None => Some(OrdinalOperation::InscriptionRevealed(payload)),
}
Some(OrdinalOperation::InscriptionRevealed(payload))
}
pub fn get_inscriptions_from_standardized_tx(
@@ -377,3 +375,46 @@ fn test_ordinal_inscription_parsing() {
println!("{:?}", inscription);
}
pub fn parse_ordinals_and_standardize_block(
raw_block: BitcoinBlockFullBreakdown,
network: &BitcoinNetwork,
ctx: &Context,
) -> Result<BitcoinBlockData, (String, bool)> {
let mut ordinal_operations = BTreeMap::new();
for tx in raw_block.tx.iter() {
ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx));
}
let mut block = standardize_bitcoin_block(raw_block, network, ctx)?;
for tx in block.transactions.iter_mut() {
if let Some(ordinal_operations) =
ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str())
{
tx.metadata.ordinal_operations = ordinal_operations;
}
}
Ok(block)
}
pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) {
for tx in block.transactions.iter_mut() {
tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx);
}
}
pub fn get_inscriptions_revealed_in_block(
block: &BitcoinBlockData,
) -> Vec<&OrdinalInscriptionRevealData> {
let mut ops = vec![];
for tx in block.transactions.iter() {
for op in tx.metadata.ordinal_operations.iter() {
if let OrdinalOperation::InscriptionRevealed(op) = op {
ops.push(op);
}
}
}
ops
}

View File

@@ -0,0 +1,689 @@
use std::{
collections::{BTreeMap, HashMap, VecDeque},
hash::BuildHasherDefault,
sync::Arc,
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
OrdinalInscriptionCurseType, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
use dashmap::DashMap;
use fxhash::FxHasher;
use rusqlite::{Connection, Transaction};
use crate::{
core::HordConfig,
db::{
find_blessed_inscription_with_ordinal_number,
find_latest_cursed_inscription_number_at_block_height,
find_latest_inscription_number_at_block_height, format_satpoint_to_watch,
insert_new_inscriptions_from_block_in_inscriptions_and_locations, LazyBlockTransaction,
TraversalResult,
},
ord::height::Height,
};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::sync::mpsc::channel;
use crate::db::find_all_inscriptions_in_block;
use super::{
inscription_tracking::augment_transaction_with_ordinals_transfers_data,
satoshi_numbering::compute_satoshi_number,
};
pub fn retrieve_inscribed_satoshi_points_from_block_v3(
block: &BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscriptions_db_tx: &Transaction,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<bool, String> {
let (mut transactions_ids, l1_cache_hits) =
get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx);
let inner_ctx = if hord_config.logs.ordinals_internals {
ctx.clone()
} else {
Context::empty()
};
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
let thread_max = hord_config.ingestion_thread_max * 2;
if has_transactions_to_process {
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for thread_index in 0..thread_max {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = inner_ctx.clone();
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
prioritary,
))) = rx.recv()
{
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}
// Empty cache
let mut thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.remove(key) {
let _ = traversal_tx.send((Ok(entry), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
}
}
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
block.block_identifier.index,
transactions_ids.len(),
l1_cache_hits.len(),
next_blocks.len(),
cache_l1.len(),
cache_l2.len(),
)
});
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let mut priority_queue = VecDeque::new();
let mut warmup_queue = VecDeque::new();
for (transaction_id, input_index) in transactions_ids.into_iter() {
priority_queue.push_back((
transaction_id,
block.block_identifier.clone(),
input_index,
true,
));
}
// Feed each workers with 2 workitems each
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
let mut next_block_iter = next_blocks.iter();
let mut traversals_received = 0;
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
if prioritary {
traversals_received += 1;
}
match traversal_result {
Ok(traversal) => {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
Err(e) => {
ctx.try_log(|logger| {
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
});
}
}
if traversals_received == expected_traversals {
break;
}
if let Some(w) = priority_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (mut transactions_ids, _) = get_transactions_to_process(
next_block,
cache_l1,
inscriptions_db_tx,
ctx,
);
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to pre-process: {}",
block.block_identifier.index,
transactions_ids.len()
)
});
transactions_ids.shuffle(&mut rng);
for (transaction_id, input_index) in transactions_ids.into_iter() {
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
false,
));
}
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
}
}
}
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok(traversal) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
}
let _ = tx.send(None);
}
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
});
} else {
ctx.try_log(|logger| {
info!(
logger,
"No inscriptions to index in block #{}", block.block_identifier.index
)
});
}
Ok(has_transactions_to_process)
}
fn get_transactions_to_process(
block: &BitcoinBlockData,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_db_tx: &Transaction,
ctx: &Context,
) -> (
Vec<(TransactionIdentifier, usize)>,
Vec<(TransactionIdentifier, usize)>,
) {
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
let mut l1_cache_hits = vec![];
let mut known_transactions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx);
for tx in block.transactions.iter().skip(1) {
// Have a new inscription been revealed, if so, are looking at a re-inscription
for ordinal_event in tx.metadata.ordinal_operations.iter() {
let inscription_data = match ordinal_event {
OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data,
OrdinalOperation::InscriptionTransferred(_) => {
continue;
}
};
let key = (
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
);
if cache_l1.contains_key(&key) {
l1_cache_hits.push(key);
continue;
}
if let Some(entry) = known_transactions.remove(&key) {
continue;
}
// Enqueue for traversals
transactions_ids.push((
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
));
}
}
(transactions_ids, l1_cache_hits)
}
/// For each input of each transaction in the block, we retrieve the UTXO spent (outpoint_pre_transfer)
/// and we check using a `storage` (in-memory or sqlite) absctraction if we have some existing inscriptions
/// for this entry.
/// When this is the case, it means that an inscription_transfer event needs to be produced. We need to
/// compute the output index (if any) `post_transfer_output` that will now include the inscription.
/// When identifying the output index, we will also need to provide an updated offset for pin pointing
/// the satoshi location.
pub struct SequenceCursor {
blessed: Option<i64>,
cursed: Option<i64>,
inscriptions_db_conn: Connection,
current_block_height: u64,
}
impl SequenceCursor {
pub fn new(inscriptions_db_conn: Connection) -> SequenceCursor {
SequenceCursor {
blessed: None,
cursed: None,
inscriptions_db_conn,
current_block_height: 0,
}
}
pub fn reset(&mut self) {
self.blessed = None;
self.cursed = None;
self.current_block_height = 0;
}
pub fn pick_next(&mut self, cursed: bool, block_height: u64) -> i64 {
if block_height < self.current_block_height {
self.reset();
}
self.current_block_height = block_height;
match cursed {
true => self.pick_next_cursed(),
false => self.pick_next_blessed(),
}
}
fn pick_next_blessed(&mut self) -> i64 {
match self.blessed {
None => {
match find_latest_inscription_number_at_block_height(
&self.current_block_height,
&None,
&self.inscriptions_db_conn,
&Context::empty(),
) {
Ok(Some(inscription_number)) => {
self.blessed = Some(inscription_number);
inscription_number + 1
}
_ => {
self.blessed = Some(0);
0
}
}
}
Some(value) => value + 1,
}
}
fn pick_next_cursed(&mut self) -> i64 {
match self.cursed {
None => {
match find_latest_cursed_inscription_number_at_block_height(
&self.current_block_height,
&None,
&self.inscriptions_db_conn,
&Context::empty(),
) {
Ok(Some(inscription_number)) => {
self.cursed = Some(inscription_number);
inscription_number - 1
}
_ => {
self.cursed = Some(-1);
-1
}
}
}
Some(value) => value - 1,
}
}
pub fn increment_cursed(&mut self) {
self.cursed = Some(self.pick_next_cursed());
}
pub fn increment_blessed(&mut self) {
self.blessed = Some(self.pick_next_blessed())
}
}
pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
block: &mut BitcoinBlockData,
sequence_cursor: &mut SequenceCursor,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_db_tx: &Transaction,
ctx: &Context,
) -> bool {
// Handle re-inscriptions
let mut reinscriptions_data = HashMap::new();
for (_, inscription_data) in inscriptions_data.iter() {
if let Some(inscription_id) = find_blessed_inscription_with_ordinal_number(
&inscription_data.ordinal_number,
inscriptions_db_tx,
ctx,
) {
reinscriptions_data.insert(inscription_data.ordinal_number, inscription_id);
}
}
let any_events = augment_block_with_ordinals_inscriptions_data(
block,
sequence_cursor,
inscriptions_data,
&reinscriptions_data,
&ctx,
);
// Store inscriptions
insert_new_inscriptions_from_block_in_inscriptions_and_locations(
block,
inscriptions_db_tx,
ctx,
);
any_events
}
pub fn augment_block_with_ordinals_inscriptions_data(
block: &mut BitcoinBlockData,
sequence_cursor: &mut SequenceCursor,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
reinscriptions_data: &HashMap<u64, String>,
ctx: &Context,
) -> bool {
// Handle sat oveflows
let mut sats_overflows = VecDeque::new();
let mut any_event = false;
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
any_event |= augment_transaction_with_ordinals_inscriptions_data(
tx,
tx_index,
&block.block_identifier,
sequence_cursor,
&network,
inscriptions_data,
&mut sats_overflows,
&reinscriptions_data,
ctx,
);
}
// Handle sats overflow
while let Some(sats_overlow) = sats_overflows.pop_front() {
// TODO
}
any_event
}
pub fn augment_transaction_with_ordinals_inscriptions_data(
tx: &mut BitcoinTransactionData,
tx_index: usize,
block_identifier: &BlockIdentifier,
sequence_cursor: &mut SequenceCursor,
network: &Network,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
sats_overflows: &mut VecDeque<(usize, usize)>,
reinscriptions_data: &HashMap<u64, String>,
ctx: &Context,
) -> bool {
let any_event = tx.metadata.ordinal_operations.is_empty() == false;
let mut ordinals_ops_indexes_to_discard = VecDeque::new();
for (op_index, op) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
let (mut is_cursed, inscription) = match op {
OrdinalOperation::InscriptionRevealed(inscription) => {
(inscription.curse_type.as_ref().is_some(), inscription)
}
OrdinalOperation::InscriptionTransferred(_) => continue,
};
let mut inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
let transaction_identifier = tx.transaction_identifier.clone();
let traversal = match inscriptions_data
.remove(&(transaction_identifier, inscription.inscription_input_index))
{
Some(traversal) => traversal,
None => {
ctx.try_log(|logger| {
error!(
logger,
"Unable to retrieve cached inscription data for inscription {}",
tx.transaction_identifier.hash
);
});
ordinals_ops_indexes_to_discard.push_front(op_index);
continue;
}
};
let mut curse_type_override = None;
// Do we need to curse the inscription?
if !is_cursed {
// Is this inscription re-inscribing an existing blessed inscription?
if let Some(exisiting_inscription_id) =
reinscriptions_data.get(&traversal.ordinal_number)
{
ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was inscribed with blessed inscription {}, cursing inscription {}",
traversal.ordinal_number,
exisiting_inscription_id,
traversal.get_inscription_id(),
);
});
is_cursed = true;
inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
curse_type_override = Some(OrdinalInscriptionCurseType::Reinscription)
}
};
let outputs = &tx.metadata.outputs;
inscription.inscription_number = inscription_number;
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
inscription.ordinal_number = traversal.ordinal_number;
inscription.transfers_pre_inscription = traversal.transfers;
inscription.inscription_fee = tx.metadata.fee;
inscription.tx_index = tx_index;
inscription.curse_type = match curse_type_override {
Some(curse_type) => Some(curse_type),
None => inscription.curse_type.take(),
};
inscription.satpoint_post_inscription = format_satpoint_to_watch(
&traversal.transfer_data.transaction_identifier_location,
traversal.transfer_data.output_index,
traversal.transfer_data.inscription_offset_intra_output,
);
if let Some(output) = outputs.get(traversal.transfer_data.output_index) {
inscription.inscription_output_value = output.value;
inscription.inscriber_address = {
let script_pub_key = output.get_script_pubkey_hex();
match Script::from_hex(&script_pub_key) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(a) => Some(a.to_string()),
_ => None,
},
_ => None,
}
};
} else {
ctx.try_log(|logger| {
warn!(
logger,
"Database corrupted, skipping cursed inscription => {:?} / {:?}",
traversal,
outputs
);
});
}
if traversal.ordinal_number == 0 {
// If the satoshi inscribed correspond to a sat overflow, we will store the inscription
// and assign an inscription number after the other inscriptions, to mimick the
// bug in ord.
sats_overflows.push_back((tx_index, op_index));
continue;
}
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} (#{}) detected on Satoshi {} (block {}, {} transfers)",
inscription.inscription_id,
inscription.inscription_number,
inscription.ordinal_number,
block_identifier.index,
inscription.transfers_pre_inscription,
);
});
if is_cursed {
sequence_cursor.increment_cursed();
} else {
sequence_cursor.increment_blessed();
}
}
any_event
}
pub fn consolidate_transaction_with_pre_computed_inscription_data(
tx: &mut BitcoinTransactionData,
tx_index: usize,
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
_ctx: &Context,
) {
for operation in tx.metadata.ordinal_operations.iter_mut() {
let inscription = match operation {
OrdinalOperation::InscriptionRevealed(ref mut inscription) => inscription,
OrdinalOperation::InscriptionTransferred(_) => continue,
};
let Some(traversal) = inscriptions_data.remove(&(tx.transaction_identifier.clone(), inscription.inscription_input_index)) else {
continue;
};
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
inscription.ordinal_number = traversal.ordinal_number;
inscription.inscription_number = traversal.inscription_number;
inscription.transfers_pre_inscription = traversal.transfers;
inscription.inscription_fee = tx.metadata.fee;
inscription.tx_index = tx_index;
inscription.satpoint_post_inscription = format_satpoint_to_watch(
&traversal.transfer_data.transaction_identifier_location,
traversal.transfer_data.output_index,
traversal.transfer_data.inscription_offset_intra_output,
);
if inscription.inscription_number < 0 {
inscription.curse_type = Some(OrdinalInscriptionCurseType::Unknown);
}
}
}
pub fn consolidate_block_with_pre_computed_ordinals_data(
block: &mut BitcoinBlockData,
inscriptions_db_tx: &Transaction,
include_transfers: bool,
ctx: &Context,
) {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
let mut cumulated_fees = 0;
let mut inscriptions_data =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx);
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
// Add inscriptions data
consolidate_transaction_with_pre_computed_inscription_data(
tx,
tx_index,
&mut inscriptions_data,
ctx,
);
// Add transfers data
if include_transfers {
let _ = augment_transaction_with_ordinals_transfers_data(
tx,
tx_index,
&block.block_identifier,
&network,
&coinbase_txid,
coinbase_subsidy,
&mut cumulated_fees,
inscriptions_db_tx,
ctx,
);
}
}
}

View File

@@ -0,0 +1,212 @@
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
use crate::{
core::{compute_next_satpoint_data, SatPosition},
db::{
find_inscriptions_at_wached_outpoint, format_outpoint_to_watch,
insert_transfer_in_locations_tx,
},
ord::height::Height,
};
use rusqlite::Transaction;
pub fn augment_block_with_ordinals_transfer_data(
block: &mut BitcoinBlockData,
inscriptions_db_tx: &Transaction,
update_db_tx: bool,
ctx: &Context,
) -> bool {
let mut any_event = false;
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
let mut cumulated_fees = 0;
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
let transfers = augment_transaction_with_ordinals_transfers_data(
tx,
tx_index,
&block.block_identifier,
&network,
&coinbase_txid,
coinbase_subsidy,
&mut cumulated_fees,
inscriptions_db_tx,
ctx,
);
any_event |= !transfers.is_empty();
if update_db_tx {
// Store transfers between each iteration
for transfer_data in transfers.into_iter() {
insert_transfer_in_locations_tx(
&transfer_data,
&block.block_identifier,
&inscriptions_db_tx,
&ctx,
);
}
}
}
any_event
}
pub fn augment_transaction_with_ordinals_transfers_data(
tx: &mut BitcoinTransactionData,
tx_index: usize,
block_identifier: &BlockIdentifier,
network: &Network,
coinbase_txid: &TransactionIdentifier,
coinbase_subsidy: u64,
cumulated_fees: &mut u64,
inscriptions_db_tx: &Transaction,
ctx: &Context,
) -> Vec<OrdinalInscriptionTransferData> {
let mut transfers = vec![];
for (input_index, input) in tx.metadata.inputs.iter().enumerate() {
let outpoint_pre_transfer = format_outpoint_to_watch(
&input.previous_output.txid,
input.previous_output.vout as usize,
);
let entries =
match find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx)
{
Ok(entries) => entries,
Err(e) => {
ctx.try_log(|logger| warn!(logger, "unable query inscriptions: {e}"));
continue;
}
};
// For each satpoint inscribed retrieved, we need to compute the next
// outpoint to watch
for watched_satpoint in entries.into_iter() {
let satpoint_pre_transfer =
format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset);
// Question is: are inscriptions moving to a new output,
// burnt or lost in fees and transfered to the miner?
let inputs = tx
.metadata
.inputs
.iter()
.map(|o| o.previous_output.value)
.collect::<_>();
let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>();
let post_transfer_data =
compute_next_satpoint_data(input_index, watched_satpoint.offset, &inputs, &outputs);
let (
outpoint_post_transfer,
offset_post_transfer,
updated_address,
post_transfer_output_value,
) = match post_transfer_data {
SatPosition::Output((output_index, offset)) => {
let outpoint =
format_outpoint_to_watch(&tx.transaction_identifier, output_index);
let script_pub_key_hex =
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(address) => Some(address.to_string()),
Err(e) => {
ctx.try_log(|logger| {
warn!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
)
});
None
}
},
Err(e) => {
ctx.try_log(|logger| {
warn!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
)
});
None
}
};
// At this point we know that inscriptions are being moved.
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} moved from {} to {} (block: {})",
watched_satpoint.inscription_id,
satpoint_pre_transfer,
outpoint,
block_identifier.index,
)
});
(
outpoint,
offset,
updated_address,
Some(tx.metadata.outputs[output_index].value),
)
}
SatPosition::Fee(offset) => {
// Get Coinbase TX
let total_offset = coinbase_subsidy + *cumulated_fees + offset;
let outpoint = format_outpoint_to_watch(&coinbase_txid, 0);
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} spent in fees ({}+{}+{})",
watched_satpoint.inscription_id,
coinbase_subsidy,
cumulated_fees,
offset
)
});
(outpoint, total_offset, None, None)
}
};
let satpoint_post_transfer =
format!("{}:{}", outpoint_post_transfer, offset_post_transfer);
let transfer_data = OrdinalInscriptionTransferData {
inscription_id: watched_satpoint.inscription_id.clone(),
updated_address,
tx_index,
satpoint_pre_transfer,
satpoint_post_transfer,
post_transfer_output_value,
};
transfers.push(transfer_data.clone());
// Attach transfer event
tx.metadata
.ordinal_operations
.push(OrdinalOperation::InscriptionTransferred(transfer_data));
}
}
*cumulated_fees += tx.metadata.fee;
transfers
}

View File

@@ -1,3 +1,4 @@
pub mod inscribing;
pub mod numbering;
pub mod sequencing;
pub mod inscription_parsing;
pub mod inscription_sequencing;
pub mod inscription_tracking;
pub mod satoshi_numbering;

View File

@@ -13,7 +13,7 @@ use crate::db::{
use crate::db::{LazyBlockTransaction, TraversalResult};
use crate::ord::height::Height;
pub fn retrieve_satoshi_point_using_lazy_storage_v3(
pub fn compute_satoshi_number(
blocks_db_dir: &PathBuf,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,

View File

@@ -1,836 +0,0 @@
use std::{
collections::{HashMap, VecDeque},
hash::BuildHasherDefault,
sync::Arc,
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
use dashmap::DashMap;
use fxhash::FxHasher;
use rusqlite::{Connection, Transaction};
use crate::{
core::{
compute_next_satpoint_data, get_inscriptions_revealed_in_block, HordConfig, SatPosition,
},
db::{
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
find_latest_cursed_inscription_number_at_block_height,
find_latest_inscription_number_at_block_height, format_outpoint_to_watch,
format_satpoint_to_watch, get_any_entry_in_ordinal_activities,
insert_entry_in_inscriptions, insert_transfer_in_locations_tx, InscriptionHeigthHint,
LazyBlockTransaction, TraversalResult,
},
ord::height::Height,
};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::sync::mpsc::channel;
use crate::db::find_all_inscriptions_in_block;
use super::numbering::retrieve_satoshi_point_using_lazy_storage_v3;
pub fn retrieve_inscribed_satoshi_points_from_block_v3(
block: &BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
existing_inscriptions: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_db_conn: &mut Connection,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<bool, String> {
let (mut transactions_ids, l1_cache_hits) = get_transactions_to_process(
block,
cache_l1,
existing_inscriptions,
inscriptions_db_conn,
ctx,
);
let inner_ctx = if hord_config.logs.ordinals_internals {
ctx.clone()
} else {
Context::empty()
};
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
let thread_max = hord_config.ingestion_thread_max * 2;
if has_transactions_to_process {
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
for thread_index in 0..thread_max {
let (tx, rx) = channel();
tx_thread_pool.push(tx);
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = inner_ctx.clone();
let moved_hord_db_path = hord_config.db_path.clone();
let local_cache = cache_l2.clone();
let handle = hiro_system_kit::thread_named("Worker")
.spawn(move || {
while let Ok(Some((
transaction_id,
block_identifier,
input_index,
prioritary,
))) = rx.recv()
{
let traversal: Result<TraversalResult, String> =
retrieve_satoshi_point_using_lazy_storage_v3(
&moved_hord_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
}
})
.expect("unable to spawn thread");
thread_pool_handles.push(handle);
}
// Empty cache
let mut thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.remove(key) {
let _ = traversal_tx.send((Ok(entry), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
}
}
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
block.block_identifier.index,
transactions_ids.len(),
l1_cache_hits.len(),
next_blocks.len(),
cache_l1.len(),
cache_l2.len(),
)
});
let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let mut priority_queue = VecDeque::new();
let mut warmup_queue = VecDeque::new();
for (transaction_id, input_index) in transactions_ids.into_iter() {
priority_queue.push_back((
transaction_id,
block.block_identifier.clone(),
input_index,
true,
));
}
// Feed each workers with 2 workitems each
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
for thread_index in 0..thread_max {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
}
let mut next_block_iter = next_blocks.iter();
let mut traversals_received = 0;
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
if prioritary {
traversals_received += 1;
}
match traversal_result {
Ok(traversal) => {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
Err(e) => {
ctx.try_log(|logger| {
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
});
}
}
if traversals_received == expected_traversals {
break;
}
if let Some(w) = priority_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (mut transactions_ids, _) = get_transactions_to_process(
next_block,
cache_l1,
existing_inscriptions,
inscriptions_db_conn,
ctx,
);
ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to pre-process: {}",
block.block_identifier.index,
transactions_ids.len()
)
});
transactions_ids.shuffle(&mut rng);
for (transaction_id, input_index) in transactions_ids.into_iter() {
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
false,
));
}
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
}
}
}
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok(traversal) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
)
});
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
),
traversal,
);
}
}
let _ = tx.send(None);
}
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
});
} else {
ctx.try_log(|logger| {
info!(
logger,
"No inscriptions to index in block #{}", block.block_identifier.index
)
});
}
Ok(has_transactions_to_process)
}
fn get_transactions_to_process(
block: &BitcoinBlockData,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
existing_inscriptions: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
inscriptions_db_conn: &mut Connection,
ctx: &Context,
) -> (
Vec<(TransactionIdentifier, usize)>,
Vec<(TransactionIdentifier, usize)>,
) {
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
let mut l1_cache_hits = vec![];
let mut known_transactions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
for tx in block.transactions.iter().skip(1) {
// Have a new inscription been revealed, if so, are looking at a re-inscription
for ordinal_event in tx.metadata.ordinal_operations.iter() {
let inscription_data = match ordinal_event {
OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data,
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => inscription_data,
OrdinalOperation::InscriptionTransferred(_) => {
continue;
}
};
let key = (
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
);
if cache_l1.contains_key(&key) {
l1_cache_hits.push(key);
continue;
}
if let Some(entry) = known_transactions.remove(&key) {
existing_inscriptions.insert(key, entry);
continue;
}
// Enqueue for traversals
transactions_ids.push((
tx.transaction_identifier.clone(),
inscription_data.inscription_input_index,
));
}
}
(transactions_ids, l1_cache_hits)
}
pub fn update_hord_db_and_augment_bitcoin_block_v3(
new_block: &mut BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
inscription_height_hint: &mut InscriptionHeigthHint,
inscriptions_db_conn_rw: &mut Connection,
hord_config: &HordConfig,
ctx: &Context,
) -> Result<(), String> {
let mut existing_inscriptions = HashMap::new();
let transactions_processed = retrieve_inscribed_satoshi_points_from_block_v3(
&new_block,
&next_blocks,
cache_l1,
cache_l2,
&mut existing_inscriptions,
inscriptions_db_conn_rw,
&hord_config,
ctx,
)?;
if !transactions_processed {
return Ok(());
}
let discard_changes: bool = get_any_entry_in_ordinal_activities(
&new_block.block_identifier.index,
inscriptions_db_conn_rw,
ctx,
);
let inner_ctx = if discard_changes {
Context::empty()
} else {
if hord_config.logs.ordinals_internals {
ctx.clone()
} else {
Context::empty()
}
};
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
let any_inscription_revealed =
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
new_block,
&transaction,
&cache_l1,
inscription_height_hint,
&inner_ctx,
)?;
// Have inscriptions been transfered?
let any_inscription_transferred =
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
new_block,
&transaction,
&inner_ctx,
)?;
if !any_inscription_revealed && !any_inscription_transferred {
return Ok(());
}
if discard_changes {
ctx.try_log(|logger| {
info!(
logger,
"Ignoring updates for block #{}, activities present in database",
new_block.block_identifier.index,
)
});
} else {
ctx.try_log(|logger| {
info!(
logger,
"Saving updates for block {}", new_block.block_identifier.index,
)
});
transaction.commit().unwrap();
ctx.try_log(|logger| {
info!(
logger,
"Updates saved for block {}", new_block.block_identifier.index,
)
});
}
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
ctx.try_log(|logger| {
info!(
logger,
"Block #{} revealed {} inscriptions [{}]",
new_block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")
)
});
Ok(())
}
/// For each input of each transaction in the block, we retrieve the UTXO spent (outpoint_pre_transfer)
/// and we check using a `storage` (in-memory or sqlite) absctraction if we have some existing inscriptions
/// for this entry.
/// When this is the case, it means that an inscription_transfer event needs to be produced. We need to
/// compute the output index (if any) `post_transfer_output` that will now include the inscription.
/// When identifying the output index, we will also need to provide an updated offset for pin pointing
/// the satoshi location.
pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
block: &mut BitcoinBlockData,
hord_db_tx: &Transaction,
ctx: &Context,
) -> Result<bool, String> {
let mut storage_updated = false;
let mut cumulated_fees = 0;
let subsidy = Height(block.block_identifier.index).subsidy();
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() {
for (input_index, input) in new_tx.metadata.inputs.iter().enumerate() {
let outpoint_pre_transfer = format_outpoint_to_watch(
&input.previous_output.txid,
input.previous_output.vout as usize,
);
let entries =
find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &hord_db_tx)?;
// For each satpoint inscribed retrieved, we need to compute the next
// outpoint to watch
for watched_satpoint in entries.into_iter() {
let satpoint_pre_transfer =
format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset);
// Question is: are inscriptions moving to a new output,
// burnt or lost in fees and transfered to the miner?
let inputs = new_tx
.metadata
.inputs
.iter()
.map(|o| o.previous_output.value)
.collect::<_>();
let outputs = new_tx
.metadata
.outputs
.iter()
.map(|o| o.value)
.collect::<_>();
let post_transfer_data = compute_next_satpoint_data(
input_index,
watched_satpoint.offset,
&inputs,
&outputs,
);
let (
outpoint_post_transfer,
offset_post_transfer,
updated_address,
post_transfer_output_value,
) = match post_transfer_data {
SatPosition::Output((output_index, offset)) => {
let outpoint =
format_outpoint_to_watch(&new_tx.transaction_identifier, output_index);
let script_pub_key_hex =
new_tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(address) => Some(address.to_string()),
Err(e) => {
ctx.try_log(|logger| {
warn!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}", e.to_string()
)
});
None
}
},
Err(e) => {
ctx.try_log(|logger| {
warn!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
)
});
None
}
};
// At this point we know that inscriptions are being moved.
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} moved from {} to {} (block: {})",
watched_satpoint.inscription_id,
satpoint_pre_transfer,
outpoint,
block.block_identifier.index,
)
});
(
outpoint,
offset,
updated_address,
Some(new_tx.metadata.outputs[output_index].value),
)
}
SatPosition::Fee(offset) => {
// Get Coinbase TX
let total_offset = subsidy + cumulated_fees + offset;
let outpoint = format_outpoint_to_watch(&coinbase_txid, 0);
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} spent in fees ({}+{}+{})",
watched_satpoint.inscription_id,
subsidy,
cumulated_fees,
offset
)
});
(outpoint, total_offset, None, None)
}
};
let satpoint_post_transfer =
format!("{}:{}", outpoint_post_transfer, offset_post_transfer);
let transfer_data = OrdinalInscriptionTransferData {
inscription_id: watched_satpoint.inscription_id.clone(),
updated_address,
tx_index,
satpoint_pre_transfer,
satpoint_post_transfer,
post_transfer_output_value,
};
// Update watched outpoint
insert_transfer_in_locations_tx(
&transfer_data,
&block.block_identifier,
&hord_db_tx,
&ctx,
);
storage_updated = true;
// Attach transfer event
new_tx
.metadata
.ordinal_operations
.push(OrdinalOperation::InscriptionTransferred(transfer_data));
}
}
cumulated_fees += new_tx.metadata.fee;
}
Ok(storage_updated)
}
pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
block: &mut BitcoinBlockData,
transaction: &Transaction,
cache_l1: &HashMap<(TransactionIdentifier, usize), TraversalResult>,
inscription_height_hint: &mut InscriptionHeigthHint,
ctx: &Context,
) -> Result<bool, String> {
let mut storage_updated = false;
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
let mut latest_cursed_inscription_loaded = false;
let mut latest_cursed_inscription_number = 0;
let mut cursed_inscription_sequence_updated = false;
let mut latest_blessed_inscription_loaded = false;
let mut latest_blessed_inscription_number = 0;
let mut blessed_inscription_sequence_updated = false;
let mut sats_overflow = vec![];
for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() {
let mut ordinals_events_indexes_to_discard = VecDeque::new();
let mut ordinals_events_indexes_to_curse = VecDeque::new();
// Have a new inscription been revealed, if so, are looking at a re-inscription
for (ordinal_event_index, ordinal_event) in
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
{
let (inscription, is_cursed) = match ordinal_event {
OrdinalOperation::InscriptionRevealed(inscription) => (inscription, false),
OrdinalOperation::CursedInscriptionRevealed(inscription) => (inscription, true),
OrdinalOperation::InscriptionTransferred(_) => continue,
};
let mut inscription_number = if is_cursed {
latest_cursed_inscription_number = if !latest_cursed_inscription_loaded {
latest_cursed_inscription_loaded = true;
match find_latest_cursed_inscription_number_at_block_height(
&block.block_identifier.index,
&inscription_height_hint.cursed,
&transaction,
&ctx,
)? {
None => -1,
Some(inscription_number) => inscription_number - 1,
}
} else {
latest_cursed_inscription_number - 1
};
latest_cursed_inscription_number
} else {
latest_blessed_inscription_number = if !latest_blessed_inscription_loaded {
latest_blessed_inscription_loaded = true;
match find_latest_inscription_number_at_block_height(
&block.block_identifier.index,
&inscription_height_hint.blessed,
&transaction,
&ctx,
)? {
None => 0,
Some(inscription_number) => inscription_number + 1,
}
} else {
latest_blessed_inscription_number + 1
};
latest_blessed_inscription_number
};
let transaction_identifier = new_tx.transaction_identifier.clone();
let traversal = match cache_l1
.get(&(transaction_identifier, inscription.inscription_input_index))
{
Some(traversal) => traversal,
None => {
ctx.try_log(|logger| {
info!(
logger,
"Unable to retrieve cached inscription data for inscription {}",
new_tx.transaction_identifier.hash
);
});
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
continue;
}
};
let outputs = &new_tx.metadata.outputs;
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
inscription.ordinal_number = traversal.ordinal_number;
inscription.inscription_number = traversal.inscription_number;
inscription.transfers_pre_inscription = traversal.transfers;
inscription.inscription_fee = new_tx.metadata.fee;
inscription.tx_index = tx_index;
inscription.satpoint_post_inscription = format_satpoint_to_watch(
&traversal.transfer_data.transaction_identifier_location,
traversal.transfer_data.output_index,
traversal.transfer_data.inscription_offset_intra_output,
);
if let Some(output) = outputs.get(traversal.transfer_data.output_index) {
inscription.inscription_output_value = output.value;
inscription.inscriber_address = {
let script_pub_key = output.get_script_pubkey_hex();
match Script::from_hex(&script_pub_key) {
Ok(script) => match Address::from_script(&script, network) {
Ok(a) => Some(a.to_string()),
_ => None,
},
_ => None,
}
};
} else {
ctx.try_log(|logger| {
warn!(
logger,
"Database corrupted, skipping cursed inscription => {:?} / {:?}",
traversal,
outputs
);
});
}
if traversal.ordinal_number == 0 {
// If the satoshi inscribed correspond to a sat overflow, we will store the inscription
// and assign an inscription number after the other inscriptions, to mimick the
// bug in ord.
sats_overflow.push(inscription.clone());
continue;
}
if let Some(_entry) =
find_inscription_with_ordinal_number(&traversal.ordinal_number, &transaction, &ctx)
{
ctx.try_log(|logger| {
info!(
logger,
"Transaction {} in block {} is overriding an existing inscription {}",
new_tx.transaction_identifier.hash,
block.block_identifier.index,
traversal.ordinal_number
);
});
inscription_number = if !latest_cursed_inscription_loaded {
latest_cursed_inscription_loaded = true;
match find_latest_cursed_inscription_number_at_block_height(
&block.block_identifier.index,
&inscription_height_hint.cursed,
&transaction,
&ctx,
)? {
None => -1,
Some(inscription_number) => inscription_number - 1,
}
} else {
latest_cursed_inscription_number - 1
};
inscription.curse_type = Some(OrdinalInscriptionCurseType::Reinscription);
if !is_cursed {
ordinals_events_indexes_to_curse.push_front(ordinal_event_index);
latest_blessed_inscription_number -= 1;
}
}
inscription.inscription_number = inscription_number;
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} (#{}) detected on Satoshi {} (block {}, {} transfers)",
inscription.inscription_id,
inscription.inscription_number,
inscription.ordinal_number,
block.block_identifier.index,
inscription.transfers_pre_inscription,
);
});
insert_entry_in_inscriptions(&inscription, &block.block_identifier, &transaction, &ctx);
if inscription.curse_type.is_some() {
cursed_inscription_sequence_updated = true;
} else {
blessed_inscription_sequence_updated = true;
}
storage_updated = true;
}
for index in ordinals_events_indexes_to_curse.into_iter() {
match new_tx.metadata.ordinal_operations.remove(index) {
OrdinalOperation::InscriptionRevealed(inscription_data)
| OrdinalOperation::CursedInscriptionRevealed(inscription_data) => {
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} (#{}) transitioned from blessed to cursed",
inscription_data.inscription_id,
inscription_data.inscription_number,
);
});
new_tx.metadata.ordinal_operations.insert(
index,
OrdinalOperation::CursedInscriptionRevealed(inscription_data),
);
}
_ => unreachable!(),
}
}
for index in ordinals_events_indexes_to_discard.into_iter() {
new_tx.metadata.ordinal_operations.remove(index);
}
}
for inscription in sats_overflow.iter_mut() {
inscription.inscription_number = latest_blessed_inscription_number;
ctx.try_log(|logger| {
info!(
logger,
"Inscription {} (#{}) detected on Satoshi overflow {} (block {}, {} transfers)",
inscription.inscription_id,
inscription.inscription_number,
inscription.ordinal_number,
block.block_identifier.index,
inscription.transfers_pre_inscription,
);
});
insert_entry_in_inscriptions(&inscription, &block.block_identifier, &transaction, &ctx);
latest_blessed_inscription_number += 1;
storage_updated = true;
if inscription.curse_type.is_some() {
cursed_inscription_sequence_updated = true;
} else {
blessed_inscription_sequence_updated = true;
}
}
if cursed_inscription_sequence_updated {
inscription_height_hint.cursed = Some(block.block_identifier.index);
}
if blessed_inscription_sequence_updated {
inscription_height_hint.blessed = Some(block.block_identifier.index);
}
Ok(storage_updated)
}

View File

@@ -19,22 +19,9 @@ use chainhook_sdk::{
utils::Context,
};
use crate::ord::sat::Sat;
#[derive(Clone, Debug)]
pub struct InscriptionHeigthHint {
pub cursed: Option<u64>,
pub blessed: Option<u64>,
}
impl InscriptionHeigthHint {
pub fn new() -> InscriptionHeigthHint {
InscriptionHeigthHint {
cursed: None,
blessed: None,
}
}
}
use crate::{
core::protocol::inscription_parsing::get_inscriptions_revealed_in_block, ord::sat::Sat,
};
fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
@@ -94,7 +81,7 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
}
}
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS locations (
@@ -107,7 +94,11 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| {
warn!(logger, "Unable to create table locations: {}", e.to_string())
warn!(
logger,
"Unable to create table locations: {}",
e.to_string()
)
});
} else {
if let Err(e) = conn.execute(
@@ -127,7 +118,7 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
}
}
}
conn
@@ -379,12 +370,42 @@ pub fn insert_entry_in_inscriptions(
) {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
}
insert_inscription_in_locations(
&inscription_data,
&block_identifier,
&inscriptions_db_conn_rw,
ctx,
);
}
pub fn insert_new_inscriptions_from_block_in_inscriptions_and_locations(
block: &BitcoinBlockData,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
insert_entry_in_inscriptions(
inscription_data,
&block.block_identifier,
inscriptions_db_conn_rw,
&ctx,
);
insert_inscription_in_locations(
&inscription_data,
&block.block_identifier,
&inscriptions_db_conn_rw,
ctx,
);
}
}
pub fn insert_new_inscriptions_from_block_in_locations(
block: &BitcoinBlockData,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
insert_inscription_in_locations(
inscription_data,
&block.block_identifier,
inscriptions_db_conn_rw,
&ctx,
);
}
}
pub fn insert_inscription_in_locations(
@@ -438,18 +459,18 @@ pub fn insert_transfer_in_locations(
pub fn get_any_entry_in_ordinal_activities(
block_height: &u64,
inscriptions_db_conn: &Connection,
inscriptions_db_tx: &Connection,
_ctx: &Context,
) -> bool {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
let mut stmt = inscriptions_db_tx
.prepare("SELECT DISTINCT block_height FROM inscriptions WHERE block_height = ?")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(_)) = rows.next() {
return true;
}
let mut stmt = inscriptions_db_conn
let mut stmt = inscriptions_db_tx
.prepare("SELECT DISTINCT block_height FROM locations WHERE block_height = ?")
.unwrap();
let mut rows = stmt.query(args).unwrap();
@@ -539,7 +560,7 @@ pub fn find_latest_transfers_block_height(
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let block_height: u64 = row.get(0).unwrap();
return Some(block_height)
return Some(block_height);
}
None
}
@@ -682,14 +703,14 @@ pub fn find_latest_cursed_inscription_number_at_block_height(
Ok(None)
}
pub fn find_inscription_with_ordinal_number(
pub fn find_blessed_inscription_with_ordinal_number(
ordinal_number: &u64,
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number > 0")
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number >= 0")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
@@ -750,17 +771,17 @@ pub fn find_inscription_with_id(
pub fn find_all_inscriptions_in_block(
block_height: &u64,
inscriptions_db_conn: &Connection,
inscriptions_db_tx: &Connection,
ctx: &Context,
) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
let mut stmt = inscriptions_db_tx
.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
.unwrap();
let mut results = BTreeMap::new();
let mut rows = stmt.query(args).unwrap();
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_tx, ctx);
while let Ok(Some(row)) = rows.next() {
let inscription_number: i64 = row.get(0).unwrap();
let ordinal_number: u64 = row.get(1).unwrap();

View File

@@ -1,6 +1,8 @@
use crate::config::{Config, PredicatesApi};
use crate::core::pipeline::processors::inscription_indexing::re_augment_block_with_ordinals_operations;
use crate::core::{self, get_inscriptions_revealed_in_block};
use crate::core::protocol::inscription_parsing::{
get_inscriptions_revealed_in_block, parse_ordinals_and_standardize_block,
};
use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data;
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn};
use crate::download::download_ordinals_dataset_if_required;
use crate::service::{
@@ -75,7 +77,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
// Are we dealing with an ordinals-based predicate?
// If so, we could use the ordinal storage to provide a set of hints.
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
let mut inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
@@ -109,7 +111,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let block_breakdown =
download_and_parse_block_with_retry(&http_client, &block_hash, &bitcoin_config, ctx)
.await?;
let mut block = match core::parse_ordinals_and_standardize_block(
let mut block = match parse_ordinals_and_standardize_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
@@ -124,8 +126,15 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
};
let empty_ctx = Context::empty();
re_augment_block_with_ordinals_operations(&mut block, &inscriptions_db_conn, &empty_ctx);
{
let inscriptions_db_tx = inscriptions_db_conn.transaction().unwrap();
consolidate_block_with_pre_computed_ordinals_data(
&mut block,
&inscriptions_db_tx,
true,
&ctx,
);
}
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
.iter()

View File

@@ -6,13 +6,13 @@ use crate::core::pipeline::processors::inscription_indexing::process_blocks;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand};
use crate::core::protocol::inscription_parsing::parse_inscriptions_in_standardized_block;
use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{
new_traversals_lazy_cache, parse_inscriptions_in_standardized_block,
revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
new_traversals_lazy_cache, revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
};
use crate::db::{
find_latest_inscription_block_height, initialize_hord_db, insert_entry_in_blocks,
open_readonly_hord_db_conn, open_readwrite_hord_dbs, InscriptionHeigthHint, LazyBlock, find_latest_transfers_block_height,
insert_entry_in_blocks, open_readonly_hord_db_conn, open_readwrite_hord_dbs, LazyBlock,
};
use crate::scan::bitcoin::process_block_with_predicates;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
@@ -105,7 +105,6 @@ impl Service {
})
.expect("unable to spawn thread");
// let (cursor, tip) = {
// let inscriptions_db_conn =
// open_readonly_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
@@ -117,6 +116,7 @@ impl Service {
// };
// self.replay_transfers(cursor, tip, Some(tx_replayer.clone()))
// .await?;
self.update_state(Some(tx_replayer.clone())).await?;
// Catch-up with chain tip
@@ -259,12 +259,15 @@ impl Service {
parse_inscriptions_in_standardized_block(block, &ctx);
}
let inscriptions_db_conn =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)
.expect("unable to open inscriptions db");
let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn);
let mut hint = InscriptionHeigthHint::new();
let updated_blocks = process_blocks(
&mut blocks,
&mut sequence_cursor,
&moved_traversals_cache,
&mut hint,
&mut inscriptions_db_conn_rw,
&config.get_hord_config(),
&None,