mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 08:29:31 +08:00
feat: restore ability to replay transfers
This commit is contained in:
@@ -666,7 +666,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
RepairCommand::Transfers(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let service = Service::new(config, ctx.clone());
|
||||
service.replay_transfers(cmd.start_block, cmd.end_block, None)?;
|
||||
service.replay_transfers(cmd.start_block, cmd.end_block, None).await?;
|
||||
}
|
||||
},
|
||||
Command::Db(HordDbCommand::Check(cmd)) => {
|
||||
|
||||
@@ -7,8 +7,8 @@ use std::{
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{
|
||||
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, OrdinalOperation,
|
||||
TransactionIdentifier, OrdinalInscriptionTransferData,
|
||||
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
|
||||
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
|
||||
},
|
||||
utils::Context,
|
||||
};
|
||||
@@ -238,7 +238,7 @@ pub fn re_augment_block_with_ordinals_operations(
|
||||
// Restore inscriptions data
|
||||
let mut inscriptions =
|
||||
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
|
||||
|
||||
|
||||
let mut should_become_cursed = vec![];
|
||||
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
|
||||
for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
|
||||
@@ -296,7 +296,10 @@ pub fn re_augment_block_with_ordinals_operations(
|
||||
let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else {
|
||||
continue;
|
||||
};
|
||||
tx.metadata.ordinal_operations.insert(op_index, OrdinalOperation::CursedInscriptionRevealed(inscription));
|
||||
tx.metadata.ordinal_operations.insert(
|
||||
op_index,
|
||||
OrdinalOperation::CursedInscriptionRevealed(inscription),
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: Handle transfers
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod block_ingestion;
|
||||
pub mod inscription_indexing;
|
||||
pub mod transfers_recomputing;
|
||||
|
||||
pub use inscription_indexing::start_inscription_indexing_processor;
|
||||
|
||||
@@ -0,0 +1,205 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
thread::{sleep, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chainhook_sdk::{
|
||||
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
|
||||
types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation},
|
||||
utils::Context,
|
||||
};
|
||||
use crossbeam_channel::{Sender, TryRecvError};
|
||||
|
||||
use crate::{
|
||||
core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
|
||||
db::{
|
||||
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
|
||||
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
db::open_readwrite_hord_db_conn,
|
||||
};
|
||||
|
||||
pub fn start_transfers_recomputing_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
) -> PostProcessorController {
|
||||
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
|
||||
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
|
||||
|
||||
let config = config.clone();
|
||||
let ctx = ctx.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
|
||||
.spawn(move || {
|
||||
let mut inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start transfers recomputing runloop");
|
||||
}
|
||||
|
||||
loop {
|
||||
let mut blocks = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(_, blocks)) => {
|
||||
empty_cycles = 0;
|
||||
blocks
|
||||
}
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Ok(PostProcessorCommand::Start) => continue,
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
if empty_cycles == 10 {
|
||||
empty_cycles = 0;
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
break;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
|
||||
|
||||
for block in blocks.iter_mut() {
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Cleaning transfers from block {}", block.block_identifier.index
|
||||
);
|
||||
let inscriptions = find_all_inscriptions_in_block(
|
||||
&block.block_identifier.index,
|
||||
&inscriptions_db_conn_rw,
|
||||
&ctx,
|
||||
);
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{} inscriptions retrieved at block {}",
|
||||
inscriptions.len(),
|
||||
block.block_identifier.index
|
||||
);
|
||||
let mut operations = BTreeMap::new();
|
||||
|
||||
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
|
||||
|
||||
remove_entries_from_locations_at_block_height(
|
||||
&block.block_identifier.index,
|
||||
&transaction,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
for (_, entry) in inscriptions.iter() {
|
||||
let inscription_id = entry.get_inscription_id();
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Processing inscription {}", inscription_id
|
||||
);
|
||||
insert_entry_in_locations(
|
||||
&inscription_id,
|
||||
block.block_identifier.index,
|
||||
&entry.transfer_data,
|
||||
&transaction,
|
||||
&ctx,
|
||||
);
|
||||
|
||||
operations.insert(
|
||||
entry.transaction_identifier_inscription.clone(),
|
||||
OrdinalInscriptionTransferData {
|
||||
inscription_id: entry.get_inscription_id(),
|
||||
updated_address: None,
|
||||
satpoint_pre_transfer: format_satpoint_to_watch(
|
||||
&entry.transaction_identifier_inscription,
|
||||
entry.inscription_input_index,
|
||||
0,
|
||||
),
|
||||
satpoint_post_transfer: format_satpoint_to_watch(
|
||||
&entry.transfer_data.transaction_identifier_location,
|
||||
entry.transfer_data.output_index,
|
||||
entry.transfer_data.inscription_offset_intra_output,
|
||||
),
|
||||
post_transfer_output_value: None,
|
||||
tx_index: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Rewriting transfers for block {}", block.block_identifier.index
|
||||
);
|
||||
|
||||
for tx in block.transactions.iter_mut() {
|
||||
tx.metadata.ordinal_operations.clear();
|
||||
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
|
||||
let (_, output_index, _) =
|
||||
parse_satpoint_to_watch(&entry.satpoint_post_transfer);
|
||||
|
||||
let script_pub_key_hex =
|
||||
tx.metadata.outputs[output_index].get_script_pubkey_hex();
|
||||
let updated_address = match Script::from_hex(&script_pub_key_hex) {
|
||||
Ok(script) => {
|
||||
match Address::from_script(&script, network.clone()) {
|
||||
Ok(address) => Some(address.to_string()),
|
||||
Err(_e) => None,
|
||||
}
|
||||
}
|
||||
Err(_e) => None,
|
||||
};
|
||||
|
||||
entry.updated_address = updated_address;
|
||||
entry.post_transfer_output_value =
|
||||
Some(tx.metadata.outputs[output_index].value);
|
||||
|
||||
tx.metadata
|
||||
.ordinal_operations
|
||||
.push(OrdinalOperation::InscriptionTransferred(entry));
|
||||
}
|
||||
}
|
||||
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
block,
|
||||
&transaction,
|
||||
&ctx,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Saving supdates for block {}", block.block_identifier.index
|
||||
);
|
||||
transaction.commit().unwrap();
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Transfers in block {} repaired", block.block_identifier.index
|
||||
);
|
||||
|
||||
if let Some(ref post_processor) = post_processor {
|
||||
let _ = post_processor.send(block.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
PostProcessorController {
|
||||
commands_tx,
|
||||
events_rx,
|
||||
thread_handle: handle,
|
||||
}
|
||||
}
|
||||
@@ -76,22 +76,6 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
)
|
||||
});
|
||||
} else {
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS locations (
|
||||
inscription_id TEXT NOT NULL,
|
||||
block_height INTEGER NOT NULL,
|
||||
tx_index INTEGER NOT NULL,
|
||||
outpoint_to_watch TEXT NOT NULL,
|
||||
offset INTEGER NOT NULL,
|
||||
UNIQUE(outpoint_to_watch,offset)
|
||||
)",
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(logger, "Unable to create table locations:{}", e.to_string())
|
||||
});
|
||||
}
|
||||
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);",
|
||||
[],
|
||||
@@ -110,7 +94,22 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS locations (
|
||||
inscription_id TEXT NOT NULL,
|
||||
block_height INTEGER NOT NULL,
|
||||
tx_index INTEGER NOT NULL,
|
||||
outpoint_to_watch TEXT NOT NULL,
|
||||
offset INTEGER NOT NULL
|
||||
)",
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| {
|
||||
warn!(logger, "Unable to create table locations: {}", e.to_string())
|
||||
});
|
||||
} else {
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS index_locations_on_block_height ON locations(block_height);",
|
||||
[],
|
||||
@@ -128,8 +127,9 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
[],
|
||||
) {
|
||||
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conn
|
||||
}
|
||||
|
||||
|
||||
@@ -133,10 +133,7 @@ fn handle_create_predicate(
|
||||
let predicate_uuid = predicate.get_uuid().to_string();
|
||||
|
||||
if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
|
||||
let key: String = format!(
|
||||
"{}",
|
||||
ChainhookSpecification::bitcoin_key(&predicate_uuid)
|
||||
);
|
||||
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
|
||||
match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
|
||||
Ok(Some(_)) => {
|
||||
return Json(json!({
|
||||
@@ -172,10 +169,7 @@ fn handle_get_predicate(
|
||||
|
||||
match open_readwrite_predicates_db_conn(api_config) {
|
||||
Ok(mut predicates_db_conn) => {
|
||||
let key: String = format!(
|
||||
"{}",
|
||||
ChainhookSpecification::bitcoin_key(&predicate_uuid)
|
||||
);
|
||||
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
|
||||
let entry = match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
|
||||
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
|
||||
"chain": "stacks",
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::cli::fetch_and_standardize_block;
|
||||
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
|
||||
use crate::core::pipeline::processors::inscription_indexing::process_blocks;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
|
||||
use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand};
|
||||
use crate::core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx;
|
||||
use crate::core::{
|
||||
@@ -12,10 +13,10 @@ use crate::core::{
|
||||
revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
|
||||
};
|
||||
use crate::db::{
|
||||
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_blocks,
|
||||
insert_entry_in_locations, open_readwrite_hord_db_conn, open_readwrite_hord_dbs,
|
||||
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height, InscriptionHeigthHint,
|
||||
LazyBlock,
|
||||
find_all_inscriptions_in_block, find_latest_inscription_block_height, format_satpoint_to_watch,
|
||||
insert_entry_in_blocks, insert_entry_in_locations, open_readonly_hord_db_conn,
|
||||
open_readwrite_hord_db_conn, open_readwrite_hord_dbs, parse_satpoint_to_watch,
|
||||
remove_entries_from_locations_at_block_height, InscriptionHeigthHint, LazyBlock, initialize_hord_db,
|
||||
};
|
||||
use crate::scan::bitcoin::process_block_with_predicates;
|
||||
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
|
||||
@@ -36,6 +37,7 @@ use chainhook_sdk::types::{
|
||||
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation,
|
||||
};
|
||||
use chainhook_sdk::utils::Context;
|
||||
use crossbeam_channel::unbounded;
|
||||
use redis::{Commands, Connection};
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
@@ -64,7 +66,9 @@ impl Service {
|
||||
let hord_config = self.config.get_hord_config();
|
||||
|
||||
// Sleep
|
||||
// std::thread::sleep(std::time::Duration::from_secs(180));
|
||||
std::thread::sleep(std::time::Duration::from_secs(600));
|
||||
|
||||
let _ = initialize_hord_db(&self.config.expected_cache_path(), &self.ctx);
|
||||
|
||||
// Force rebuild
|
||||
// {
|
||||
@@ -84,72 +88,48 @@ impl Service {
|
||||
// )?;
|
||||
// }
|
||||
|
||||
// download_and_pipeline_blocks(&self.config, 767400, 767429, 767400, None, &self.ctx).await?;
|
||||
let (tx_replayer, rx_replayer) = unbounded();
|
||||
let mut moved_event_observer_config = event_observer_config.clone();
|
||||
let moved_ctx = self.ctx.clone();
|
||||
|
||||
// Catch-up with chain tip
|
||||
{
|
||||
// Start predicate processor
|
||||
let (tx_replayer, rx_replayer) = channel();
|
||||
|
||||
let blocks_post_processor =
|
||||
start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer));
|
||||
|
||||
let mut moved_event_observer_config = event_observer_config.clone();
|
||||
let moved_ctx = self.ctx.clone();
|
||||
|
||||
let _ = hiro_system_kit::thread_named("Initial predicate processing")
|
||||
.spawn(move || {
|
||||
if let Some(mut chainhook_config) =
|
||||
moved_event_observer_config.chainhook_config.take()
|
||||
{
|
||||
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> =
|
||||
vec![];
|
||||
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
|
||||
bitcoin_predicates_ref.push(bitcoin_predicate);
|
||||
}
|
||||
while let Ok(block) = rx_replayer.recv() {
|
||||
let future = process_block_with_predicates(
|
||||
block,
|
||||
&bitcoin_predicates_ref,
|
||||
&moved_event_observer_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let res = hiro_system_kit::nestable_block_on(future);
|
||||
if let Err(_) = res {
|
||||
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
|
||||
}
|
||||
let _ = hiro_system_kit::thread_named("Initial predicate processing")
|
||||
.spawn(move || {
|
||||
if let Some(mut chainhook_config) =
|
||||
moved_event_observer_config.chainhook_config.take()
|
||||
{
|
||||
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> = vec![];
|
||||
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
|
||||
bitcoin_predicates_ref.push(bitcoin_predicate);
|
||||
}
|
||||
while let Ok(block) = rx_replayer.recv() {
|
||||
let future = process_block_with_predicates(
|
||||
block,
|
||||
&bitcoin_predicates_ref,
|
||||
&moved_event_observer_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let res = hiro_system_kit::nestable_block_on(future);
|
||||
if let Err(_) = res {
|
||||
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
}
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
while let Some((start_block, end_block, speed)) =
|
||||
should_sync_hord_db(&self.config, &self.ctx)?
|
||||
{
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Indexing inscriptions from block #{start_block} to block #{end_block}"
|
||||
);
|
||||
|
||||
let hord_config = self.config.get_hord_config();
|
||||
|
||||
download_and_pipeline_blocks(
|
||||
&self.config,
|
||||
start_block,
|
||||
end_block,
|
||||
hord_config.first_inscription_height,
|
||||
None,
|
||||
Some(&blocks_post_processor),
|
||||
speed,
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
let tip = {
|
||||
let inscriptions_db_conn =
|
||||
open_readonly_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
|
||||
match find_latest_inscription_block_height(&inscriptions_db_conn, &self.ctx)? {
|
||||
Some(height) => height,
|
||||
None => panic!(),
|
||||
}
|
||||
};
|
||||
|
||||
let _ = blocks_post_processor
|
||||
.commands_tx
|
||||
.send(PostProcessorCommand::Terminate);
|
||||
}
|
||||
self.replay_transfers(767430, tip, Some(tx_replayer.clone())).await?;
|
||||
self.update_state(Some(tx_replayer.clone())).await?;
|
||||
|
||||
// Catch-up with chain tip
|
||||
|
||||
// Bitcoin scan operation threadpool
|
||||
let (observer_command_tx, observer_command_rx) = channel();
|
||||
@@ -420,165 +400,85 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn replay_transfers(
|
||||
pub async fn update_state(
|
||||
&self,
|
||||
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
|
||||
) -> Result<(), String> {
|
||||
// Start predicate processor
|
||||
let blocks_post_processor =
|
||||
start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor);
|
||||
|
||||
while let Some((start_block, end_block, speed)) =
|
||||
should_sync_hord_db(&self.config, &self.ctx)?
|
||||
{
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Indexing inscriptions from block #{start_block} to block #{end_block}"
|
||||
);
|
||||
|
||||
let hord_config = self.config.get_hord_config();
|
||||
let first_inscription_height = hord_config.first_inscription_height;
|
||||
download_and_pipeline_blocks(
|
||||
&self.config,
|
||||
start_block,
|
||||
end_block,
|
||||
first_inscription_height,
|
||||
if end_block < first_inscription_height {
|
||||
Some(&blocks_post_processor)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
if end_block < first_inscription_height {
|
||||
None
|
||||
} else {
|
||||
Some(&blocks_post_processor)
|
||||
},
|
||||
speed,
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let _ = blocks_post_processor
|
||||
.commands_tx
|
||||
.send(PostProcessorCommand::Terminate);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn replay_transfers(
|
||||
&self,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
|
||||
) -> Result<(), String> {
|
||||
info!(self.ctx.expect_logger(), "Transfers only");
|
||||
// Start predicate processor
|
||||
let blocks_post_processor =
|
||||
start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor);
|
||||
|
||||
let bitcoin_config = BitcoinConfig {
|
||||
username: self.config.network.bitcoind_rpc_username.clone(),
|
||||
password: self.config.network.bitcoind_rpc_password.clone(),
|
||||
rpc_url: self.config.network.bitcoind_rpc_url.clone(),
|
||||
network: self.config.network.bitcoin_network.clone(),
|
||||
bitcoin_block_signaling: self.config.network.bitcoin_block_signaling.clone(),
|
||||
};
|
||||
let (tx, rx) = crossbeam_channel::bounded(100);
|
||||
let moved_ctx = self.ctx.clone();
|
||||
hiro_system_kit::thread_named("Block fetch")
|
||||
.spawn(move || {
|
||||
let http_client = build_http_client();
|
||||
for cursor in start_block..=end_block {
|
||||
info!(moved_ctx.expect_logger(), "Fetching block {}", cursor);
|
||||
let future = fetch_and_standardize_block(
|
||||
&http_client,
|
||||
cursor,
|
||||
&bitcoin_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Indexing inscriptions from block #{start_block} to block #{end_block}"
|
||||
);
|
||||
|
||||
let block = hiro_system_kit::nestable_block_on(future).unwrap();
|
||||
let hord_config = self.config.get_hord_config();
|
||||
let first_inscription_height = hord_config.first_inscription_height;
|
||||
download_and_pipeline_blocks(
|
||||
&self.config,
|
||||
start_block,
|
||||
end_block,
|
||||
first_inscription_height,
|
||||
None,
|
||||
Some(&blocks_post_processor),
|
||||
100,
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = tx.send(block);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
let _ = blocks_post_processor
|
||||
.commands_tx
|
||||
.send(PostProcessorCommand::Terminate);
|
||||
|
||||
let mut inscriptions_db_conn_rw =
|
||||
open_readwrite_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
|
||||
|
||||
while let Ok(mut block) = rx.recv() {
|
||||
let network = match block.metadata.network {
|
||||
BitcoinNetwork::Mainnet => Network::Bitcoin,
|
||||
BitcoinNetwork::Regtest => Network::Regtest,
|
||||
BitcoinNetwork::Testnet => Network::Testnet,
|
||||
};
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Cleaning transfers from block {}", block.block_identifier.index
|
||||
);
|
||||
let inscriptions = find_all_inscriptions_in_block(
|
||||
&block.block_identifier.index,
|
||||
&inscriptions_db_conn_rw,
|
||||
&self.ctx,
|
||||
);
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"{} inscriptions retrieved at block {}",
|
||||
inscriptions.len(),
|
||||
block.block_identifier.index
|
||||
);
|
||||
let mut operations = BTreeMap::new();
|
||||
|
||||
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
|
||||
|
||||
remove_entries_from_locations_at_block_height(
|
||||
&block.block_identifier.index,
|
||||
&transaction,
|
||||
&self.ctx,
|
||||
);
|
||||
|
||||
for (_, entry) in inscriptions.iter() {
|
||||
let inscription_id = entry.get_inscription_id();
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Processing inscription {}", inscription_id
|
||||
);
|
||||
insert_entry_in_locations(
|
||||
&inscription_id,
|
||||
block.block_identifier.index,
|
||||
&entry.transfer_data,
|
||||
&transaction,
|
||||
&self.ctx,
|
||||
);
|
||||
|
||||
operations.insert(
|
||||
entry.transaction_identifier_inscription.clone(),
|
||||
OrdinalInscriptionTransferData {
|
||||
inscription_id: entry.get_inscription_id(),
|
||||
updated_address: None,
|
||||
satpoint_pre_transfer: format_satpoint_to_watch(
|
||||
&entry.transaction_identifier_inscription,
|
||||
entry.inscription_input_index,
|
||||
0,
|
||||
),
|
||||
satpoint_post_transfer: format_satpoint_to_watch(
|
||||
&entry.transfer_data.transaction_identifier_location,
|
||||
entry.transfer_data.output_index,
|
||||
entry.transfer_data.inscription_offset_intra_output,
|
||||
),
|
||||
post_transfer_output_value: None,
|
||||
tx_index: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Rewriting transfers for block {}", block.block_identifier.index
|
||||
);
|
||||
|
||||
for tx in block.transactions.iter_mut() {
|
||||
tx.metadata.ordinal_operations.clear();
|
||||
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
|
||||
let (_, output_index, _) =
|
||||
parse_satpoint_to_watch(&entry.satpoint_post_transfer);
|
||||
|
||||
let script_pub_key_hex =
|
||||
tx.metadata.outputs[output_index].get_script_pubkey_hex();
|
||||
let updated_address = match Script::from_hex(&script_pub_key_hex) {
|
||||
Ok(script) => match Address::from_script(&script, network.clone()) {
|
||||
Ok(address) => Some(address.to_string()),
|
||||
Err(_e) => None,
|
||||
},
|
||||
Err(_e) => None,
|
||||
};
|
||||
|
||||
entry.updated_address = updated_address;
|
||||
entry.post_transfer_output_value =
|
||||
Some(tx.metadata.outputs[output_index].value);
|
||||
|
||||
tx.metadata
|
||||
.ordinal_operations
|
||||
.push(OrdinalOperation::InscriptionTransferred(entry));
|
||||
}
|
||||
}
|
||||
|
||||
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
|
||||
&mut block,
|
||||
&transaction,
|
||||
&self.ctx,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Saving supdates for block {}", block.block_identifier.index
|
||||
);
|
||||
transaction.commit().unwrap();
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Transfers in block {} repaired", block.block_identifier.index
|
||||
);
|
||||
|
||||
if let Some(ref tx) = block_post_processor {
|
||||
let _ = tx.send(block);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user