feat: plug inscription processing in ibd

This commit is contained in:
Ludo Galabru
2023-03-24 16:54:14 -04:00
parent 9fab5a34a2
commit df36617214
7 changed files with 135 additions and 35 deletions

View File

@@ -7,8 +7,8 @@ use crate::scan::stacks::scan_stacks_chain_with_predicate;
use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification;
use chainhook_event_observer::hord::db::{
build_bitcoin_traversal_local_storage, find_inscriptions_at_wached_outpoint,
initialize_hord_db, open_readonly_hord_db_conn, retrieve_satoshi_point_using_local_storage,
fetch_and_cache_blocks_in_hord_db, find_inscriptions_at_wached_outpoint, initialize_hord_db,
open_readonly_hord_db_conn, retrieve_satoshi_point_using_local_storage,
};
use chainhook_event_observer::observer::BitcoinConfig;
use chainhook_event_observer::utils::Context;
@@ -381,17 +381,19 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
username: config.network.bitcoin_node_rpc_username.clone(),
password: config.network.bitcoin_node_rpc_password.clone(),
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
};
let hord_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx);
let _ = build_bitcoin_traversal_local_storage(
let _ = fetch_and_cache_blocks_in_hord_db(
&bitcoin_config,
&hord_db_conn,
cmd.start_block,
cmd.end_block,
&ctx,
cmd.network_threads,
None,
)
.await;
}

View File

@@ -465,8 +465,8 @@ impl Node {
.map_err(|e| format!("unable to parse response ({})", e))?;
let block = indexer::bitcoin::standardize_bitcoin_block(
&event_observer_config,
raw_block,
&event_observer_config.bitcoin_network,
&self.ctx,
)?;

View File

@@ -9,9 +9,8 @@ use chainhook_event_observer::chainhooks::types::{
BitcoinChainhookFullSpecification, BitcoinPredicateType, Protocols,
};
use chainhook_event_observer::hord::db::{
build_bitcoin_traversal_local_storage, find_all_inscriptions,
find_compacted_block_at_block_height, find_latest_compacted_block_known,
open_readonly_hord_db_conn, open_readwrite_hord_db_conn,
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_compacted_block_at_block_height,
find_latest_compacted_block_known, open_readonly_hord_db_conn, open_readwrite_hord_db_conn,
};
use chainhook_event_observer::indexer;
use chainhook_event_observer::indexer::bitcoin::{
@@ -115,13 +114,14 @@ pub async fn scan_bitcoin_chain_with_predicate(
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
(end_block - start_block)
);
build_bitcoin_traversal_local_storage(
fetch_and_cache_blocks_in_hord_db(
&config.get_event_observer_config().get_bitcoin_config(),
&rw_hord_db_conn,
start_block,
end_block,
&ctx,
8,
None,
)
.await?;
}
@@ -163,8 +163,8 @@ pub async fn scan_bitcoin_chain_with_predicate(
let block_breakdown =
retrieve_full_block_breakdown_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let block = indexer::bitcoin::standardize_bitcoin_block(
&event_observer_config,
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
)?;

View File

@@ -1,4 +1,8 @@
use std::path::PathBuf;
use std::{
collections::HashMap,
path::PathBuf,
sync::mpsc::{channel, Sender},
};
use chainhook_types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
@@ -11,13 +15,13 @@ use threadpool::ThreadPool;
use crate::{
indexer::bitcoin::{
retrieve_block_hash_with_retry, retrieve_full_block_breakdown_with_retry,
BitcoinBlockFullBreakdown,
standardize_bitcoin_block, BitcoinBlockFullBreakdown,
},
observer::BitcoinConfig,
utils::Context,
};
use super::ord::height::Height;
use super::{ord::height::Height, update_hord_db_and_augment_bitcoin_block};
fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
@@ -414,13 +418,98 @@ pub fn remove_entry_from_inscriptions(
}
}
pub async fn build_bitcoin_traversal_local_storage(
pub async fn update_hord_db(
bitcoin_config: &BitcoinConfig,
hord_db_path: &PathBuf,
hord_db_conn: &Connection,
start_block: u64,
end_block: u64,
_ctx: &Context,
network_thread: usize,
) -> Result<(), String> {
let (block_tx, block_rx) = channel::<BitcoinBlockFullBreakdown>();
let first_inscription_block_height = 767430;
let ctx = _ctx.clone();
let network = bitcoin_config.network.clone();
let hord_db_path = hord_db_path.clone();
let handle = hiro_system_kit::thread_named("Inscriptions indexing")
.spawn(move || {
let mut cursor = first_inscription_block_height;
let mut inbox = HashMap::new();
while let Ok(raw_block) = block_rx.recv() {
// Early return, only considering blocks after 1st inscription
if raw_block.height < first_inscription_block_height {
continue;
}
let block_height = raw_block.height;
inbox.insert(raw_block.height, raw_block);
// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.
if block_height != cursor {
continue;
}
// Is the action of processing a block allows us
// to process more blocks present in the inbox?
while let Some(next_block) = inbox.remove(&cursor) {
let mut new_block = match standardize_bitcoin_block(next_block, &network, &ctx)
{
Ok(block) => block,
Err(e) => {
ctx.try_log(|logger| {
slog::error!(logger, "Unable to standardize bitcoin block: {e}",)
});
return;
}
};
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
&mut new_block,
&hord_db_path,
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to augment bitcoin block with hord_db: {e}",
)
});
return;
}
cursor += 1;
}
}
})
.expect("unable to detach thread");
fetch_and_cache_blocks_in_hord_db(
bitcoin_config,
hord_db_conn,
start_block,
end_block,
&_ctx,
network_thread,
Some(block_tx),
)
.await?;
let _ = handle.join();
Ok(())
}
pub async fn fetch_and_cache_blocks_in_hord_db(
bitcoin_config: &BitcoinConfig,
hord_db_conn: &Connection,
start_block: u64,
end_block: u64,
ctx: &Context,
network_thread: usize,
block_tx: Option<Sender<BitcoinBlockFullBreakdown>>,
) -> Result<(), String> {
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
let (block_hash_tx, block_hash_rx) = crossbeam_channel::unbounded();
@@ -471,10 +560,14 @@ pub async fn build_bitcoin_traversal_local_storage(
.spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
let block_tx = block_tx.clone();
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
let _ = block_compressed_tx_moved
.send(Some((block_data.height as u32, compressed_block)));
let block_index = block_data.height as u32;
if let Some(block_tx) = block_tx {
let _ = block_tx.send(block_data);
}
let _ = block_compressed_tx_moved.send(Some((block_index, compressed_block)));
});
let res = compress_block_data_pool.join();

