mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 22:43:06 +08:00
fix: address remaining issues
This commit is contained in:
@@ -14,8 +14,8 @@ redis = "0.21.5"
|
||||
serde-redis = "0.12.0"
|
||||
hex = "0.4.3"
|
||||
rand = "0.8.5"
|
||||
chainhook-sdk = { version = "=0.8.0", default-features = false, features = ["zeromq"] }
|
||||
# chainhook-sdk = { version = "=0.7.12", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
|
||||
chainhook-sdk = { version = "=0.8.2", default-features = false, features = ["zeromq"] }
|
||||
# chainhook-sdk = { version = "=0.8.2", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
|
||||
hiro-system-kit = "0.1.0"
|
||||
clap = { version = "3.2.23", features = ["derive"], optional = true }
|
||||
clap_generate = { version = "3.0.3", optional = true }
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::config::Config;
|
||||
use crate::core::pipeline::download_and_pipeline_blocks;
|
||||
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::{self};
|
||||
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
|
||||
use crate::download::download_ordinals_dataset_if_required;
|
||||
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
|
||||
use crate::service::Service;
|
||||
@@ -11,7 +11,7 @@ use crate::service::Service;
|
||||
use crate::db::{
|
||||
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
|
||||
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
|
||||
find_latest_inscription_block_height, find_lazy_block_at_block_height, initialize_hord_db,
|
||||
find_latest_inscription_block_height, find_lazy_block_at_block_height,
|
||||
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
|
||||
open_readwrite_hord_db_conn_rocks_db,
|
||||
};
|
||||
@@ -742,7 +742,7 @@ pub async fn fetch_and_standardize_block(
|
||||
download_and_parse_block_with_retry(http_client, &block_hash, &bitcoin_config, &ctx)
|
||||
.await?;
|
||||
|
||||
core::parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
|
||||
parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
|
||||
.map_err(|(e, _)| e)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
pub mod pipeline;
|
||||
pub mod protocol;
|
||||
|
||||
use chainhook_sdk::types::{
|
||||
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionRevealData, OrdinalOperation,
|
||||
};
|
||||
use chainhook_sdk::types::{BitcoinBlockData, OrdinalOperation};
|
||||
use dashmap::DashMap;
|
||||
use fxhash::{FxBuildHasher, FxHasher};
|
||||
use rocksdb::DB;
|
||||
use rusqlite::Connection;
|
||||
use std::collections::BTreeMap;
|
||||
use std::hash::BuildHasherDefault;
|
||||
use std::ops::Div;
|
||||
use std::path::PathBuf;
|
||||
@@ -20,8 +17,6 @@ use chainhook_sdk::{
|
||||
|
||||
use crate::config::{Config, LogConfig};
|
||||
|
||||
use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown};
|
||||
|
||||
use crate::db::{
|
||||
find_last_block_inserted, find_latest_inscription_block_height, initialize_hord_db,
|
||||
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
|
||||
@@ -32,10 +27,6 @@ use crate::db::{
|
||||
LazyBlockTransaction,
|
||||
};
|
||||
|
||||
use self::protocol::inscribing::{
|
||||
get_inscriptions_from_full_tx, get_inscriptions_from_standardized_tx,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct HordConfig {
|
||||
pub network_thread_max: usize,
|
||||
@@ -47,52 +38,6 @@ pub struct HordConfig {
|
||||
pub logs: LogConfig,
|
||||
}
|
||||
|
||||
pub fn parse_ordinals_and_standardize_block(
|
||||
raw_block: BitcoinBlockFullBreakdown,
|
||||
network: &BitcoinNetwork,
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockData, (String, bool)> {
|
||||
let mut ordinal_operations = BTreeMap::new();
|
||||
|
||||
for tx in raw_block.tx.iter() {
|
||||
ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx));
|
||||
}
|
||||
|
||||
let mut block = standardize_bitcoin_block(raw_block, network, ctx)?;
|
||||
|
||||
for tx in block.transactions.iter_mut() {
|
||||
if let Some(ordinal_operations) =
|
||||
ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str())
|
||||
{
|
||||
tx.metadata.ordinal_operations = ordinal_operations;
|
||||
}
|
||||
}
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) {
|
||||
for tx in block.transactions.iter_mut() {
|
||||
tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_inscriptions_revealed_in_block(
|
||||
block: &BitcoinBlockData,
|
||||
) -> Vec<&OrdinalInscriptionRevealData> {
|
||||
let mut ops = vec![];
|
||||
for tx in block.transactions.iter() {
|
||||
for op in tx.metadata.ordinal_operations.iter() {
|
||||
if let OrdinalOperation::InscriptionRevealed(op) = op {
|
||||
ops.push(op);
|
||||
}
|
||||
if let OrdinalOperation::CursedInscriptionRevealed(op) = op {
|
||||
ops.push(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
ops
|
||||
}
|
||||
|
||||
pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
block: &BitcoinBlockData,
|
||||
blocks_db_rw: &DB,
|
||||
@@ -106,8 +51,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
let tx = &block.transactions[block.transactions.len() - tx_index];
|
||||
for ordinal_event in tx.metadata.ordinal_operations.iter() {
|
||||
match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(data)
|
||||
| OrdinalOperation::CursedInscriptionRevealed(data) => {
|
||||
OrdinalOperation::InscriptionRevealed(data) => {
|
||||
// We remove any new inscription created
|
||||
remove_entry_from_inscriptions(
|
||||
&data.inscription_id,
|
||||
@@ -248,13 +192,13 @@ pub fn should_sync_hord_db(
|
||||
(end_block.min(200_000), 10_000)
|
||||
} else if start_block < 550_000 {
|
||||
(end_block.min(550_000), 1_000)
|
||||
} else {
|
||||
} else {
|
||||
(end_block, 100)
|
||||
};
|
||||
|
||||
if start_block < 767430 && end_block > 767430 {
|
||||
end_block = 767430;
|
||||
}
|
||||
}
|
||||
|
||||
if start_block <= end_block {
|
||||
Ok(Some((start_block, end_block, speed)))
|
||||
|
||||
@@ -16,7 +16,7 @@ use chainhook_sdk::indexer::bitcoin::{
|
||||
build_http_client, parse_downloaded_block, try_download_block_bytes_with_retry,
|
||||
};
|
||||
|
||||
use super::parse_ordinals_and_standardize_block;
|
||||
use super::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
|
||||
|
||||
pub enum PostProcessorCommand {
|
||||
Start,
|
||||
|
||||
@@ -6,11 +6,12 @@ use std::{
|
||||
|
||||
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
|
||||
use crossbeam_channel::TryRecvError;
|
||||
use rocksdb::DB;
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
|
||||
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock},
|
||||
};
|
||||
|
||||
pub fn start_block_ingestion_processor(
|
||||
@@ -25,18 +26,14 @@ pub fn start_block_ingestion_processor(
|
||||
let ctx = ctx.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
|
||||
.spawn(move || {
|
||||
let mut num_writes = 0;
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
|
||||
|
||||
let mut empty_cycles = 0;
|
||||
let mut tip: u64 = 0;
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start block indexing runloop");
|
||||
}
|
||||
|
||||
|
||||
loop {
|
||||
let (compacted_blocks, _) = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
|
||||
@@ -62,40 +59,7 @@ pub fn start_block_ingestion_processor(
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let batch_size = compacted_blocks.len();
|
||||
num_writes += batch_size;
|
||||
for (block_height, compacted_block) in compacted_blocks.into_iter() {
|
||||
tip = tip.max(block_height);
|
||||
insert_entry_in_blocks(
|
||||
block_height as u32,
|
||||
&compacted_block,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})");
|
||||
|
||||
// Early return
|
||||
if num_writes >= 512 {
|
||||
ctx.try_log(|logger| {
|
||||
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
|
||||
});
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
num_writes = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Write blocks to disk, before traversals
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &ctx);
|
||||
}
|
||||
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
@@ -112,3 +76,22 @@ pub fn start_block_ingestion_processor(
|
||||
thread_handle: handle,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn store_compacted_blocks(
|
||||
mut compacted_blocks: Vec<(u64, LazyBlock)>,
|
||||
blocks_db_rw: &DB,
|
||||
ctx: &Context,
|
||||
) {
|
||||
compacted_blocks.sort_by(|(a, _), (b, _)| a.cmp(b));
|
||||
|
||||
for (block_height, compacted_block) in compacted_blocks.into_iter() {
|
||||
insert_entry_in_blocks(block_height as u32, &compacted_block, &blocks_db_rw, &ctx);
|
||||
info!(ctx.expect_logger(), "Block #{block_height} saved to disk");
|
||||
}
|
||||
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,28 +1,36 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::Arc,
|
||||
thread::{sleep, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{
|
||||
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
|
||||
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
|
||||
},
|
||||
types::{BitcoinBlockData, TransactionIdentifier},
|
||||
utils::Context,
|
||||
};
|
||||
use crossbeam_channel::{Sender, TryRecvError};
|
||||
use rusqlite::Transaction;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
use rusqlite::Connection;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasherDefault;
|
||||
|
||||
use crate::{
|
||||
core::{protocol::sequencing::update_hord_db_and_augment_bitcoin_block_v3, HordConfig},
|
||||
db::{find_all_inscriptions_in_block, find_all_transfers_in_block, format_satpoint_to_watch},
|
||||
core::{
|
||||
pipeline::processors::block_ingestion::store_compacted_blocks,
|
||||
protocol::{
|
||||
inscription_parsing::get_inscriptions_revealed_in_block,
|
||||
inscription_sequencing::{
|
||||
augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx,
|
||||
retrieve_inscribed_satoshi_points_from_block_v3, SequenceCursor,
|
||||
},
|
||||
inscription_tracking::augment_block_with_ordinals_transfer_data,
|
||||
},
|
||||
HordConfig,
|
||||
},
|
||||
db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn},
|
||||
};
|
||||
|
||||
use crate::db::{LazyBlockTransaction, TraversalResult};
|
||||
@@ -33,10 +41,7 @@ use crate::{
|
||||
new_traversals_lazy_cache,
|
||||
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
},
|
||||
db::{
|
||||
insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
|
||||
InscriptionHeigthHint,
|
||||
},
|
||||
db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db},
|
||||
};
|
||||
|
||||
pub fn start_inscription_indexing_processor(
|
||||
@@ -58,13 +63,13 @@ pub fn start_inscription_indexing_processor(
|
||||
let mut inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
|
||||
let hord_config = config.get_hord_config();
|
||||
let mut num_writes = 0;
|
||||
let mut tip: u64 = 0;
|
||||
let blocks_db_rw =
|
||||
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();
|
||||
|
||||
let mut inscription_height_hint = InscriptionHeigthHint::new();
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
let inscriptions_db_conn =
|
||||
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
|
||||
let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn);
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start inscription indexing runloop");
|
||||
@@ -94,55 +99,22 @@ pub fn start_inscription_indexing_processor(
|
||||
},
|
||||
};
|
||||
|
||||
let batch_size = compacted_blocks.len();
|
||||
num_writes += batch_size;
|
||||
for (block_height, compacted_block) in compacted_blocks.into_iter() {
|
||||
tip = tip.max(block_height);
|
||||
insert_entry_in_blocks(
|
||||
block_height as u32,
|
||||
&compacted_block,
|
||||
&blocks_db_rw,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})");
|
||||
|
||||
// Early return
|
||||
if blocks.is_empty() {
|
||||
if num_writes >= 512 {
|
||||
ctx.try_log(|logger| {
|
||||
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
|
||||
});
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
num_writes = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &Context::empty());
|
||||
|
||||
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
|
||||
|
||||
// Write blocks to disk, before traversals
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
garbage_collect_nth_block += blocks.len();
|
||||
|
||||
process_blocks(
|
||||
blocks = process_blocks(
|
||||
&mut blocks,
|
||||
&mut sequence_cursor,
|
||||
&cache_l2,
|
||||
&mut inscription_height_hint,
|
||||
&mut inscriptions_db_conn_rw,
|
||||
&hord_config,
|
||||
&post_processor,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
garbage_collect_nth_block += blocks.len();
|
||||
|
||||
// Clear L2 cache on a regular basis
|
||||
if garbage_collect_nth_block > garbage_collect_every_n_blocks {
|
||||
info!(
|
||||
@@ -154,12 +126,6 @@ pub fn start_inscription_indexing_processor(
|
||||
garbage_collect_nth_block = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = blocks_db_rw.flush() {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "{}", e.to_string());
|
||||
});
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
@@ -172,29 +138,88 @@ pub fn start_inscription_indexing_processor(
|
||||
|
||||
pub fn process_blocks(
|
||||
next_blocks: &mut Vec<BitcoinBlockData>,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscription_height_hint: &mut InscriptionHeigthHint,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
post_processor: &Option<Sender<BitcoinBlockData>>,
|
||||
ctx: &Context,
|
||||
) -> Vec<BitcoinBlockData> {
|
||||
let mut cache_l1 = HashMap::new();
|
||||
let mut cache_l1 = BTreeMap::new();
|
||||
|
||||
let mut updated_blocks = vec![];
|
||||
|
||||
for _cursor in 0..next_blocks.len() {
|
||||
let inscriptions_db_tx: rusqlite::Transaction<'_> =
|
||||
inscriptions_db_conn_rw.transaction().unwrap();
|
||||
|
||||
let mut block = next_blocks.remove(0);
|
||||
|
||||
// We check before hand if some data were pre-existing, before processing
|
||||
let any_existing_activity = get_any_entry_in_ordinal_activities(
|
||||
&block.block_identifier.index,
|
||||
&inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let _ = process_block(
|
||||
&mut block,
|
||||
&next_blocks,
|
||||
sequence_cursor,
|
||||
&mut cache_l1,
|
||||
cache_l2,
|
||||
inscription_height_hint,
|
||||
inscriptions_db_conn_rw,
|
||||
&inscriptions_db_tx,
|
||||
hord_config,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Block #{} revealed {} inscriptions [{}]",
|
||||
block.block_identifier.index,
|
||||
inscriptions_revealed.len(),
|
||||
inscriptions_revealed.join(", ")
|
||||
)
|
||||
});
|
||||
|
||||
if any_existing_activity {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"Dropping updates for block #{}, activities present in database",
|
||||
block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
let _ = inscriptions_db_tx.rollback();
|
||||
} else {
|
||||
match inscriptions_db_tx.commit() {
|
||||
Ok(_) => {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Updates saved for block {}", block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
error!(
|
||||
logger,
|
||||
"Unable to update changes in block #{}: {}",
|
||||
block.block_identifier.index,
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(post_processor_tx) = post_processor {
|
||||
let _ = post_processor_tx.send(block.clone());
|
||||
}
|
||||
@@ -206,104 +231,45 @@ pub fn process_blocks(
|
||||
pub fn process_block(
|
||||
block: &mut BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscription_height_hint: &mut InscriptionHeigthHint,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
update_hord_db_and_augment_bitcoin_block_v3(
|
||||
block,
|
||||
next_blocks,
|
||||
let any_processable_transactions = retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
&block,
|
||||
&next_blocks,
|
||||
cache_l1,
|
||||
cache_l2,
|
||||
inscription_height_hint,
|
||||
inscriptions_db_conn_rw,
|
||||
hord_config,
|
||||
inscriptions_db_tx,
|
||||
&hord_config,
|
||||
ctx,
|
||||
)
|
||||
}
|
||||
)?;
|
||||
|
||||
pub fn re_augment_block_with_ordinals_operations(
|
||||
block: &mut BitcoinBlockData,
|
||||
inscriptions_db_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
if !any_processable_transactions {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Always discard if we have some existing content at this block height (inscription or transfers)
|
||||
let inner_ctx = if hord_config.logs.ordinals_internals {
|
||||
ctx.clone()
|
||||
} else {
|
||||
Context::empty()
|
||||
};
|
||||
|
||||
// Restore inscriptions data
|
||||
let mut inscriptions =
|
||||
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
|
||||
|
||||
let mut should_become_cursed = vec![];
|
||||
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
|
||||
for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
|
||||
let (inscription, is_cursed) = match operation {
|
||||
OrdinalOperation::CursedInscriptionRevealed(ref mut inscription) => {
|
||||
(inscription, true)
|
||||
}
|
||||
OrdinalOperation::InscriptionRevealed(ref mut inscription) => (inscription, false),
|
||||
OrdinalOperation::InscriptionTransferred(_) => continue,
|
||||
};
|
||||
|
||||
let Some(traversal) = inscriptions.remove(&(tx.transaction_identifier.clone(), inscription.inscription_input_index)) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
|
||||
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
|
||||
inscription.ordinal_number = traversal.ordinal_number;
|
||||
inscription.inscription_number = traversal.inscription_number;
|
||||
inscription.transfers_pre_inscription = traversal.transfers;
|
||||
inscription.inscription_fee = tx.metadata.fee;
|
||||
inscription.tx_index = tx_index;
|
||||
inscription.satpoint_post_inscription = format_satpoint_to_watch(
|
||||
&traversal.transfer_data.transaction_identifier_location,
|
||||
traversal.transfer_data.output_index,
|
||||
traversal.transfer_data.inscription_offset_intra_output,
|
||||
);
|
||||
|
||||
let Some(output) = tx.metadata.outputs.get(traversal.transfer_data.output_index) else {
|
||||
continue;
|
||||
};
|
||||
inscription.inscription_output_value = output.value;
|
||||
inscription.inscriber_address = {
|
||||
let script_pub_key = output.get_script_pubkey_hex();
|
||||
match Script::from_hex(&script_pub_key) {
|
||||
Ok(script) => match Address::from_script(&script, network.clone()) {
|
||||
Ok(a) => Some(a.to_string()),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
|
||||
if !is_cursed && inscription.inscription_number < 0 {
|
||||
inscription.curse_type = Some(OrdinalInscriptionCurseType::Reinscription);
|
||||
should_become_cursed.push((tx_index, op_index));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (tx_index, op_index) in should_become_cursed.into_iter() {
|
||||
let Some(tx) = block.transactions.get_mut(tx_index) else {
|
||||
continue;
|
||||
};
|
||||
let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else {
|
||||
continue;
|
||||
};
|
||||
tx.metadata.ordinal_operations.insert(
|
||||
op_index,
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription),
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: Handle transfers
|
||||
// Handle inscriptions
|
||||
let _ = augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
|
||||
block,
|
||||
sequence_cursor,
|
||||
cache_l1,
|
||||
&inscriptions_db_tx,
|
||||
&inner_ctx,
|
||||
);
|
||||
|
||||
// Handle transfers
|
||||
let _ = augment_block_with_ordinals_transfer_data(block, inscriptions_db_tx, true, &inner_ctx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,28 +1,24 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
thread::{sleep, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation},
|
||||
utils::Context,
|
||||
};
|
||||
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
|
||||
use crossbeam_channel::{Sender, TryRecvError};
|
||||
|
||||
use crate::{
|
||||
core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
|
||||
db::{
|
||||
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
|
||||
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
db::open_readwrite_hord_db_conn,
|
||||
core::{
|
||||
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
protocol::{
|
||||
inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data,
|
||||
inscription_tracking::augment_block_with_ordinals_transfer_data,
|
||||
},
|
||||
},
|
||||
db::{
|
||||
insert_new_inscriptions_from_block_in_locations, open_readwrite_hord_db_conn,
|
||||
remove_entries_from_locations_at_block_height,
|
||||
},
|
||||
};
|
||||
|
||||
pub fn start_transfers_recomputing_processor(
|
||||
@@ -70,123 +66,33 @@ pub fn start_transfers_recomputing_processor(
|
||||
};
|
||||
|
||||
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
|
||||
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();
|
||||
|
||||
for block in blocks.iter_mut() {
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Cleaning transfers from block {}", block.block_identifier.index
|
||||
);
|
||||
let inscriptions = find_all_inscriptions_in_block(
|
||||
&block.block_identifier.index,
|
||||
&inscriptions_db_conn_rw,
|
||||
consolidate_block_with_pre_computed_ordinals_data(
|
||||
block,
|
||||
&inscriptions_db_tx,
|
||||
false,
|
||||
&ctx,
|
||||
);
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{} inscriptions retrieved at block {}",
|
||||
inscriptions.len(),
|
||||
block.block_identifier.index
|
||||
);
|
||||
let mut operations = BTreeMap::new();
|
||||
|
||||
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
|
||||
|
||||
remove_entries_from_locations_at_block_height(
|
||||
&block.block_identifier.index,
|
||||
&transaction,
|
||||
&inscriptions_db_tx,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
for (_, entry) in inscriptions.iter() {
|
||||
let inscription_id = entry.get_inscription_id();
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Processing inscription {}", inscription_id
|
||||
);
|
||||
insert_entry_in_locations(
|
||||
&inscription_id,
|
||||
block.block_identifier.index,
|
||||
&entry.transfer_data,
|
||||
&transaction,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
operations.insert(
|
||||
entry.transaction_identifier_inscription.clone(),
|
||||
OrdinalInscriptionTransferData {
|
||||
inscription_id: entry.get_inscription_id(),
|
||||
updated_address: None,
|
||||
satpoint_pre_transfer: format_satpoint_to_watch(
|
||||
&entry.transaction_identifier_inscription,
|
||||
entry.inscription_input_index,
|
||||
0,
|
||||
),
|
||||
satpoint_post_transfer: format_satpoint_to_watch(
|
||||
&entry.transfer_data.transaction_identifier_location,
|
||||
entry.transfer_data.output_index,
|
||||
entry.transfer_data.inscription_offset_intra_output,
|
||||
),
|
||||
post_transfer_output_value: None,
|
||||
tx_index: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Rewriting transfers for block {}", block.block_identifier.index
|
||||
);
|
||||
|
||||
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
|
||||
tx.metadata.ordinal_operations.clear();
|
||||
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
|
||||
let (_, output_index, _) =
|
||||
parse_satpoint_to_watch(&entry.satpoint_post_transfer);
|
||||
|
||||
let script_pub_key_hex =
|
||||
tx.metadata.outputs[output_index].get_script_pubkey_hex();
|
||||
let updated_address = match Script::from_hex(&script_pub_key_hex) {
|
||||
Ok(script) => {
|
||||
match Address::from_script(&script, network.clone()) {
|
||||
Ok(address) => Some(address.to_string()),
|
||||
Err(_e) => None,
|
||||
}
|
||||
}
|
||||
Err(_e) => None,
|
||||
};
|
||||
|
||||
entry.updated_address = updated_address;
|
||||
entry.post_transfer_output_value =
|
||||
Some(tx.metadata.outputs[output_index].value);
|
||||
entry.tx_index = tx_index;
|
||||
tx.metadata
|
||||
.ordinal_operations
|
||||
.push(OrdinalOperation::InscriptionTransferred(entry));
|
||||
}
|
||||
}
|
||||
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
insert_new_inscriptions_from_block_in_locations(
|
||||
block,
|
||||
&transaction,
|
||||
&inscriptions_db_tx,
|
||||
&ctx,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Saving supdates for block {}", block.block_identifier.index
|
||||
);
|
||||
transaction.commit().unwrap();
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Transfers in block {} repaired", block.block_identifier.index
|
||||
augment_block_with_ordinals_transfer_data(
|
||||
block,
|
||||
&inscriptions_db_tx,
|
||||
true,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
if let Some(ref post_processor) = post_processor {
|
||||
|
||||
@@ -3,9 +3,10 @@ use std::str::FromStr;
|
||||
|
||||
use chainhook_sdk::bitcoincore_rpc_json::bitcoin::hashes::hex::FromHex;
|
||||
use chainhook_sdk::bitcoincore_rpc_json::bitcoin::Txid;
|
||||
use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown};
|
||||
use chainhook_sdk::types::{
|
||||
BitcoinTransactionData, OrdinalInscriptionCurseType, OrdinalInscriptionRevealData,
|
||||
OrdinalOperation,
|
||||
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, OrdinalInscriptionCurseType,
|
||||
OrdinalInscriptionRevealData, OrdinalOperation,
|
||||
};
|
||||
use chainhook_sdk::utils::Context;
|
||||
use chainhook_sdk::{
|
||||
@@ -310,10 +311,7 @@ pub fn get_inscriptions_from_witness(
|
||||
curse_type: inscription.curse.take(),
|
||||
};
|
||||
|
||||
match &payload.curse_type {
|
||||
Some(_) => Some(OrdinalOperation::CursedInscriptionRevealed(payload)),
|
||||
None => Some(OrdinalOperation::InscriptionRevealed(payload)),
|
||||
}
|
||||
Some(OrdinalOperation::InscriptionRevealed(payload))
|
||||
}
|
||||
|
||||
pub fn get_inscriptions_from_standardized_tx(
|
||||
@@ -377,3 +375,46 @@ fn test_ordinal_inscription_parsing() {
|
||||
|
||||
println!("{:?}", inscription);
|
||||
}
|
||||
|
||||
pub fn parse_ordinals_and_standardize_block(
|
||||
raw_block: BitcoinBlockFullBreakdown,
|
||||
network: &BitcoinNetwork,
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockData, (String, bool)> {
|
||||
let mut ordinal_operations = BTreeMap::new();
|
||||
|
||||
for tx in raw_block.tx.iter() {
|
||||
ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx));
|
||||
}
|
||||
|
||||
let mut block = standardize_bitcoin_block(raw_block, network, ctx)?;
|
||||
|
||||
for tx in block.transactions.iter_mut() {
|
||||
if let Some(ordinal_operations) =
|
||||
ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str())
|
||||
{
|
||||
tx.metadata.ordinal_operations = ordinal_operations;
|
||||
}
|
||||
}
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) {
|
||||
for tx in block.transactions.iter_mut() {
|
||||
tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_inscriptions_revealed_in_block(
|
||||
block: &BitcoinBlockData,
|
||||
) -> Vec<&OrdinalInscriptionRevealData> {
|
||||
let mut ops = vec![];
|
||||
for tx in block.transactions.iter() {
|
||||
for op in tx.metadata.ordinal_operations.iter() {
|
||||
if let OrdinalOperation::InscriptionRevealed(op) = op {
|
||||
ops.push(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
ops
|
||||
}
|
||||
689
components/hord-cli/src/core/protocol/inscription_sequencing.rs
Normal file
689
components/hord-cli/src/core/protocol/inscription_sequencing.rs
Normal file
@@ -0,0 +1,689 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, VecDeque},
|
||||
hash::BuildHasherDefault,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{
|
||||
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
|
||||
OrdinalInscriptionCurseType, OrdinalOperation, TransactionIdentifier,
|
||||
},
|
||||
utils::Context,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
use rusqlite::{Connection, Transaction};
|
||||
|
||||
use crate::{
|
||||
core::HordConfig,
|
||||
db::{
|
||||
find_blessed_inscription_with_ordinal_number,
|
||||
find_latest_cursed_inscription_number_at_block_height,
|
||||
find_latest_inscription_number_at_block_height, format_satpoint_to_watch,
|
||||
insert_new_inscriptions_from_block_in_inscriptions_and_locations, LazyBlockTransaction,
|
||||
TraversalResult,
|
||||
},
|
||||
ord::height::Height,
|
||||
};
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
use crate::db::find_all_inscriptions_in_block;
|
||||
|
||||
use super::{
|
||||
inscription_tracking::augment_transaction_with_ordinals_transfers_data,
|
||||
satoshi_numbering::compute_satoshi_number,
|
||||
};
|
||||
|
||||
pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
block: &BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<bool, String> {
|
||||
let (mut transactions_ids, l1_cache_hits) =
|
||||
get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx);
|
||||
|
||||
let inner_ctx = if hord_config.logs.ordinals_internals {
|
||||
ctx.clone()
|
||||
} else {
|
||||
Context::empty()
|
||||
};
|
||||
|
||||
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
|
||||
|
||||
let thread_max = hord_config.ingestion_thread_max * 2;
|
||||
|
||||
if has_transactions_to_process {
|
||||
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
||||
let (traversal_tx, traversal_rx) = channel();
|
||||
|
||||
let mut tx_thread_pool = vec![];
|
||||
let mut thread_pool_handles = vec![];
|
||||
|
||||
for thread_index in 0..thread_max {
|
||||
let (tx, rx) = channel();
|
||||
tx_thread_pool.push(tx);
|
||||
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = inner_ctx.clone();
|
||||
let moved_hord_db_path = hord_config.db_path.clone();
|
||||
let local_cache = cache_l2.clone();
|
||||
|
||||
let handle = hiro_system_kit::thread_named("Worker")
|
||||
.spawn(move || {
|
||||
while let Ok(Some((
|
||||
transaction_id,
|
||||
block_identifier,
|
||||
input_index,
|
||||
prioritary,
|
||||
))) = rx.recv()
|
||||
{
|
||||
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
|
||||
&moved_hord_db_path,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
input_index,
|
||||
0,
|
||||
&local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
thread_pool_handles.push(handle);
|
||||
}
|
||||
|
||||
// Empty cache
|
||||
let mut thread_index = 0;
|
||||
for key in l1_cache_hits.iter() {
|
||||
if let Some(entry) = cache_l1.remove(key) {
|
||||
let _ = traversal_tx.send((Ok(entry), true, thread_index));
|
||||
thread_index = (thread_index + 1) % thread_max;
|
||||
}
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len(),
|
||||
l1_cache_hits.len(),
|
||||
next_blocks.len(),
|
||||
cache_l1.len(),
|
||||
cache_l2.len(),
|
||||
)
|
||||
});
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
let mut priority_queue = VecDeque::new();
|
||||
let mut warmup_queue = VecDeque::new();
|
||||
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
priority_queue.push_back((
|
||||
transaction_id,
|
||||
block.block_identifier.clone(),
|
||||
input_index,
|
||||
true,
|
||||
));
|
||||
}
|
||||
|
||||
// Feed each workers with 2 workitems each
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
|
||||
let mut next_block_iter = next_blocks.iter();
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
|
||||
if prioritary {
|
||||
traversals_received += 1;
|
||||
}
|
||||
match traversal_result {
|
||||
Ok(traversal) => {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
|
||||
});
|
||||
}
|
||||
}
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(w) = priority_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(w) = warmup_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(next_block) = next_block_iter.next() {
|
||||
let (mut transactions_ids, _) = get_transactions_to_process(
|
||||
next_block,
|
||||
cache_l1,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to pre-process: {}",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len()
|
||||
)
|
||||
});
|
||||
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
warmup_queue.push_back((
|
||||
transaction_id,
|
||||
next_block.block_identifier.clone(),
|
||||
input_index,
|
||||
false,
|
||||
));
|
||||
}
|
||||
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for tx in tx_thread_pool.iter() {
|
||||
// Empty the queue
|
||||
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
|
||||
if let Ok(traversal) = traversal_result {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
|
||||
for handle in thread_pool_handles.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"No inscriptions to index in block #{}", block.block_identifier.index
|
||||
)
|
||||
});
|
||||
}
|
||||
Ok(has_transactions_to_process)
|
||||
}
|
||||
|
||||
fn get_transactions_to_process(
|
||||
block: &BitcoinBlockData,
|
||||
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
ctx: &Context,
|
||||
) -> (
|
||||
Vec<(TransactionIdentifier, usize)>,
|
||||
Vec<(TransactionIdentifier, usize)>,
|
||||
) {
|
||||
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
|
||||
let mut l1_cache_hits = vec![];
|
||||
|
||||
let mut known_transactions =
|
||||
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx);
|
||||
|
||||
for tx in block.transactions.iter().skip(1) {
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for ordinal_event in tx.metadata.ordinal_operations.iter() {
|
||||
let inscription_data = match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data,
|
||||
OrdinalOperation::InscriptionTransferred(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let key = (
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
);
|
||||
if cache_l1.contains_key(&key) {
|
||||
l1_cache_hits.push(key);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(entry) = known_transactions.remove(&key) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push((
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
));
|
||||
}
|
||||
}
|
||||
(transactions_ids, l1_cache_hits)
|
||||
}
|
||||
|
||||
/// For each input of each transaction in the block, we retrieve the UTXO spent (outpoint_pre_transfer)
|
||||
/// and we check using a `storage` (in-memory or sqlite) absctraction if we have some existing inscriptions
|
||||
/// for this entry.
|
||||
/// When this is the case, it means that an inscription_transfer event needs to be produced. We need to
|
||||
/// compute the output index (if any) `post_transfer_output` that will now include the inscription.
|
||||
/// When identifying the output index, we will also need to provide an updated offset for pin pointing
|
||||
/// the satoshi location.
|
||||
|
||||
pub struct SequenceCursor {
|
||||
blessed: Option<i64>,
|
||||
cursed: Option<i64>,
|
||||
inscriptions_db_conn: Connection,
|
||||
current_block_height: u64,
|
||||
}
|
||||
|
||||
impl SequenceCursor {
|
||||
pub fn new(inscriptions_db_conn: Connection) -> SequenceCursor {
|
||||
SequenceCursor {
|
||||
blessed: None,
|
||||
cursed: None,
|
||||
inscriptions_db_conn,
|
||||
current_block_height: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.blessed = None;
|
||||
self.cursed = None;
|
||||
self.current_block_height = 0;
|
||||
}
|
||||
|
||||
pub fn pick_next(&mut self, cursed: bool, block_height: u64) -> i64 {
|
||||
if block_height < self.current_block_height {
|
||||
self.reset();
|
||||
}
|
||||
self.current_block_height = block_height;
|
||||
|
||||
match cursed {
|
||||
true => self.pick_next_cursed(),
|
||||
false => self.pick_next_blessed(),
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_next_blessed(&mut self) -> i64 {
|
||||
match self.blessed {
|
||||
None => {
|
||||
match find_latest_inscription_number_at_block_height(
|
||||
&self.current_block_height,
|
||||
&None,
|
||||
&self.inscriptions_db_conn,
|
||||
&Context::empty(),
|
||||
) {
|
||||
Ok(Some(inscription_number)) => {
|
||||
self.blessed = Some(inscription_number);
|
||||
inscription_number + 1
|
||||
}
|
||||
_ => {
|
||||
self.blessed = Some(0);
|
||||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(value) => value + 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_next_cursed(&mut self) -> i64 {
|
||||
match self.cursed {
|
||||
None => {
|
||||
match find_latest_cursed_inscription_number_at_block_height(
|
||||
&self.current_block_height,
|
||||
&None,
|
||||
&self.inscriptions_db_conn,
|
||||
&Context::empty(),
|
||||
) {
|
||||
Ok(Some(inscription_number)) => {
|
||||
self.cursed = Some(inscription_number);
|
||||
inscription_number - 1
|
||||
}
|
||||
_ => {
|
||||
self.cursed = Some(-1);
|
||||
-1
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(value) => value - 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn increment_cursed(&mut self) {
|
||||
self.cursed = Some(self.pick_next_cursed());
|
||||
}
|
||||
|
||||
pub fn increment_blessed(&mut self) {
|
||||
self.blessed = Some(self.pick_next_blessed())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
|
||||
block: &mut BitcoinBlockData,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
ctx: &Context,
|
||||
) -> bool {
|
||||
// Handle re-inscriptions
|
||||
let mut reinscriptions_data = HashMap::new();
|
||||
for (_, inscription_data) in inscriptions_data.iter() {
|
||||
if let Some(inscription_id) = find_blessed_inscription_with_ordinal_number(
|
||||
&inscription_data.ordinal_number,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
) {
|
||||
reinscriptions_data.insert(inscription_data.ordinal_number, inscription_id);
|
||||
}
|
||||
}
|
||||
|
||||
let any_events = augment_block_with_ordinals_inscriptions_data(
|
||||
block,
|
||||
sequence_cursor,
|
||||
inscriptions_data,
|
||||
&reinscriptions_data,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
// Store inscriptions
|
||||
insert_new_inscriptions_from_block_in_inscriptions_and_locations(
|
||||
block,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
|
||||
any_events
|
||||
}
|
||||
|
||||
pub fn augment_block_with_ordinals_inscriptions_data(
|
||||
block: &mut BitcoinBlockData,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
reinscriptions_data: &HashMap<u64, String>,
|
||||
ctx: &Context,
|
||||
) -> bool {
|
||||
// Handle sat oveflows
|
||||
let mut sats_overflows = VecDeque::new();
|
||||
let mut any_event = false;
|
||||
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
|
||||
any_event |= augment_transaction_with_ordinals_inscriptions_data(
|
||||
tx,
|
||||
tx_index,
|
||||
&block.block_identifier,
|
||||
sequence_cursor,
|
||||
&network,
|
||||
inscriptions_data,
|
||||
&mut sats_overflows,
|
||||
&reinscriptions_data,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
|
||||
// Handle sats overflow
|
||||
while let Some(sats_overlow) = sats_overflows.pop_front() {
|
||||
// TODO
|
||||
}
|
||||
any_event
|
||||
}
|
||||
|
||||
pub fn augment_transaction_with_ordinals_inscriptions_data(
|
||||
tx: &mut BitcoinTransactionData,
|
||||
tx_index: usize,
|
||||
block_identifier: &BlockIdentifier,
|
||||
sequence_cursor: &mut SequenceCursor,
|
||||
network: &Network,
|
||||
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
sats_overflows: &mut VecDeque<(usize, usize)>,
|
||||
reinscriptions_data: &HashMap<u64, String>,
|
||||
ctx: &Context,
|
||||
) -> bool {
|
||||
let any_event = tx.metadata.ordinal_operations.is_empty() == false;
|
||||
let mut ordinals_ops_indexes_to_discard = VecDeque::new();
|
||||
|
||||
for (op_index, op) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
|
||||
let (mut is_cursed, inscription) = match op {
|
||||
OrdinalOperation::InscriptionRevealed(inscription) => {
|
||||
(inscription.curse_type.as_ref().is_some(), inscription)
|
||||
}
|
||||
OrdinalOperation::InscriptionTransferred(_) => continue,
|
||||
};
|
||||
|
||||
let mut inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
|
||||
|
||||
let transaction_identifier = tx.transaction_identifier.clone();
|
||||
let traversal = match inscriptions_data
|
||||
.remove(&(transaction_identifier, inscription.inscription_input_index))
|
||||
{
|
||||
Some(traversal) => traversal,
|
||||
None => {
|
||||
ctx.try_log(|logger| {
|
||||
error!(
|
||||
logger,
|
||||
"Unable to retrieve cached inscription data for inscription {}",
|
||||
tx.transaction_identifier.hash
|
||||
);
|
||||
});
|
||||
ordinals_ops_indexes_to_discard.push_front(op_index);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut curse_type_override = None;
|
||||
|
||||
// Do we need to curse the inscription?
|
||||
if !is_cursed {
|
||||
// Is this inscription re-inscribing an existing blessed inscription?
|
||||
if let Some(exisiting_inscription_id) =
|
||||
reinscriptions_data.get(&traversal.ordinal_number)
|
||||
{
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was inscribed with blessed inscription {}, cursing inscription {}",
|
||||
traversal.ordinal_number,
|
||||
exisiting_inscription_id,
|
||||
traversal.get_inscription_id(),
|
||||
);
|
||||
});
|
||||
|
||||
is_cursed = true;
|
||||
inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
|
||||
curse_type_override = Some(OrdinalInscriptionCurseType::Reinscription)
|
||||
}
|
||||
};
|
||||
|
||||
let outputs = &tx.metadata.outputs;
|
||||
inscription.inscription_number = inscription_number;
|
||||
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
|
||||
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
|
||||
inscription.ordinal_number = traversal.ordinal_number;
|
||||
inscription.transfers_pre_inscription = traversal.transfers;
|
||||
inscription.inscription_fee = tx.metadata.fee;
|
||||
inscription.tx_index = tx_index;
|
||||
inscription.curse_type = match curse_type_override {
|
||||
Some(curse_type) => Some(curse_type),
|
||||
None => inscription.curse_type.take(),
|
||||
};
|
||||
inscription.satpoint_post_inscription = format_satpoint_to_watch(
|
||||
&traversal.transfer_data.transaction_identifier_location,
|
||||
traversal.transfer_data.output_index,
|
||||
traversal.transfer_data.inscription_offset_intra_output,
|
||||
);
|
||||
if let Some(output) = outputs.get(traversal.transfer_data.output_index) {
|
||||
inscription.inscription_output_value = output.value;
|
||||
inscription.inscriber_address = {
|
||||
let script_pub_key = output.get_script_pubkey_hex();
|
||||
match Script::from_hex(&script_pub_key) {
|
||||
Ok(script) => match Address::from_script(&script, network.clone()) {
|
||||
Ok(a) => Some(a.to_string()),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"Database corrupted, skipping cursed inscription => {:?} / {:?}",
|
||||
traversal,
|
||||
outputs
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
if traversal.ordinal_number == 0 {
|
||||
// If the satoshi inscribed correspond to a sat overflow, we will store the inscription
|
||||
// and assign an inscription number after the other inscriptions, to mimick the
|
||||
// bug in ord.
|
||||
sats_overflows.push_back((tx_index, op_index));
|
||||
continue;
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} (#{}) detected on Satoshi {} (block {}, {} transfers)",
|
||||
inscription.inscription_id,
|
||||
inscription.inscription_number,
|
||||
inscription.ordinal_number,
|
||||
block_identifier.index,
|
||||
inscription.transfers_pre_inscription,
|
||||
);
|
||||
});
|
||||
|
||||
if is_cursed {
|
||||
sequence_cursor.increment_cursed();
|
||||
} else {
|
||||
sequence_cursor.increment_blessed();
|
||||
}
|
||||
}
|
||||
any_event
|
||||
}
|
||||
|
||||
pub fn consolidate_transaction_with_pre_computed_inscription_data(
|
||||
tx: &mut BitcoinTransactionData,
|
||||
tx_index: usize,
|
||||
inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
_ctx: &Context,
|
||||
) {
|
||||
for operation in tx.metadata.ordinal_operations.iter_mut() {
|
||||
let inscription = match operation {
|
||||
OrdinalOperation::InscriptionRevealed(ref mut inscription) => inscription,
|
||||
OrdinalOperation::InscriptionTransferred(_) => continue,
|
||||
};
|
||||
|
||||
let Some(traversal) = inscriptions_data.remove(&(tx.transaction_identifier.clone(), inscription.inscription_input_index)) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
|
||||
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
|
||||
inscription.ordinal_number = traversal.ordinal_number;
|
||||
inscription.inscription_number = traversal.inscription_number;
|
||||
inscription.transfers_pre_inscription = traversal.transfers;
|
||||
inscription.inscription_fee = tx.metadata.fee;
|
||||
inscription.tx_index = tx_index;
|
||||
inscription.satpoint_post_inscription = format_satpoint_to_watch(
|
||||
&traversal.transfer_data.transaction_identifier_location,
|
||||
traversal.transfer_data.output_index,
|
||||
traversal.transfer_data.inscription_offset_intra_output,
|
||||
);
|
||||
if inscription.inscription_number < 0 {
|
||||
inscription.curse_type = Some(OrdinalInscriptionCurseType::Unknown);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn consolidate_block_with_pre_computed_ordinals_data(
|
||||
block: &mut BitcoinBlockData,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
include_transfers: bool,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
|
||||
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
|
||||
let mut cumulated_fees = 0;
|
||||
let mut inscriptions_data =
|
||||
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx);
|
||||
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
|
||||
// Add inscriptions data
|
||||
consolidate_transaction_with_pre_computed_inscription_data(
|
||||
tx,
|
||||
tx_index,
|
||||
&mut inscriptions_data,
|
||||
ctx,
|
||||
);
|
||||
|
||||
// Add transfers data
|
||||
if include_transfers {
|
||||
let _ = augment_transaction_with_ordinals_transfers_data(
|
||||
tx,
|
||||
tx_index,
|
||||
&block.block_identifier,
|
||||
&network,
|
||||
&coinbase_txid,
|
||||
coinbase_subsidy,
|
||||
&mut cumulated_fees,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
212
components/hord-cli/src/core/protocol/inscription_tracking.rs
Normal file
212
components/hord-cli/src/core/protocol/inscription_tracking.rs
Normal file
@@ -0,0 +1,212 @@
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{
|
||||
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
|
||||
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
|
||||
},
|
||||
utils::Context,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
core::{compute_next_satpoint_data, SatPosition},
|
||||
db::{
|
||||
find_inscriptions_at_wached_outpoint, format_outpoint_to_watch,
|
||||
insert_transfer_in_locations_tx,
|
||||
},
|
||||
ord::height::Height,
|
||||
};
|
||||
use rusqlite::Transaction;
|
||||
|
||||
pub fn augment_block_with_ordinals_transfer_data(
|
||||
block: &mut BitcoinBlockData,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
update_db_tx: bool,
|
||||
ctx: &Context,
|
||||
) -> bool {
|
||||
let mut any_event = false;
|
||||
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
let coinbase_subsidy = Height(block.block_identifier.index).subsidy();
|
||||
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
|
||||
let mut cumulated_fees = 0;
|
||||
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
|
||||
let transfers = augment_transaction_with_ordinals_transfers_data(
|
||||
tx,
|
||||
tx_index,
|
||||
&block.block_identifier,
|
||||
&network,
|
||||
&coinbase_txid,
|
||||
coinbase_subsidy,
|
||||
&mut cumulated_fees,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
|
||||
any_event |= !transfers.is_empty();
|
||||
|
||||
if update_db_tx {
|
||||
// Store transfers between each iteration
|
||||
for transfer_data in transfers.into_iter() {
|
||||
insert_transfer_in_locations_tx(
|
||||
&transfer_data,
|
||||
&block.block_identifier,
|
||||
&inscriptions_db_tx,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
any_event
|
||||
}
|
||||
|
||||
pub fn augment_transaction_with_ordinals_transfers_data(
|
||||
tx: &mut BitcoinTransactionData,
|
||||
tx_index: usize,
|
||||
block_identifier: &BlockIdentifier,
|
||||
network: &Network,
|
||||
coinbase_txid: &TransactionIdentifier,
|
||||
coinbase_subsidy: u64,
|
||||
cumulated_fees: &mut u64,
|
||||
inscriptions_db_tx: &Transaction,
|
||||
ctx: &Context,
|
||||
) -> Vec<OrdinalInscriptionTransferData> {
|
||||
let mut transfers = vec![];
|
||||
|
||||
for (input_index, input) in tx.metadata.inputs.iter().enumerate() {
|
||||
let outpoint_pre_transfer = format_outpoint_to_watch(
|
||||
&input.previous_output.txid,
|
||||
input.previous_output.vout as usize,
|
||||
);
|
||||
|
||||
let entries =
|
||||
match find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx)
|
||||
{
|
||||
Ok(entries) => entries,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| warn!(logger, "unable query inscriptions: {e}"));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// For each satpoint inscribed retrieved, we need to compute the next
|
||||
// outpoint to watch
|
||||
for watched_satpoint in entries.into_iter() {
|
||||
let satpoint_pre_transfer =
|
||||
format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset);
|
||||
|
||||
// Question is: are inscriptions moving to a new output,
|
||||
// burnt or lost in fees and transfered to the miner?
|
||||
|
||||
let inputs = tx
|
||||
.metadata
|
||||
.inputs
|
||||
.iter()
|
||||
.map(|o| o.previous_output.value)
|
||||
.collect::<_>();
|
||||
let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>();
|
||||
let post_transfer_data =
|
||||
compute_next_satpoint_data(input_index, watched_satpoint.offset, &inputs, &outputs);
|
||||
|
||||
let (
|
||||
outpoint_post_transfer,
|
||||
offset_post_transfer,
|
||||
updated_address,
|
||||
post_transfer_output_value,
|
||||
) = match post_transfer_data {
|
||||
SatPosition::Output((output_index, offset)) => {
|
||||
let outpoint =
|
||||
format_outpoint_to_watch(&tx.transaction_identifier, output_index);
|
||||
let script_pub_key_hex =
|
||||
tx.metadata.outputs[output_index].get_script_pubkey_hex();
|
||||
let updated_address = match Script::from_hex(&script_pub_key_hex) {
|
||||
Ok(script) => match Address::from_script(&script, network.clone()) {
|
||||
Ok(address) => Some(address.to_string()),
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"unable to retrieve address from {script_pub_key_hex}: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"unable to retrieve address from {script_pub_key_hex}: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we know that inscriptions are being moved.
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} moved from {} to {} (block: {})",
|
||||
watched_satpoint.inscription_id,
|
||||
satpoint_pre_transfer,
|
||||
outpoint,
|
||||
block_identifier.index,
|
||||
)
|
||||
});
|
||||
|
||||
(
|
||||
outpoint,
|
||||
offset,
|
||||
updated_address,
|
||||
Some(tx.metadata.outputs[output_index].value),
|
||||
)
|
||||
}
|
||||
SatPosition::Fee(offset) => {
|
||||
// Get Coinbase TX
|
||||
let total_offset = coinbase_subsidy + *cumulated_fees + offset;
|
||||
let outpoint = format_outpoint_to_watch(&coinbase_txid, 0);
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} spent in fees ({}+{}+{})",
|
||||
watched_satpoint.inscription_id,
|
||||
coinbase_subsidy,
|
||||
cumulated_fees,
|
||||
offset
|
||||
)
|
||||
});
|
||||
(outpoint, total_offset, None, None)
|
||||
}
|
||||
};
|
||||
|
||||
let satpoint_post_transfer =
|
||||
format!("{}:{}", outpoint_post_transfer, offset_post_transfer);
|
||||
|
||||
let transfer_data = OrdinalInscriptionTransferData {
|
||||
inscription_id: watched_satpoint.inscription_id.clone(),
|
||||
updated_address,
|
||||
tx_index,
|
||||
satpoint_pre_transfer,
|
||||
satpoint_post_transfer,
|
||||
post_transfer_output_value,
|
||||
};
|
||||
|
||||
transfers.push(transfer_data.clone());
|
||||
|
||||
// Attach transfer event
|
||||
tx.metadata
|
||||
.ordinal_operations
|
||||
.push(OrdinalOperation::InscriptionTransferred(transfer_data));
|
||||
}
|
||||
}
|
||||
*cumulated_fees += tx.metadata.fee;
|
||||
|
||||
transfers
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod inscribing;
|
||||
pub mod numbering;
|
||||
pub mod sequencing;
|
||||
pub mod inscription_parsing;
|
||||
pub mod inscription_sequencing;
|
||||
pub mod inscription_tracking;
|
||||
pub mod satoshi_numbering;
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::db::{
|
||||
use crate::db::{LazyBlockTransaction, TraversalResult};
|
||||
use crate::ord::height::Height;
|
||||
|
||||
pub fn retrieve_satoshi_point_using_lazy_storage_v3(
|
||||
pub fn compute_satoshi_number(
|
||||
blocks_db_dir: &PathBuf,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
@@ -1,836 +0,0 @@
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
hash::BuildHasherDefault,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{
|
||||
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
|
||||
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
|
||||
},
|
||||
utils::Context,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use fxhash::FxHasher;
|
||||
use rusqlite::{Connection, Transaction};
|
||||
|
||||
use crate::{
|
||||
core::{
|
||||
compute_next_satpoint_data, get_inscriptions_revealed_in_block, HordConfig, SatPosition,
|
||||
},
|
||||
db::{
|
||||
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
|
||||
find_latest_cursed_inscription_number_at_block_height,
|
||||
find_latest_inscription_number_at_block_height, format_outpoint_to_watch,
|
||||
format_satpoint_to_watch, get_any_entry_in_ordinal_activities,
|
||||
insert_entry_in_inscriptions, insert_transfer_in_locations_tx, InscriptionHeigthHint,
|
||||
LazyBlockTransaction, TraversalResult,
|
||||
},
|
||||
ord::height::Height,
|
||||
};
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
use crate::db::find_all_inscriptions_in_block;
|
||||
|
||||
use super::numbering::retrieve_satoshi_point_using_lazy_storage_v3;
|
||||
|
||||
pub fn retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
block: &BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
existing_inscriptions: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
inscriptions_db_conn: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<bool, String> {
|
||||
let (mut transactions_ids, l1_cache_hits) = get_transactions_to_process(
|
||||
block,
|
||||
cache_l1,
|
||||
existing_inscriptions,
|
||||
inscriptions_db_conn,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let inner_ctx = if hord_config.logs.ordinals_internals {
|
||||
ctx.clone()
|
||||
} else {
|
||||
Context::empty()
|
||||
};
|
||||
|
||||
let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();
|
||||
|
||||
let thread_max = hord_config.ingestion_thread_max * 2;
|
||||
|
||||
if has_transactions_to_process {
|
||||
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
||||
let (traversal_tx, traversal_rx) = channel();
|
||||
|
||||
let mut tx_thread_pool = vec![];
|
||||
let mut thread_pool_handles = vec![];
|
||||
|
||||
for thread_index in 0..thread_max {
|
||||
let (tx, rx) = channel();
|
||||
tx_thread_pool.push(tx);
|
||||
|
||||
let moved_traversal_tx = traversal_tx.clone();
|
||||
let moved_ctx = inner_ctx.clone();
|
||||
let moved_hord_db_path = hord_config.db_path.clone();
|
||||
let local_cache = cache_l2.clone();
|
||||
|
||||
let handle = hiro_system_kit::thread_named("Worker")
|
||||
.spawn(move || {
|
||||
while let Ok(Some((
|
||||
transaction_id,
|
||||
block_identifier,
|
||||
input_index,
|
||||
prioritary,
|
||||
))) = rx.recv()
|
||||
{
|
||||
let traversal: Result<TraversalResult, String> =
|
||||
retrieve_satoshi_point_using_lazy_storage_v3(
|
||||
&moved_hord_db_path,
|
||||
&block_identifier,
|
||||
&transaction_id,
|
||||
input_index,
|
||||
0,
|
||||
&local_cache,
|
||||
&moved_ctx,
|
||||
);
|
||||
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
thread_pool_handles.push(handle);
|
||||
}
|
||||
|
||||
// Empty cache
|
||||
let mut thread_index = 0;
|
||||
for key in l1_cache_hits.iter() {
|
||||
if let Some(entry) = cache_l1.remove(key) {
|
||||
let _ = traversal_tx.send((Ok(entry), true, thread_index));
|
||||
thread_index = (thread_index + 1) % thread_max;
|
||||
}
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len(),
|
||||
l1_cache_hits.len(),
|
||||
next_blocks.len(),
|
||||
cache_l1.len(),
|
||||
cache_l2.len(),
|
||||
)
|
||||
});
|
||||
|
||||
let mut rng = thread_rng();
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
let mut priority_queue = VecDeque::new();
|
||||
let mut warmup_queue = VecDeque::new();
|
||||
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
priority_queue.push_back((
|
||||
transaction_id,
|
||||
block.block_identifier.clone(),
|
||||
input_index,
|
||||
true,
|
||||
));
|
||||
}
|
||||
|
||||
// Feed each workers with 2 workitems each
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
for thread_index in 0..thread_max {
|
||||
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
|
||||
}
|
||||
|
||||
let mut next_block_iter = next_blocks.iter();
|
||||
let mut traversals_received = 0;
|
||||
while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() {
|
||||
if prioritary {
|
||||
traversals_received += 1;
|
||||
}
|
||||
match traversal_result {
|
||||
Ok(traversal) => {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
error!(logger, "Unable to compute inscription's Satoshi: {e}",)
|
||||
});
|
||||
}
|
||||
}
|
||||
if traversals_received == expected_traversals {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(w) = priority_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(w) = warmup_queue.pop_front() {
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(next_block) = next_block_iter.next() {
|
||||
let (mut transactions_ids, _) = get_transactions_to_process(
|
||||
next_block,
|
||||
cache_l1,
|
||||
existing_inscriptions,
|
||||
inscriptions_db_conn,
|
||||
ctx,
|
||||
);
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Number of inscriptions in block #{} to pre-process: {}",
|
||||
block.block_identifier.index,
|
||||
transactions_ids.len()
|
||||
)
|
||||
});
|
||||
|
||||
transactions_ids.shuffle(&mut rng);
|
||||
for (transaction_id, input_index) in transactions_ids.into_iter() {
|
||||
warmup_queue.push_back((
|
||||
transaction_id,
|
||||
next_block.block_identifier.clone(),
|
||||
input_index,
|
||||
false,
|
||||
));
|
||||
}
|
||||
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for tx in tx_thread_pool.iter() {
|
||||
// Empty the queue
|
||||
if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() {
|
||||
if let Ok(traversal) = traversal_result {
|
||||
inner_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).",
|
||||
traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers
|
||||
)
|
||||
});
|
||||
cache_l1.insert(
|
||||
(
|
||||
traversal.transaction_identifier_inscription.clone(),
|
||||
traversal.inscription_input_index,
|
||||
),
|
||||
traversal,
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
|
||||
for handle in thread_pool_handles.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"No inscriptions to index in block #{}", block.block_identifier.index
|
||||
)
|
||||
});
|
||||
}
|
||||
Ok(has_transactions_to_process)
|
||||
}
|
||||
|
||||
fn get_transactions_to_process(
|
||||
block: &BitcoinBlockData,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
existing_inscriptions: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
inscriptions_db_conn: &mut Connection,
|
||||
ctx: &Context,
|
||||
) -> (
|
||||
Vec<(TransactionIdentifier, usize)>,
|
||||
Vec<(TransactionIdentifier, usize)>,
|
||||
) {
|
||||
let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![];
|
||||
let mut l1_cache_hits = vec![];
|
||||
|
||||
let mut known_transactions =
|
||||
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
|
||||
|
||||
for tx in block.transactions.iter().skip(1) {
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for ordinal_event in tx.metadata.ordinal_operations.iter() {
|
||||
let inscription_data = match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data,
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription_data) => inscription_data,
|
||||
OrdinalOperation::InscriptionTransferred(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let key = (
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
);
|
||||
if cache_l1.contains_key(&key) {
|
||||
l1_cache_hits.push(key);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(entry) = known_transactions.remove(&key) {
|
||||
existing_inscriptions.insert(key, entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Enqueue for traversals
|
||||
transactions_ids.push((
|
||||
tx.transaction_identifier.clone(),
|
||||
inscription_data.inscription_input_index,
|
||||
));
|
||||
}
|
||||
}
|
||||
(transactions_ids, l1_cache_hits)
|
||||
}
|
||||
|
||||
pub fn update_hord_db_and_augment_bitcoin_block_v3(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
next_blocks: &Vec<BitcoinBlockData>,
|
||||
cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
|
||||
inscription_height_hint: &mut InscriptionHeigthHint,
|
||||
inscriptions_db_conn_rw: &mut Connection,
|
||||
hord_config: &HordConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
let mut existing_inscriptions = HashMap::new();
|
||||
|
||||
let transactions_processed = retrieve_inscribed_satoshi_points_from_block_v3(
|
||||
&new_block,
|
||||
&next_blocks,
|
||||
cache_l1,
|
||||
cache_l2,
|
||||
&mut existing_inscriptions,
|
||||
inscriptions_db_conn_rw,
|
||||
&hord_config,
|
||||
ctx,
|
||||
)?;
|
||||
|
||||
if !transactions_processed {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let discard_changes: bool = get_any_entry_in_ordinal_activities(
|
||||
&new_block.block_identifier.index,
|
||||
inscriptions_db_conn_rw,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let inner_ctx = if discard_changes {
|
||||
Context::empty()
|
||||
} else {
|
||||
if hord_config.logs.ordinals_internals {
|
||||
ctx.clone()
|
||||
} else {
|
||||
Context::empty()
|
||||
}
|
||||
};
|
||||
|
||||
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
|
||||
let any_inscription_revealed =
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
|
||||
new_block,
|
||||
&transaction,
|
||||
&cache_l1,
|
||||
inscription_height_hint,
|
||||
&inner_ctx,
|
||||
)?;
|
||||
|
||||
// Have inscriptions been transfered?
|
||||
let any_inscription_transferred =
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
new_block,
|
||||
&transaction,
|
||||
&inner_ctx,
|
||||
)?;
|
||||
|
||||
if !any_inscription_revealed && !any_inscription_transferred {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if discard_changes {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Ignoring updates for block #{}, activities present in database",
|
||||
new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Saving updates for block {}", new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
transaction.commit().unwrap();
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Updates saved for block {}", new_block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
|
||||
.iter()
|
||||
.map(|d| d.inscription_number.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Block #{} revealed {} inscriptions [{}]",
|
||||
new_block.block_identifier.index,
|
||||
inscriptions_revealed.len(),
|
||||
inscriptions_revealed.join(", ")
|
||||
)
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For each input of each transaction in the block, we retrieve the UTXO spent (outpoint_pre_transfer)
|
||||
/// and we check using a `storage` (in-memory or sqlite) absctraction if we have some existing inscriptions
|
||||
/// for this entry.
|
||||
/// When this is the case, it means that an inscription_transfer event needs to be produced. We need to
|
||||
/// compute the output index (if any) `post_transfer_output` that will now include the inscription.
|
||||
/// When identifying the output index, we will also need to provide an updated offset for pin pointing
|
||||
/// the satoshi location.
|
||||
pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
block: &mut BitcoinBlockData,
|
||||
hord_db_tx: &Transaction,
|
||||
ctx: &Context,
|
||||
) -> Result<bool, String> {
|
||||
let mut storage_updated = false;
|
||||
let mut cumulated_fees = 0;
|
||||
let subsidy = Height(block.block_identifier.index).subsidy();
|
||||
let coinbase_txid = &block.transactions[0].transaction_identifier.clone();
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() {
|
||||
for (input_index, input) in new_tx.metadata.inputs.iter().enumerate() {
|
||||
let outpoint_pre_transfer = format_outpoint_to_watch(
|
||||
&input.previous_output.txid,
|
||||
input.previous_output.vout as usize,
|
||||
);
|
||||
|
||||
let entries =
|
||||
find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &hord_db_tx)?;
|
||||
|
||||
// For each satpoint inscribed retrieved, we need to compute the next
|
||||
// outpoint to watch
|
||||
for watched_satpoint in entries.into_iter() {
|
||||
let satpoint_pre_transfer =
|
||||
format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset);
|
||||
|
||||
// Question is: are inscriptions moving to a new output,
|
||||
// burnt or lost in fees and transfered to the miner?
|
||||
|
||||
let inputs = new_tx
|
||||
.metadata
|
||||
.inputs
|
||||
.iter()
|
||||
.map(|o| o.previous_output.value)
|
||||
.collect::<_>();
|
||||
let outputs = new_tx
|
||||
.metadata
|
||||
.outputs
|
||||
.iter()
|
||||
.map(|o| o.value)
|
||||
.collect::<_>();
|
||||
let post_transfer_data = compute_next_satpoint_data(
|
||||
input_index,
|
||||
watched_satpoint.offset,
|
||||
&inputs,
|
||||
&outputs,
|
||||
);
|
||||
|
||||
let (
|
||||
outpoint_post_transfer,
|
||||
offset_post_transfer,
|
||||
updated_address,
|
||||
post_transfer_output_value,
|
||||
) = match post_transfer_data {
|
||||
SatPosition::Output((output_index, offset)) => {
|
||||
let outpoint =
|
||||
format_outpoint_to_watch(&new_tx.transaction_identifier, output_index);
|
||||
let script_pub_key_hex =
|
||||
new_tx.metadata.outputs[output_index].get_script_pubkey_hex();
|
||||
let updated_address = match Script::from_hex(&script_pub_key_hex) {
|
||||
Ok(script) => match Address::from_script(&script, network.clone()) {
|
||||
Ok(address) => Some(address.to_string()),
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"unable to retrieve address from {script_pub_key_hex}: {}", e.to_string()
|
||||
)
|
||||
});
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"unable to retrieve address from {script_pub_key_hex}: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we know that inscriptions are being moved.
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} moved from {} to {} (block: {})",
|
||||
watched_satpoint.inscription_id,
|
||||
satpoint_pre_transfer,
|
||||
outpoint,
|
||||
block.block_identifier.index,
|
||||
)
|
||||
});
|
||||
|
||||
(
|
||||
outpoint,
|
||||
offset,
|
||||
updated_address,
|
||||
Some(new_tx.metadata.outputs[output_index].value),
|
||||
)
|
||||
}
|
||||
SatPosition::Fee(offset) => {
|
||||
// Get Coinbase TX
|
||||
let total_offset = subsidy + cumulated_fees + offset;
|
||||
let outpoint = format_outpoint_to_watch(&coinbase_txid, 0);
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} spent in fees ({}+{}+{})",
|
||||
watched_satpoint.inscription_id,
|
||||
subsidy,
|
||||
cumulated_fees,
|
||||
offset
|
||||
)
|
||||
});
|
||||
(outpoint, total_offset, None, None)
|
||||
}
|
||||
};
|
||||
|
||||
let satpoint_post_transfer =
|
||||
format!("{}:{}", outpoint_post_transfer, offset_post_transfer);
|
||||
|
||||
let transfer_data = OrdinalInscriptionTransferData {
|
||||
inscription_id: watched_satpoint.inscription_id.clone(),
|
||||
updated_address,
|
||||
tx_index,
|
||||
satpoint_pre_transfer,
|
||||
satpoint_post_transfer,
|
||||
post_transfer_output_value,
|
||||
};
|
||||
|
||||
// Update watched outpoint
|
||||
|
||||
insert_transfer_in_locations_tx(
|
||||
&transfer_data,
|
||||
&block.block_identifier,
|
||||
&hord_db_tx,
|
||||
&ctx,
|
||||
);
|
||||
storage_updated = true;
|
||||
|
||||
// Attach transfer event
|
||||
new_tx
|
||||
.metadata
|
||||
.ordinal_operations
|
||||
.push(OrdinalOperation::InscriptionTransferred(transfer_data));
|
||||
}
|
||||
}
|
||||
cumulated_fees += new_tx.metadata.fee;
|
||||
}
|
||||
Ok(storage_updated)
|
||||
}
|
||||
|
||||
pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
|
||||
block: &mut BitcoinBlockData,
|
||||
transaction: &Transaction,
|
||||
cache_l1: &HashMap<(TransactionIdentifier, usize), TraversalResult>,
|
||||
inscription_height_hint: &mut InscriptionHeigthHint,
|
||||
ctx: &Context,
|
||||
) -> Result<bool, String> {
|
||||
let mut storage_updated = false;
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
let mut latest_cursed_inscription_loaded = false;
|
||||
let mut latest_cursed_inscription_number = 0;
|
||||
let mut cursed_inscription_sequence_updated = false;
|
||||
|
||||
let mut latest_blessed_inscription_loaded = false;
|
||||
let mut latest_blessed_inscription_number = 0;
|
||||
let mut blessed_inscription_sequence_updated = false;
|
||||
|
||||
let mut sats_overflow = vec![];
|
||||
|
||||
for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() {
|
||||
let mut ordinals_events_indexes_to_discard = VecDeque::new();
|
||||
let mut ordinals_events_indexes_to_curse = VecDeque::new();
|
||||
|
||||
// Have a new inscription been revealed, if so, are looking at a re-inscription
|
||||
for (ordinal_event_index, ordinal_event) in
|
||||
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
|
||||
{
|
||||
let (inscription, is_cursed) = match ordinal_event {
|
||||
OrdinalOperation::InscriptionRevealed(inscription) => (inscription, false),
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription) => (inscription, true),
|
||||
OrdinalOperation::InscriptionTransferred(_) => continue,
|
||||
};
|
||||
|
||||
let mut inscription_number = if is_cursed {
|
||||
latest_cursed_inscription_number = if !latest_cursed_inscription_loaded {
|
||||
latest_cursed_inscription_loaded = true;
|
||||
match find_latest_cursed_inscription_number_at_block_height(
|
||||
&block.block_identifier.index,
|
||||
&inscription_height_hint.cursed,
|
||||
&transaction,
|
||||
&ctx,
|
||||
)? {
|
||||
None => -1,
|
||||
Some(inscription_number) => inscription_number - 1,
|
||||
}
|
||||
} else {
|
||||
latest_cursed_inscription_number - 1
|
||||
};
|
||||
latest_cursed_inscription_number
|
||||
} else {
|
||||
latest_blessed_inscription_number = if !latest_blessed_inscription_loaded {
|
||||
latest_blessed_inscription_loaded = true;
|
||||
match find_latest_inscription_number_at_block_height(
|
||||
&block.block_identifier.index,
|
||||
&inscription_height_hint.blessed,
|
||||
&transaction,
|
||||
&ctx,
|
||||
)? {
|
||||
None => 0,
|
||||
Some(inscription_number) => inscription_number + 1,
|
||||
}
|
||||
} else {
|
||||
latest_blessed_inscription_number + 1
|
||||
};
|
||||
latest_blessed_inscription_number
|
||||
};
|
||||
|
||||
let transaction_identifier = new_tx.transaction_identifier.clone();
|
||||
let traversal = match cache_l1
|
||||
.get(&(transaction_identifier, inscription.inscription_input_index))
|
||||
{
|
||||
Some(traversal) => traversal,
|
||||
None => {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Unable to retrieve cached inscription data for inscription {}",
|
||||
new_tx.transaction_identifier.hash
|
||||
);
|
||||
});
|
||||
ordinals_events_indexes_to_discard.push_front(ordinal_event_index);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let outputs = &new_tx.metadata.outputs;
|
||||
inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset();
|
||||
inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height();
|
||||
inscription.ordinal_number = traversal.ordinal_number;
|
||||
inscription.inscription_number = traversal.inscription_number;
|
||||
inscription.transfers_pre_inscription = traversal.transfers;
|
||||
inscription.inscription_fee = new_tx.metadata.fee;
|
||||
inscription.tx_index = tx_index;
|
||||
inscription.satpoint_post_inscription = format_satpoint_to_watch(
|
||||
&traversal.transfer_data.transaction_identifier_location,
|
||||
traversal.transfer_data.output_index,
|
||||
traversal.transfer_data.inscription_offset_intra_output,
|
||||
);
|
||||
if let Some(output) = outputs.get(traversal.transfer_data.output_index) {
|
||||
inscription.inscription_output_value = output.value;
|
||||
inscription.inscriber_address = {
|
||||
let script_pub_key = output.get_script_pubkey_hex();
|
||||
match Script::from_hex(&script_pub_key) {
|
||||
Ok(script) => match Address::from_script(&script, network) {
|
||||
Ok(a) => Some(a.to_string()),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
} else {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(
|
||||
logger,
|
||||
"Database corrupted, skipping cursed inscription => {:?} / {:?}",
|
||||
traversal,
|
||||
outputs
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
if traversal.ordinal_number == 0 {
|
||||
// If the satoshi inscribed correspond to a sat overflow, we will store the inscription
|
||||
// and assign an inscription number after the other inscriptions, to mimick the
|
||||
// bug in ord.
|
||||
sats_overflow.push(inscription.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(_entry) =
|
||||
find_inscription_with_ordinal_number(&traversal.ordinal_number, &transaction, &ctx)
|
||||
{
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Transaction {} in block {} is overriding an existing inscription {}",
|
||||
new_tx.transaction_identifier.hash,
|
||||
block.block_identifier.index,
|
||||
traversal.ordinal_number
|
||||
);
|
||||
});
|
||||
|
||||
inscription_number = if !latest_cursed_inscription_loaded {
|
||||
latest_cursed_inscription_loaded = true;
|
||||
match find_latest_cursed_inscription_number_at_block_height(
|
||||
&block.block_identifier.index,
|
||||
&inscription_height_hint.cursed,
|
||||
&transaction,
|
||||
&ctx,
|
||||
)? {
|
||||
None => -1,
|
||||
Some(inscription_number) => inscription_number - 1,
|
||||
}
|
||||
} else {
|
||||
latest_cursed_inscription_number - 1
|
||||
};
|
||||
inscription.curse_type = Some(OrdinalInscriptionCurseType::Reinscription);
|
||||
|
||||
if !is_cursed {
|
||||
ordinals_events_indexes_to_curse.push_front(ordinal_event_index);
|
||||
latest_blessed_inscription_number -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
inscription.inscription_number = inscription_number;
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} (#{}) detected on Satoshi {} (block {}, {} transfers)",
|
||||
inscription.inscription_id,
|
||||
inscription.inscription_number,
|
||||
inscription.ordinal_number,
|
||||
block.block_identifier.index,
|
||||
inscription.transfers_pre_inscription,
|
||||
);
|
||||
});
|
||||
insert_entry_in_inscriptions(&inscription, &block.block_identifier, &transaction, &ctx);
|
||||
if inscription.curse_type.is_some() {
|
||||
cursed_inscription_sequence_updated = true;
|
||||
} else {
|
||||
blessed_inscription_sequence_updated = true;
|
||||
}
|
||||
storage_updated = true;
|
||||
}
|
||||
|
||||
for index in ordinals_events_indexes_to_curse.into_iter() {
|
||||
match new_tx.metadata.ordinal_operations.remove(index) {
|
||||
OrdinalOperation::InscriptionRevealed(inscription_data)
|
||||
| OrdinalOperation::CursedInscriptionRevealed(inscription_data) => {
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} (#{}) transitioned from blessed to cursed",
|
||||
inscription_data.inscription_id,
|
||||
inscription_data.inscription_number,
|
||||
);
|
||||
});
|
||||
new_tx.metadata.ordinal_operations.insert(
|
||||
index,
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription_data),
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
for index in ordinals_events_indexes_to_discard.into_iter() {
|
||||
new_tx.metadata.ordinal_operations.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
for inscription in sats_overflow.iter_mut() {
|
||||
inscription.inscription_number = latest_blessed_inscription_number;
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Inscription {} (#{}) detected on Satoshi overflow {} (block {}, {} transfers)",
|
||||
inscription.inscription_id,
|
||||
inscription.inscription_number,
|
||||
inscription.ordinal_number,
|
||||
block.block_identifier.index,
|
||||
inscription.transfers_pre_inscription,
|
||||
);
|
||||
});
|
||||
insert_entry_in_inscriptions(&inscription, &block.block_identifier, &transaction, &ctx);
|
||||
latest_blessed_inscription_number += 1;
|
||||
storage_updated = true;
|
||||
if inscription.curse_type.is_some() {
|
||||
cursed_inscription_sequence_updated = true;
|
||||
} else {
|
||||
blessed_inscription_sequence_updated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if cursed_inscription_sequence_updated {
|
||||
inscription_height_hint.cursed = Some(block.block_identifier.index);
|
||||
}
|
||||
if blessed_inscription_sequence_updated {
|
||||
inscription_height_hint.blessed = Some(block.block_identifier.index);
|
||||
}
|
||||
|
||||
Ok(storage_updated)
|
||||
}
|
||||
@@ -19,22 +19,9 @@ use chainhook_sdk::{
|
||||
utils::Context,
|
||||
};
|
||||
|
||||
use crate::ord::sat::Sat;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct InscriptionHeigthHint {
|
||||
pub cursed: Option<u64>,
|
||||
pub blessed: Option<u64>,
|
||||
}
|
||||
|
||||
impl InscriptionHeigthHint {
|
||||
pub fn new() -> InscriptionHeigthHint {
|
||||
InscriptionHeigthHint {
|
||||
cursed: None,
|
||||
blessed: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::{
|
||||
core::protocol::inscription_parsing::get_inscriptions_revealed_in_block, ord::sat::Sat,
|
||||
};
|
||||
|
||||
fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
|
||||
let mut destination_path = base_dir.clone();
|
||||
@@ -94,7 +81,7 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS locations (
|
||||
@@ -107,7 +94,11 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(logger, "Unable to create table locations: {}", e.to_string())
|
||||
warn!(
|
||||
logger,
|
||||
"Unable to create table locations: {}",
|
||||
e.to_string()
|
||||
)
|
||||
});
|
||||
} else {
|
||||
if let Err(e) = conn.execute(
|
||||
@@ -127,7 +118,7 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conn
|
||||
@@ -379,12 +370,42 @@ pub fn insert_entry_in_inscriptions(
|
||||
) {
|
||||
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
|
||||
}
|
||||
insert_inscription_in_locations(
|
||||
&inscription_data,
|
||||
&block_identifier,
|
||||
&inscriptions_db_conn_rw,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn insert_new_inscriptions_from_block_in_inscriptions_and_locations(
|
||||
block: &BitcoinBlockData,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
|
||||
insert_entry_in_inscriptions(
|
||||
inscription_data,
|
||||
&block.block_identifier,
|
||||
inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
);
|
||||
insert_inscription_in_locations(
|
||||
&inscription_data,
|
||||
&block.block_identifier,
|
||||
&inscriptions_db_conn_rw,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_new_inscriptions_from_block_in_locations(
|
||||
block: &BitcoinBlockData,
|
||||
inscriptions_db_conn_rw: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
|
||||
insert_inscription_in_locations(
|
||||
inscription_data,
|
||||
&block.block_identifier,
|
||||
inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_inscription_in_locations(
|
||||
@@ -438,18 +459,18 @@ pub fn insert_transfer_in_locations(
|
||||
|
||||
pub fn get_any_entry_in_ordinal_activities(
|
||||
block_height: &u64,
|
||||
inscriptions_db_conn: &Connection,
|
||||
inscriptions_db_tx: &Connection,
|
||||
_ctx: &Context,
|
||||
) -> bool {
|
||||
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
|
||||
let mut stmt = inscriptions_db_conn
|
||||
let mut stmt = inscriptions_db_tx
|
||||
.prepare("SELECT DISTINCT block_height FROM inscriptions WHERE block_height = ?")
|
||||
.unwrap();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
while let Ok(Some(_)) = rows.next() {
|
||||
return true;
|
||||
}
|
||||
let mut stmt = inscriptions_db_conn
|
||||
let mut stmt = inscriptions_db_tx
|
||||
.prepare("SELECT DISTINCT block_height FROM locations WHERE block_height = ?")
|
||||
.unwrap();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
@@ -539,7 +560,7 @@ pub fn find_latest_transfers_block_height(
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
while let Ok(Some(row)) = rows.next() {
|
||||
let block_height: u64 = row.get(0).unwrap();
|
||||
return Some(block_height)
|
||||
return Some(block_height);
|
||||
}
|
||||
None
|
||||
}
|
||||
@@ -682,14 +703,14 @@ pub fn find_latest_cursed_inscription_number_at_block_height(
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn find_inscription_with_ordinal_number(
|
||||
pub fn find_blessed_inscription_with_ordinal_number(
|
||||
ordinal_number: &u64,
|
||||
inscriptions_db_conn: &Connection,
|
||||
_ctx: &Context,
|
||||
) -> Option<String> {
|
||||
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
|
||||
let mut stmt = inscriptions_db_conn
|
||||
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number > 0")
|
||||
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number >= 0")
|
||||
.unwrap();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
while let Ok(Some(row)) = rows.next() {
|
||||
@@ -750,17 +771,17 @@ pub fn find_inscription_with_id(
|
||||
|
||||
pub fn find_all_inscriptions_in_block(
|
||||
block_height: &u64,
|
||||
inscriptions_db_conn: &Connection,
|
||||
inscriptions_db_tx: &Connection,
|
||||
ctx: &Context,
|
||||
) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
|
||||
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
|
||||
let mut stmt = inscriptions_db_conn
|
||||
let mut stmt = inscriptions_db_tx
|
||||
.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
|
||||
.unwrap();
|
||||
let mut results = BTreeMap::new();
|
||||
let mut rows = stmt.query(args).unwrap();
|
||||
|
||||
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
|
||||
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_tx, ctx);
|
||||
while let Ok(Some(row)) = rows.next() {
|
||||
let inscription_number: i64 = row.get(0).unwrap();
|
||||
let ordinal_number: u64 = row.get(1).unwrap();
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use crate::config::{Config, PredicatesApi};
|
||||
use crate::core::pipeline::processors::inscription_indexing::re_augment_block_with_ordinals_operations;
|
||||
use crate::core::{self, get_inscriptions_revealed_in_block};
|
||||
use crate::core::protocol::inscription_parsing::{
|
||||
get_inscriptions_revealed_in_block, parse_ordinals_and_standardize_block,
|
||||
};
|
||||
use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data;
|
||||
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn};
|
||||
use crate::download::download_ordinals_dataset_if_required;
|
||||
use crate::service::{
|
||||
@@ -75,7 +77,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
|
||||
// Are we dealing with an ordinals-based predicate?
|
||||
// If so, we could use the ordinal storage to provide a set of hints.
|
||||
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
let mut inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
@@ -109,7 +111,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
let block_breakdown =
|
||||
download_and_parse_block_with_retry(&http_client, &block_hash, &bitcoin_config, ctx)
|
||||
.await?;
|
||||
let mut block = match core::parse_ordinals_and_standardize_block(
|
||||
let mut block = match parse_ordinals_and_standardize_block(
|
||||
block_breakdown,
|
||||
&event_observer_config.bitcoin_network,
|
||||
ctx,
|
||||
@@ -124,8 +126,15 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
}
|
||||
};
|
||||
|
||||
let empty_ctx = Context::empty();
|
||||
re_augment_block_with_ordinals_operations(&mut block, &inscriptions_db_conn, &empty_ctx);
|
||||
{
|
||||
let inscriptions_db_tx = inscriptions_db_conn.transaction().unwrap();
|
||||
consolidate_block_with_pre_computed_ordinals_data(
|
||||
&mut block,
|
||||
&inscriptions_db_tx,
|
||||
true,
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
|
||||
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
|
||||
.iter()
|
||||
|
||||
@@ -6,13 +6,13 @@ use crate::core::pipeline::processors::inscription_indexing::process_blocks;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
|
||||
use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand};
|
||||
use crate::core::protocol::inscription_parsing::parse_inscriptions_in_standardized_block;
|
||||
use crate::core::protocol::inscription_sequencing::SequenceCursor;
|
||||
use crate::core::{
|
||||
new_traversals_lazy_cache, parse_inscriptions_in_standardized_block,
|
||||
revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
|
||||
new_traversals_lazy_cache, revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
|
||||
};
|
||||
use crate::db::{
|
||||
find_latest_inscription_block_height, initialize_hord_db, insert_entry_in_blocks,
|
||||
open_readonly_hord_db_conn, open_readwrite_hord_dbs, InscriptionHeigthHint, LazyBlock, find_latest_transfers_block_height,
|
||||
insert_entry_in_blocks, open_readonly_hord_db_conn, open_readwrite_hord_dbs, LazyBlock,
|
||||
};
|
||||
use crate::scan::bitcoin::process_block_with_predicates;
|
||||
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
|
||||
@@ -105,7 +105,6 @@ impl Service {
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
|
||||
// let (cursor, tip) = {
|
||||
// let inscriptions_db_conn =
|
||||
// open_readonly_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
|
||||
@@ -117,6 +116,7 @@ impl Service {
|
||||
// };
|
||||
// self.replay_transfers(cursor, tip, Some(tx_replayer.clone()))
|
||||
// .await?;
|
||||
|
||||
self.update_state(Some(tx_replayer.clone())).await?;
|
||||
|
||||
// Catch-up with chain tip
|
||||
@@ -259,12 +259,15 @@ impl Service {
|
||||
|
||||
parse_inscriptions_in_standardized_block(block, &ctx);
|
||||
}
|
||||
let inscriptions_db_conn =
|
||||
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)
|
||||
.expect("unable to open inscriptions db");
|
||||
let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn);
|
||||
|
||||
let mut hint = InscriptionHeigthHint::new();
|
||||
let updated_blocks = process_blocks(
|
||||
&mut blocks,
|
||||
&mut sequence_cursor,
|
||||
&moved_traversals_cache,
|
||||
&mut hint,
|
||||
&mut inscriptions_db_conn_rw,
|
||||
&config.get_hord_config(),
|
||||
&None,
|
||||
|
||||
Reference in New Issue
Block a user