mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
feat: plug inscription processing in ibd
This commit is contained in:
@@ -7,8 +7,8 @@ use crate::scan::stacks::scan_stacks_chain_with_predicate;
|
||||
|
||||
use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification;
|
||||
use chainhook_event_observer::hord::db::{
|
||||
build_bitcoin_traversal_local_storage, find_inscriptions_at_wached_outpoint,
|
||||
initialize_hord_db, open_readonly_hord_db_conn, retrieve_satoshi_point_using_local_storage,
|
||||
fetch_and_cache_blocks_in_hord_db, find_inscriptions_at_wached_outpoint, initialize_hord_db,
|
||||
open_readonly_hord_db_conn, retrieve_satoshi_point_using_local_storage,
|
||||
};
|
||||
use chainhook_event_observer::observer::BitcoinConfig;
|
||||
use chainhook_event_observer::utils::Context;
|
||||
@@ -381,17 +381,19 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
username: config.network.bitcoin_node_rpc_username.clone(),
|
||||
password: config.network.bitcoin_node_rpc_password.clone(),
|
||||
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
|
||||
network: config.network.bitcoin_network.clone(),
|
||||
};
|
||||
|
||||
let hord_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
|
||||
let _ = build_bitcoin_traversal_local_storage(
|
||||
let _ = fetch_and_cache_blocks_in_hord_db(
|
||||
&bitcoin_config,
|
||||
&hord_db_conn,
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
&ctx,
|
||||
cmd.network_threads,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -465,8 +465,8 @@ impl Node {
|
||||
.map_err(|e| format!("unable to parse response ({})", e))?;
|
||||
|
||||
let block = indexer::bitcoin::standardize_bitcoin_block(
|
||||
&event_observer_config,
|
||||
raw_block,
|
||||
&event_observer_config.bitcoin_network,
|
||||
&self.ctx,
|
||||
)?;
|
||||
|
||||
|
||||
@@ -9,9 +9,8 @@ use chainhook_event_observer::chainhooks::types::{
|
||||
BitcoinChainhookFullSpecification, BitcoinPredicateType, Protocols,
|
||||
};
|
||||
use chainhook_event_observer::hord::db::{
|
||||
build_bitcoin_traversal_local_storage, find_all_inscriptions,
|
||||
find_compacted_block_at_block_height, find_latest_compacted_block_known,
|
||||
open_readonly_hord_db_conn, open_readwrite_hord_db_conn,
|
||||
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_compacted_block_at_block_height,
|
||||
find_latest_compacted_block_known, open_readonly_hord_db_conn, open_readwrite_hord_db_conn,
|
||||
};
|
||||
use chainhook_event_observer::indexer;
|
||||
use chainhook_event_observer::indexer::bitcoin::{
|
||||
@@ -115,13 +114,14 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
|
||||
(end_block - start_block)
|
||||
);
|
||||
build_bitcoin_traversal_local_storage(
|
||||
fetch_and_cache_blocks_in_hord_db(
|
||||
&config.get_event_observer_config().get_bitcoin_config(),
|
||||
&rw_hord_db_conn,
|
||||
start_block,
|
||||
end_block,
|
||||
&ctx,
|
||||
8,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -163,8 +163,8 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
let block_breakdown =
|
||||
retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?;
|
||||
let block = indexer::bitcoin::standardize_bitcoin_block(
|
||||
&event_observer_config,
|
||||
block_breakdown,
|
||||
&event_observer_config.bitcoin_network,
|
||||
ctx,
|
||||
)?;
|
||||
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
use std::path::PathBuf;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::PathBuf,
|
||||
sync::mpsc::{channel, Sender},
|
||||
};
|
||||
|
||||
use chainhook_types::{
|
||||
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
|
||||
@@ -11,13 +15,13 @@ use threadpool::ThreadPool;
|
||||
use crate::{
|
||||
indexer::bitcoin::{
|
||||
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
|
||||
BitcoinBlockFullBreakdown,
|
||||
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
},
|
||||
observer::BitcoinConfig,
|
||||
utils::Context,
|
||||
};
|
||||
|
||||
use super::ord::height::Height;
|
||||
use super::{ord::height::Height, update_hord_db_and_augment_bitcoin_block};
|
||||
|
||||
fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
|
||||
let mut destination_path = base_dir.clone();
|
||||
@@ -414,13 +418,98 @@ pub fn remove_entry_from_inscriptions(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build_bitcoin_traversal_local_storage(
|
||||
pub async fn update_hord_db(
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
hord_db_path: &PathBuf,
|
||||
hord_db_conn: &Connection,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
_ctx: &Context,
|
||||
network_thread: usize,
|
||||
) -> Result<(), String> {
|
||||
let (block_tx, block_rx) = channel::<BitcoinBlockFullBreakdown>();
|
||||
let first_inscription_block_height = 767430;
|
||||
let ctx = _ctx.clone();
|
||||
let network = bitcoin_config.network.clone();
|
||||
let hord_db_path = hord_db_path.clone();
|
||||
let handle = hiro_system_kit::thread_named("Inscriptions indexing")
|
||||
.spawn(move || {
|
||||
let mut cursor = first_inscription_block_height;
|
||||
let mut inbox = HashMap::new();
|
||||
|
||||
while let Ok(raw_block) = block_rx.recv() {
|
||||
// Early return, only considering blocks after 1st inscription
|
||||
if raw_block.height < first_inscription_block_height {
|
||||
continue;
|
||||
}
|
||||
let block_height = raw_block.height;
|
||||
inbox.insert(raw_block.height, raw_block);
|
||||
|
||||
// In the context of ordinals, we're constrained to process blocks sequentially
|
||||
// Blocks are processed by a threadpool and could be coming out of order.
|
||||
// Inbox block for later if the current block is not the one we should be
|
||||
// processing.
|
||||
if block_height != cursor {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Is the action of processing a block allows us
|
||||
// to process more blocks present in the inbox?
|
||||
while let Some(next_block) = inbox.remove(&cursor) {
|
||||
let mut new_block = match standardize_bitcoin_block(next_block, &network, &ctx)
|
||||
{
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(logger, "Unable to standardize bitcoin block: {e}",)
|
||||
});
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
&mut new_block,
|
||||
&hord_db_path,
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Unable to augment bitcoin block with hord_db: {e}",
|
||||
)
|
||||
});
|
||||
return;
|
||||
}
|
||||
cursor += 1;
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("unable to detach thread");
|
||||
|
||||
fetch_and_cache_blocks_in_hord_db(
|
||||
bitcoin_config,
|
||||
hord_db_conn,
|
||||
start_block,
|
||||
end_block,
|
||||
&_ctx,
|
||||
network_thread,
|
||||
Some(block_tx),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = handle.join();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fetch_and_cache_blocks_in_hord_db(
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
hord_db_conn: &Connection,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
ctx: &Context,
|
||||
network_thread: usize,
|
||||
block_tx: Option<Sender<BitcoinBlockFullBreakdown>>,
|
||||
) -> Result<(), String> {
|
||||
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
|
||||
let (block_hash_tx, block_hash_rx) = crossbeam_channel::unbounded();
|
||||
@@ -471,10 +560,14 @@ pub async fn build_bitcoin_traversal_local_storage(
|
||||
.spawn(move || {
|
||||
while let Ok(Some(block_data)) = block_data_rx.recv() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
let block_tx = block_tx.clone();
|
||||
compress_block_data_pool.execute(move || {
|
||||
let compressed_block = CompactedBlock::from_full_block(&block_data);
|
||||
let _ = block_compressed_tx_moved
|
||||
.send(Some((block_data.height as u32, compressed_block)));
|
||||
let block_index = block_data.height as u32;
|
||||
if let Some(block_tx) = block_tx {
|
||||
let _ = block_tx.send(block_data);
|
||||
}
|
||||
let _ = block_compressed_tx_moved.send(Some((block_index, compressed_block)));
|
||||
});
|
||||
|
||||
let res = compress_block_data_pool.join();
|
||||
|
||||
@@ -7,6 +7,7 @@ use bitcoincore_rpc::bitcoin::{Address, Network, Script};
|
||||
use chainhook_types::{BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation};
|
||||
use hiro_system_kit::slog;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::{
|
||||
hord::{
|
||||
@@ -64,7 +65,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
|
||||
|
||||
pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_block: &mut BitcoinBlockData,
|
||||
config: &EventObserverConfig,
|
||||
hord_db_path: &PathBuf,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
{
|
||||
@@ -77,7 +78,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
});
|
||||
|
||||
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
|
||||
let rw_hord_db_conn = open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx)?;
|
||||
let rw_hord_db_conn = open_readwrite_hord_db_conn(&hord_db_path, &ctx)?;
|
||||
insert_entry_in_blocks(
|
||||
new_block.block_identifier.index as u32,
|
||||
&compacted_block,
|
||||
@@ -100,8 +101,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
|
||||
{
|
||||
if let OrdinalOperation::InscriptionRevealed(inscription) = ordinal_event {
|
||||
let hord_db_conn =
|
||||
open_readonly_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru)
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &ctx).unwrap(); // TODO(lgalabru)
|
||||
|
||||
let (ordinal_block_height, ordinal_offset, ordinal_number) = {
|
||||
// Are we looking at a re-inscription?
|
||||
@@ -170,8 +170,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
|
||||
{
|
||||
let rw_hord_db_conn =
|
||||
open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx)
|
||||
.unwrap(); // TODO(lgalabru)
|
||||
open_readwrite_hord_db_conn(&hord_db_path, &ctx).unwrap(); // TODO(lgalabru)
|
||||
store_new_inscription(
|
||||
&inscription,
|
||||
&new_block.block_identifier,
|
||||
@@ -186,7 +185,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
// Have inscriptions been transfered?
|
||||
let mut sats_in_offset = 0;
|
||||
let mut sats_out_offset = 0;
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru)
|
||||
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &ctx)?;
|
||||
|
||||
for input in new_tx.metadata.inputs.iter() {
|
||||
// input.previous_output.txid
|
||||
@@ -291,8 +290,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
|
||||
|
||||
// Update watched outpoint
|
||||
{
|
||||
let rw_hord_db_conn =
|
||||
open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru)
|
||||
let rw_hord_db_conn = open_readwrite_hord_db_conn(&hord_db_path, &ctx)?;
|
||||
update_transfered_inscription(
|
||||
&inscription_id,
|
||||
&outpoint_post_transfer,
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::chainhooks::types::{
|
||||
get_canonical_pox_config, get_stacks_canonical_magic_bytes, PoxConfig, StacksOpcodes,
|
||||
};
|
||||
|
||||
use crate::observer::{BitcoinConfig, EventObserverConfig};
|
||||
use crate::observer::BitcoinConfig;
|
||||
use crate::utils::Context;
|
||||
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
|
||||
use bitcoincore_rpc::bitcoin::hashes::Hash;
|
||||
@@ -17,10 +17,10 @@ use bitcoincore_rpc_json::{
|
||||
pub use blocks_pool::BitcoinBlockPool;
|
||||
use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut};
|
||||
use chainhook_types::{
|
||||
BitcoinBlockData, BitcoinBlockMetadata, BitcoinTransactionData, BitcoinTransactionMetadata,
|
||||
BlockCommitmentData, BlockHeader, BlockIdentifier, KeyRegistrationData, LockSTXData,
|
||||
OrdinalInscriptionRevealData, OrdinalOperation, PoxReward, StacksBaseChainOperation,
|
||||
StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
|
||||
BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork, BitcoinTransactionData,
|
||||
BitcoinTransactionMetadata, BlockCommitmentData, BlockHeader, BlockIdentifier,
|
||||
KeyRegistrationData, LockSTXData, OrdinalInscriptionRevealData, OrdinalOperation, PoxReward,
|
||||
StacksBaseChainOperation, StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
|
||||
};
|
||||
use hiro_system_kit::slog;
|
||||
|
||||
@@ -261,14 +261,14 @@ pub async fn retrieve_block_hash(
|
||||
}
|
||||
|
||||
pub fn standardize_bitcoin_block(
|
||||
config: &EventObserverConfig,
|
||||
block: BitcoinBlockFullBreakdown,
|
||||
network: &BitcoinNetwork,
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockData, String> {
|
||||
let mut transactions = vec![];
|
||||
let block_height = block.height as u64;
|
||||
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&config.bitcoin_network);
|
||||
let pox_config = get_canonical_pox_config(&config.bitcoin_network);
|
||||
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&network);
|
||||
let pox_config = get_canonical_pox_config(&network);
|
||||
|
||||
ctx.try_log(|logger| slog::debug!(logger, "Standardizing Bitcoin block {}", block.hash,));
|
||||
|
||||
|
||||
@@ -152,6 +152,7 @@ impl EventObserverConfig {
|
||||
username: self.bitcoin_node_username.clone(),
|
||||
password: self.bitcoin_node_password.clone(),
|
||||
rpc_url: self.bitcoin_node_rpc_url.clone(),
|
||||
network: self.bitcoin_network.clone(),
|
||||
};
|
||||
bitcoin_config
|
||||
}
|
||||
@@ -224,6 +225,7 @@ pub struct BitcoinConfig {
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub rpc_url: String,
|
||||
pub network: BitcoinNetwork,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -507,7 +509,8 @@ pub async fn start_observer_commands_handler(
|
||||
break;
|
||||
}
|
||||
ObserverCommand::ProcessBitcoinBlock(block_data) => {
|
||||
let new_block = standardize_bitcoin_block(&config, block_data, &ctx)?;
|
||||
let new_block =
|
||||
standardize_bitcoin_block(block_data, &config.bitcoin_network, &ctx)?;
|
||||
bitcoin_block_store.insert(new_block.block_identifier.clone(), new_block);
|
||||
}
|
||||
ObserverCommand::CacheBitcoinBlock(block) => {
|
||||
@@ -528,7 +531,9 @@ pub async fn start_observer_commands_handler(
|
||||
match bitcoin_block_store.get_mut(&header.block_identifier) {
|
||||
Some(block) => {
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block, &config, &ctx,
|
||||
block,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
@@ -612,7 +617,9 @@ pub async fn start_observer_commands_handler(
|
||||
match bitcoin_block_store.get_mut(&header.block_identifier) {
|
||||
Some(block) => {
|
||||
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
|
||||
block, &config, &ctx,
|
||||
block,
|
||||
&config.get_cache_path_buf(),
|
||||
&ctx,
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
|
||||
Reference in New Issue
Block a user