View File

@@ -7,6 +7,7 @@ use bitcoincore_rpc::bitcoin::{Address, Network, Script};
use chainhook_types::{BitcoinBlockData, OrdinalInscriptionTransferData, OrdinalOperation};
use hiro_system_kit::slog;
use std::collections::VecDeque;
use std::path::PathBuf;
use crate::{
hord::{
@@ -64,7 +65,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
pub fn update_hord_db_and_augment_bitcoin_block(
new_block: &mut BitcoinBlockData,
config: &EventObserverConfig,
hord_db_path: &PathBuf,
ctx: &Context,
) -> Result<(), String> {
{
@@ -77,7 +78,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
});
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
let rw_hord_db_conn = open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx)?;
let rw_hord_db_conn = open_readwrite_hord_db_conn(&hord_db_path, &ctx)?;
insert_entry_in_blocks(
new_block.block_identifier.index as u32,
&compacted_block,
@@ -100,8 +101,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
new_tx.metadata.ordinal_operations.iter_mut().enumerate()
{
if let OrdinalOperation::InscriptionRevealed(inscription) = ordinal_event {
let hord_db_conn =
open_readonly_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru)
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &ctx).unwrap(); // TODO(lgalabru)
let (ordinal_block_height, ordinal_offset, ordinal_number) = {
// Are we looking at a re-inscription?
@@ -170,8 +170,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
{
let rw_hord_db_conn =
open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx)
.unwrap(); // TODO(lgalabru)
open_readwrite_hord_db_conn(&hord_db_path, &ctx).unwrap(); // TODO(lgalabru)
store_new_inscription(
&inscription,
&new_block.block_identifier,
@@ -186,7 +185,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
// Have inscriptions been transfered?
let mut sats_in_offset = 0;
let mut sats_out_offset = 0;
let hord_db_conn = open_readonly_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru)
let hord_db_conn = open_readonly_hord_db_conn(&hord_db_path, &ctx)?;
for input in new_tx.metadata.inputs.iter() {
// input.previous_output.txid
@@ -291,8 +290,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
// Update watched outpoint
{
let rw_hord_db_conn =
open_readwrite_hord_db_conn(&config.get_cache_path_buf(), &ctx).unwrap(); // TODO(lgalabru)
let rw_hord_db_conn = open_readwrite_hord_db_conn(&hord_db_path, &ctx)?;
update_transfered_inscription(
&inscription_id,
&outpoint_post_transfer,

View File

@@ -6,7 +6,7 @@ use crate::chainhooks::types::{
get_canonical_pox_config, get_stacks_canonical_magic_bytes, PoxConfig, StacksOpcodes,
};
use crate::observer::{BitcoinConfig, EventObserverConfig};
use crate::observer::BitcoinConfig;
use crate::utils::Context;
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
use bitcoincore_rpc::bitcoin::hashes::Hash;
@@ -17,10 +17,10 @@ use bitcoincore_rpc_json::{
pub use blocks_pool::BitcoinBlockPool;
use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut};
use chainhook_types::{
BitcoinBlockData, BitcoinBlockMetadata, BitcoinTransactionData, BitcoinTransactionMetadata,
BlockCommitmentData, BlockHeader, BlockIdentifier, KeyRegistrationData, LockSTXData,
OrdinalInscriptionRevealData, OrdinalOperation, PoxReward, StacksBaseChainOperation,
StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork, BitcoinTransactionData,
BitcoinTransactionMetadata, BlockCommitmentData, BlockHeader, BlockIdentifier,
KeyRegistrationData, LockSTXData, OrdinalInscriptionRevealData, OrdinalOperation, PoxReward,
StacksBaseChainOperation, StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
};
use hiro_system_kit::slog;
@@ -261,14 +261,14 @@ pub async fn retrieve_block_hash(
}
pub fn standardize_bitcoin_block(
config: &EventObserverConfig,
block: BitcoinBlockFullBreakdown,
network: &BitcoinNetwork,
ctx: &Context,
) -> Result<BitcoinBlockData, String> {
let mut transactions = vec![];
let block_height = block.height as u64;
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&config.bitcoin_network);
let pox_config = get_canonical_pox_config(&config.bitcoin_network);
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&network);
let pox_config = get_canonical_pox_config(&network);
ctx.try_log(|logger| slog::debug!(logger, "Standardizing Bitcoin block {}", block.hash,));

View File

@@ -152,6 +152,7 @@ impl EventObserverConfig {
username: self.bitcoin_node_username.clone(),
password: self.bitcoin_node_password.clone(),
rpc_url: self.bitcoin_node_rpc_url.clone(),
network: self.bitcoin_network.clone(),
};
bitcoin_config
}
@@ -224,6 +225,7 @@ pub struct BitcoinConfig {
pub username: String,
pub password: String,
pub rpc_url: String,
pub network: BitcoinNetwork,
}
#[derive(Debug, Clone)]
@@ -507,7 +509,8 @@ pub async fn start_observer_commands_handler(
break;
}
ObserverCommand::ProcessBitcoinBlock(block_data) => {
let new_block = standardize_bitcoin_block(&config, block_data, &ctx)?;
let new_block =
standardize_bitcoin_block(block_data, &config.bitcoin_network, &ctx)?;
bitcoin_block_store.insert(new_block.block_identifier.clone(), new_block);
}
ObserverCommand::CacheBitcoinBlock(block) => {
@@ -528,7 +531,9 @@ pub async fn start_observer_commands_handler(
match bitcoin_block_store.get_mut(&header.block_identifier) {
Some(block) => {
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block, &config, &ctx,
block,
&config.get_cache_path_buf(),
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
@@ -612,7 +617,9 @@ pub async fn start_observer_commands_handler(
match bitcoin_block_store.get_mut(&header.block_identifier) {
Some(block) => {
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block, &config, &ctx,
block,
&config.get_cache_path_buf(),
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(