feat: restore ability to replay transfers

This commit is contained in:
Ludo Galabru
2023-08-03 09:37:26 +02:00
parent dcdfd1655c
commit 98e7e9b21d
7 changed files with 349 additions and 246 deletions

View File

@@ -666,7 +666,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let service = Service::new(config, ctx.clone());
service.replay_transfers(cmd.start_block, cmd.end_block, None)?;
service.replay_transfers(cmd.start_block, cmd.end_block, None).await?;
}
},
Command::Db(HordDbCommand::Check(cmd)) => {

View File

@@ -7,8 +7,8 @@ use std::{
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, OrdinalOperation,
TransactionIdentifier, OrdinalInscriptionTransferData,
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier,
},
utils::Context,
};
@@ -238,7 +238,7 @@ pub fn re_augment_block_with_ordinals_operations(
// Restore inscriptions data
let mut inscriptions =
find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx);
let mut should_become_cursed = vec![];
for (tx_index, tx) in block.transactions.iter_mut().enumerate() {
for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() {
@@ -296,7 +296,10 @@ pub fn re_augment_block_with_ordinals_operations(
let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else {
continue;
};
tx.metadata.ordinal_operations.insert(op_index, OrdinalOperation::CursedInscriptionRevealed(inscription));
tx.metadata.ordinal_operations.insert(
op_index,
OrdinalOperation::CursedInscriptionRevealed(inscription),
);
}
// TODO: Handle transfers

View File

@@ -1,4 +1,5 @@
pub mod block_ingestion;
pub mod inscription_indexing;
pub mod transfers_recomputing;
pub use inscription_indexing::start_inscription_indexing_processor;

View File

@@ -0,0 +1,205 @@
use std::{
collections::BTreeMap,
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation},
utils::Context,
};
use crossbeam_channel::{Sender, TryRecvError};
use crate::{
core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
db::{
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height,
},
};
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::open_readwrite_hord_db_conn,
};
pub fn start_transfers_recomputing_processor(
config: &Config,
ctx: &Context,
post_processor: Option<Sender<BitcoinBlockData>>,
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();
let config = config.clone();
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
.spawn(move || {
let mut inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let mut empty_cycles = 0;
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start transfers recomputing runloop");
}
loop {
let mut blocks = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(_, blocks)) => {
empty_cycles = 0;
blocks
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 10 {
empty_cycles = 0;
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
continue;
}
_ => {
break;
}
},
};
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
for block in blocks.iter_mut() {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
info!(
ctx.expect_logger(),
"Cleaning transfers from block {}", block.block_identifier.index
);
let inscriptions = find_all_inscriptions_in_block(
&block.block_identifier.index,
&inscriptions_db_conn_rw,
&ctx,
);
info!(
ctx.expect_logger(),
"{} inscriptions retrieved at block {}",
inscriptions.len(),
block.block_identifier.index
);
let mut operations = BTreeMap::new();
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
remove_entries_from_locations_at_block_height(
&block.block_identifier.index,
&transaction,
&ctx,
);
for (_, entry) in inscriptions.iter() {
let inscription_id = entry.get_inscription_id();
info!(
ctx.expect_logger(),
"Processing inscription {}", inscription_id
);
insert_entry_in_locations(
&inscription_id,
block.block_identifier.index,
&entry.transfer_data,
&transaction,
&ctx,
);
operations.insert(
entry.transaction_identifier_inscription.clone(),
OrdinalInscriptionTransferData {
inscription_id: entry.get_inscription_id(),
updated_address: None,
satpoint_pre_transfer: format_satpoint_to_watch(
&entry.transaction_identifier_inscription,
entry.inscription_input_index,
0,
),
satpoint_post_transfer: format_satpoint_to_watch(
&entry.transfer_data.transaction_identifier_location,
entry.transfer_data.output_index,
entry.transfer_data.inscription_offset_intra_output,
),
post_transfer_output_value: None,
tx_index: 0,
},
);
}
info!(
ctx.expect_logger(),
"Rewriting transfers for block {}", block.block_identifier.index
);
for tx in block.transactions.iter_mut() {
tx.metadata.ordinal_operations.clear();
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
let (_, output_index, _) =
parse_satpoint_to_watch(&entry.satpoint_post_transfer);
let script_pub_key_hex =
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => {
match Address::from_script(&script, network.clone()) {
Ok(address) => Some(address.to_string()),
Err(_e) => None,
}
}
Err(_e) => None,
};
entry.updated_address = updated_address;
entry.post_transfer_output_value =
Some(tx.metadata.outputs[output_index].value);
tx.metadata
.ordinal_operations
.push(OrdinalOperation::InscriptionTransferred(entry));
}
}
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
block,
&transaction,
&ctx,
)
.unwrap();
info!(
ctx.expect_logger(),
"Saving supdates for block {}", block.block_identifier.index
);
transaction.commit().unwrap();
info!(
ctx.expect_logger(),
"Transfers in block {} repaired", block.block_identifier.index
);
if let Some(ref post_processor) = post_processor {
let _ = post_processor.send(block.clone());
}
}
}
})
.expect("unable to spawn thread");
PostProcessorController {
commands_tx,
events_rx,
thread_handle: handle,
}
}

View File

@@ -76,22 +76,6 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
)
});
} else {
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS locations (
inscription_id TEXT NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
outpoint_to_watch TEXT NOT NULL,
offset INTEGER NOT NULL,
UNIQUE(outpoint_to_watch,offset)
)",
[],
) {
ctx.try_log(|logger| {
warn!(logger, "Unable to create table locations:{}", e.to_string())
});
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);",
[],
@@ -110,7 +94,22 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
}
}
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS locations (
inscription_id TEXT NOT NULL,
block_height INTEGER NOT NULL,
tx_index INTEGER NOT NULL,
outpoint_to_watch TEXT NOT NULL,
offset INTEGER NOT NULL
)",
[],
) {
ctx.try_log(|logger| {
warn!(logger, "Unable to create table locations: {}", e.to_string())
});
} else {
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_locations_on_block_height ON locations(block_height);",
[],
@@ -128,8 +127,9 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection {
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
}
}
}
conn
}

View File

@@ -133,10 +133,7 @@ fn handle_create_predicate(
let predicate_uuid = predicate.get_uuid().to_string();
if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
let key: String = format!(
"{}",
ChainhookSpecification::bitcoin_key(&predicate_uuid)
);
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
Ok(Some(_)) => {
return Json(json!({
@@ -172,10 +169,7 @@ fn handle_get_predicate(
match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let key: String = format!(
"{}",
ChainhookSpecification::bitcoin_key(&predicate_uuid)
);
let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid));
let entry = match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) {
Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({
"chain": "stacks",

View File

@@ -5,6 +5,7 @@ use crate::cli::fetch_and_standardize_block;
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::core::pipeline::processors::inscription_indexing::process_blocks;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand};
use crate::core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx;
use crate::core::{
@@ -12,10 +13,10 @@ use crate::core::{
revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
};
use crate::db::{
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_blocks,
insert_entry_in_locations, open_readwrite_hord_db_conn, open_readwrite_hord_dbs,
parse_satpoint_to_watch, remove_entries_from_locations_at_block_height, InscriptionHeigthHint,
LazyBlock,
find_all_inscriptions_in_block, find_latest_inscription_block_height, format_satpoint_to_watch,
insert_entry_in_blocks, insert_entry_in_locations, open_readonly_hord_db_conn,
open_readwrite_hord_db_conn, open_readwrite_hord_dbs, parse_satpoint_to_watch,
remove_entries_from_locations_at_block_height, InscriptionHeigthHint, LazyBlock, initialize_hord_db,
};
use crate::scan::bitcoin::process_block_with_predicates;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
@@ -36,6 +37,7 @@ use chainhook_sdk::types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation,
};
use chainhook_sdk::utils::Context;
use crossbeam_channel::unbounded;
use redis::{Commands, Connection};
use std::collections::BTreeMap;
@@ -64,7 +66,9 @@ impl Service {
let hord_config = self.config.get_hord_config();
// Sleep
// std::thread::sleep(std::time::Duration::from_secs(180));
std::thread::sleep(std::time::Duration::from_secs(600));
let _ = initialize_hord_db(&self.config.expected_cache_path(), &self.ctx);
// Force rebuild
// {
@@ -84,72 +88,48 @@ impl Service {
// )?;
// }
// download_and_pipeline_blocks(&self.config, 767400, 767429, 767400, None, &self.ctx).await?;
let (tx_replayer, rx_replayer) = unbounded();
let mut moved_event_observer_config = event_observer_config.clone();
let moved_ctx = self.ctx.clone();
// Catch-up with chain tip
{
// Start predicate processor
let (tx_replayer, rx_replayer) = channel();
let blocks_post_processor =
start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer));
let mut moved_event_observer_config = event_observer_config.clone();
let moved_ctx = self.ctx.clone();
let _ = hiro_system_kit::thread_named("Initial predicate processing")
.spawn(move || {
if let Some(mut chainhook_config) =
moved_event_observer_config.chainhook_config.take()
{
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> =
vec![];
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
bitcoin_predicates_ref.push(bitcoin_predicate);
}
while let Ok(block) = rx_replayer.recv() {
let future = process_block_with_predicates(
block,
&bitcoin_predicates_ref,
&moved_event_observer_config,
&moved_ctx,
);
let res = hiro_system_kit::nestable_block_on(future);
if let Err(_) = res {
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
}
let _ = hiro_system_kit::thread_named("Initial predicate processing")
.spawn(move || {
if let Some(mut chainhook_config) =
moved_event_observer_config.chainhook_config.take()
{
let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> = vec![];
for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() {
bitcoin_predicates_ref.push(bitcoin_predicate);
}
while let Ok(block) = rx_replayer.recv() {
let future = process_block_with_predicates(
block,
&bitcoin_predicates_ref,
&moved_event_observer_config,
&moved_ctx,
);
let res = hiro_system_kit::nestable_block_on(future);
if let Err(_) = res {
error!(moved_ctx.expect_logger(), "Initial ingestion failing");
}
}
})
.expect("unable to spawn thread");
}
})
.expect("unable to spawn thread");
while let Some((start_block, end_block, speed)) =
should_sync_hord_db(&self.config, &self.ctx)?
{
info!(
self.ctx.expect_logger(),
"Indexing inscriptions from block #{start_block} to block #{end_block}"
);
let hord_config = self.config.get_hord_config();
download_and_pipeline_blocks(
&self.config,
start_block,
end_block,
hord_config.first_inscription_height,
None,
Some(&blocks_post_processor),
speed,
&self.ctx,
)
.await?;
let tip = {
let inscriptions_db_conn =
open_readonly_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
match find_latest_inscription_block_height(&inscriptions_db_conn, &self.ctx)? {
Some(height) => height,
None => panic!(),
}
};
let _ = blocks_post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
}
self.replay_transfers(767430, tip, Some(tx_replayer.clone())).await?;
self.update_state(Some(tx_replayer.clone())).await?;
// Catch-up with chain tip
// Bitcoin scan operation threadpool
let (observer_command_tx, observer_command_rx) = channel();
@@ -420,165 +400,85 @@ impl Service {
Ok(())
}
pub fn replay_transfers(
pub async fn update_state(
&self,
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<(), String> {
// Start predicate processor
let blocks_post_processor =
start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor);
while let Some((start_block, end_block, speed)) =
should_sync_hord_db(&self.config, &self.ctx)?
{
info!(
self.ctx.expect_logger(),
"Indexing inscriptions from block #{start_block} to block #{end_block}"
);
let hord_config = self.config.get_hord_config();
let first_inscription_height = hord_config.first_inscription_height;
download_and_pipeline_blocks(
&self.config,
start_block,
end_block,
first_inscription_height,
if end_block < first_inscription_height {
Some(&blocks_post_processor)
} else {
None
},
if end_block < first_inscription_height {
None
} else {
Some(&blocks_post_processor)
},
speed,
&self.ctx,
)
.await?;
}
let _ = blocks_post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
Ok(())
}
pub async fn replay_transfers(
&self,
start_block: u64,
end_block: u64,
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<(), String> {
info!(self.ctx.expect_logger(), "Transfers only");
// Start predicate processor
let blocks_post_processor =
start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor);
let bitcoin_config = BitcoinConfig {
username: self.config.network.bitcoind_rpc_username.clone(),
password: self.config.network.bitcoind_rpc_password.clone(),
rpc_url: self.config.network.bitcoind_rpc_url.clone(),
network: self.config.network.bitcoin_network.clone(),
bitcoin_block_signaling: self.config.network.bitcoin_block_signaling.clone(),
};
let (tx, rx) = crossbeam_channel::bounded(100);
let moved_ctx = self.ctx.clone();
hiro_system_kit::thread_named("Block fetch")
.spawn(move || {
let http_client = build_http_client();
for cursor in start_block..=end_block {
info!(moved_ctx.expect_logger(), "Fetching block {}", cursor);
let future = fetch_and_standardize_block(
&http_client,
cursor,
&bitcoin_config,
&moved_ctx,
);
info!(
self.ctx.expect_logger(),
"Indexing inscriptions from block #{start_block} to block #{end_block}"
);
let block = hiro_system_kit::nestable_block_on(future).unwrap();
let hord_config = self.config.get_hord_config();
let first_inscription_height = hord_config.first_inscription_height;
download_and_pipeline_blocks(
&self.config,
start_block,
end_block,
first_inscription_height,
None,
Some(&blocks_post_processor),
100,
&self.ctx,
)
.await?;
let _ = tx.send(block);
}
})
.unwrap();
let _ = blocks_post_processor
.commands_tx
.send(PostProcessorCommand::Terminate);
let mut inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
while let Ok(mut block) = rx.recv() {
let network = match block.metadata.network {
BitcoinNetwork::Mainnet => Network::Bitcoin,
BitcoinNetwork::Regtest => Network::Regtest,
BitcoinNetwork::Testnet => Network::Testnet,
};
info!(
self.ctx.expect_logger(),
"Cleaning transfers from block {}", block.block_identifier.index
);
let inscriptions = find_all_inscriptions_in_block(
&block.block_identifier.index,
&inscriptions_db_conn_rw,
&self.ctx,
);
info!(
self.ctx.expect_logger(),
"{} inscriptions retrieved at block {}",
inscriptions.len(),
block.block_identifier.index
);
let mut operations = BTreeMap::new();
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
remove_entries_from_locations_at_block_height(
&block.block_identifier.index,
&transaction,
&self.ctx,
);
for (_, entry) in inscriptions.iter() {
let inscription_id = entry.get_inscription_id();
info!(
self.ctx.expect_logger(),
"Processing inscription {}", inscription_id
);
insert_entry_in_locations(
&inscription_id,
block.block_identifier.index,
&entry.transfer_data,
&transaction,
&self.ctx,
);
operations.insert(
entry.transaction_identifier_inscription.clone(),
OrdinalInscriptionTransferData {
inscription_id: entry.get_inscription_id(),
updated_address: None,
satpoint_pre_transfer: format_satpoint_to_watch(
&entry.transaction_identifier_inscription,
entry.inscription_input_index,
0,
),
satpoint_post_transfer: format_satpoint_to_watch(
&entry.transfer_data.transaction_identifier_location,
entry.transfer_data.output_index,
entry.transfer_data.inscription_offset_intra_output,
),
post_transfer_output_value: None,
tx_index: 0,
},
);
}
info!(
self.ctx.expect_logger(),
"Rewriting transfers for block {}", block.block_identifier.index
);
for tx in block.transactions.iter_mut() {
tx.metadata.ordinal_operations.clear();
if let Some(mut entry) = operations.remove(&tx.transaction_identifier) {
let (_, output_index, _) =
parse_satpoint_to_watch(&entry.satpoint_post_transfer);
let script_pub_key_hex =
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(address) => Some(address.to_string()),
Err(_e) => None,
},
Err(_e) => None,
};
entry.updated_address = updated_address;
entry.post_transfer_output_value =
Some(tx.metadata.outputs[output_index].value);
tx.metadata
.ordinal_operations
.push(OrdinalOperation::InscriptionTransferred(entry));
}
}
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
&mut block,
&transaction,
&self.ctx,
)
.unwrap();
info!(
self.ctx.expect_logger(),
"Saving supdates for block {}", block.block_identifier.index
);
transaction.commit().unwrap();
info!(
self.ctx.expect_logger(),
"Transfers in block {} repaired", block.block_identifier.index
);
if let Some(ref tx) = block_post_processor {
let _ = tx.send(block);
}
}
Ok(())
}
}