mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
chore: adding retry mechanism
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -24,3 +24,4 @@ components/chainhook-db/examples/arkadiko-data-indexing/vault-monitor/tmp
|
||||
components/chainhook-db/examples/arkadiko-data-indexing/vault-monitor/vendor
|
||||
components/chainhook-cli/cache
|
||||
components/chainhook-cli/index.redb
|
||||
cache/
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -440,6 +440,7 @@ dependencies = [
|
||||
"hex",
|
||||
"hiro-system-kit",
|
||||
"num_cpus",
|
||||
"rand 0.8.5",
|
||||
"redis",
|
||||
"reqwest",
|
||||
"rusqlite",
|
||||
|
||||
@@ -15,6 +15,7 @@ redis = "0.21.5"
|
||||
serde-redis = "0.12.0"
|
||||
hex = "0.4.3"
|
||||
ciborium = "0.2.0"
|
||||
rand = "0.8.5"
|
||||
# tikv-client = { git = "https://github.com/tikv/client-rust.git", rev = "8f54e6114227718e256027df2577bbacdf425f86" }
|
||||
# raft-proto = { git = "https://github.com/tikv/raft-rs", rev="f73766712a538c2f6eb135b455297ad6c03fc58d", version = "0.7.0"}
|
||||
chainhook-event-observer = { version = "=1.0.4", default-features = false, features = ["ordinals"], path = "../chainhook-event-observer" }
|
||||
|
||||
@@ -2,7 +2,8 @@ use crate::block::DigestingCommand;
|
||||
use crate::config::Config;
|
||||
use crate::node::Node;
|
||||
use crate::scan::bitcoin::{
|
||||
scan_bitcoin_chain_with_predicate, retrieve_satoshi_point_using_local_storage,
|
||||
build_bitcoin_traversal_local_storage, retrieve_satoshi_point_using_local_storage,
|
||||
scan_bitcoin_chain_with_predicate,
|
||||
};
|
||||
use crate::scan::stacks::scan_stacks_chain_with_predicate;
|
||||
|
||||
@@ -148,6 +149,9 @@ enum DbCommand {
|
||||
/// Dump DB storage
|
||||
#[clap(name = "dump", bin_name = "dump")]
|
||||
Dump(DumpDbCommand),
|
||||
/// Dump DB storage
|
||||
#[clap(name = "build", bin_name = "build")]
|
||||
Build(BuildDbCommand),
|
||||
}
|
||||
|
||||
#[derive(Parser, PartialEq, Clone, Debug)]
|
||||
@@ -222,6 +226,39 @@ struct DumpDbCommand {
|
||||
pub config_path: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Parser, PartialEq, Clone, Debug)]
|
||||
struct BuildDbCommand {
|
||||
/// Target Devnet network
|
||||
#[clap(
|
||||
long = "devnet",
|
||||
conflicts_with = "testnet",
|
||||
conflicts_with = "mainnet"
|
||||
)]
|
||||
pub devnet: bool,
|
||||
/// Target Testnet network
|
||||
#[clap(
|
||||
long = "testnet",
|
||||
conflicts_with = "devnet",
|
||||
conflicts_with = "mainnet"
|
||||
)]
|
||||
pub testnet: bool,
|
||||
/// Target Mainnet network
|
||||
#[clap(
|
||||
long = "mainnet",
|
||||
conflicts_with = "testnet",
|
||||
conflicts_with = "devnet"
|
||||
)]
|
||||
pub mainnet: bool,
|
||||
/// Load config file path
|
||||
#[clap(
|
||||
long = "config-path",
|
||||
conflicts_with = "mainnet",
|
||||
conflicts_with = "testnet",
|
||||
conflicts_with = "devnet"
|
||||
)]
|
||||
pub config_path: Option<String>,
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
let logger = hiro_system_kit::log::setup_logger();
|
||||
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
|
||||
@@ -290,13 +327,24 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
OrdinalsCommand::Satoshi(cmd) => {
|
||||
let config =
|
||||
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
|
||||
let txid = TransactionIdentifier { hash: cmd.txid.clone() };
|
||||
let block_id = BlockIdentifier { hash: "".into(), index: cmd.block_height };
|
||||
retrieve_satoshi_point_using_local_storage(&config, &block_id, &txid)
|
||||
.await?;
|
||||
let txid = TransactionIdentifier {
|
||||
hash: cmd.txid.clone(),
|
||||
};
|
||||
let block_id = BlockIdentifier {
|
||||
hash: "".into(),
|
||||
index: cmd.block_height,
|
||||
};
|
||||
retrieve_satoshi_point_using_local_storage(&config, &block_id, &txid, &ctx).await?;
|
||||
}
|
||||
},
|
||||
Command::Db(subcmd) => match subcmd {
|
||||
DbCommand::Build(cmd) => {
|
||||
let config =
|
||||
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
|
||||
|
||||
build_bitcoin_traversal_local_storage(config.clone(), 0, 300000, &ctx, 1).await?;
|
||||
// build_bitcoin_traversal_local_storage(config, 500001, 700000, &ctx, 4).await?;
|
||||
}
|
||||
DbCommand::Dump(cmd) => {
|
||||
let config =
|
||||
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
|
||||
|
||||
@@ -466,7 +466,6 @@ impl Node {
|
||||
|
||||
let block = indexer::bitcoin::standardize_bitcoin_block(
|
||||
&self.config.network,
|
||||
cursor,
|
||||
raw_block,
|
||||
&mut bitcoin_context,
|
||||
&self.ctx,
|
||||
|
||||
@@ -14,7 +14,8 @@ use chainhook_event_observer::chainhooks::types::{
|
||||
Protocols,
|
||||
};
|
||||
use chainhook_event_observer::indexer::bitcoin::{
|
||||
BitcoinBlockFullBreakdown, BitcoinTransactionOutputFullBreakdown,
|
||||
retrieve_full_block_breakdown_with_retry, BitcoinBlockFullBreakdown,
|
||||
BitcoinTransactionOutputFullBreakdown,
|
||||
};
|
||||
use chainhook_event_observer::indexer::ordinals::indexing::entry::Entry;
|
||||
use chainhook_event_observer::indexer::ordinals::indexing::{
|
||||
@@ -26,22 +27,23 @@ use chainhook_event_observer::indexer::ordinals::inscription_id::InscriptionId;
|
||||
use chainhook_event_observer::indexer::ordinals::sat_point::SatPoint;
|
||||
use chainhook_event_observer::indexer::{self, BitcoinChainContext};
|
||||
use chainhook_event_observer::observer::{
|
||||
EventObserverConfig, DEFAULT_CONTROL_PORT, DEFAULT_INGESTION_PORT,
|
||||
BitcoinConfig, EventObserverConfig, DEFAULT_CONTROL_PORT, DEFAULT_INGESTION_PORT,
|
||||
};
|
||||
use chainhook_event_observer::redb::ReadableTable;
|
||||
use chainhook_event_observer::utils::{file_append, send_request, Context};
|
||||
use chainhook_types::{
|
||||
BitcoinTransactionData, OrdinalInscriptionRevealData, OrdinalOperation, TransactionIdentifier, BlockIdentifier,
|
||||
BitcoinTransactionData, BlockIdentifier, OrdinalInscriptionRevealData, OrdinalOperation,
|
||||
TransactionIdentifier,
|
||||
};
|
||||
use rand::prelude::*;
|
||||
use reqwest::Client as HttpClient;
|
||||
use rusqlite::{Connection, OpenFlags, Result, ToSql};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::time::Duration;
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
pub fn initialize_bitcoin_block_traversal_cache(path: &PathBuf) -> Connection {
|
||||
pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connection {
|
||||
let conn = create_or_open_readwrite_db(path);
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE blocks (
|
||||
@@ -50,8 +52,20 @@ pub fn initialize_bitcoin_block_traversal_cache(path: &PathBuf) -> Connection {
|
||||
)",
|
||||
[],
|
||||
) {
|
||||
println!("Error: {}", e.to_string());
|
||||
error!(ctx.expect_logger(), "{}", e.to_string());
|
||||
}
|
||||
if let Err(e) = conn.execute(
|
||||
"CREATE TABLE inscriptions (
|
||||
inscription_id TEXT NOT NULL PRIMARY KEY,
|
||||
outpoint_to_watch TEXT NOT NULL,
|
||||
ancestors TEXT NOT NULL,
|
||||
descendants TEXT
|
||||
)",
|
||||
[],
|
||||
) {
|
||||
error!(ctx.expect_logger(), "{}", e.to_string());
|
||||
}
|
||||
|
||||
conn
|
||||
}
|
||||
|
||||
@@ -128,7 +142,7 @@ impl CompactedBlock {
|
||||
for tx in block.tx.iter().skip(1) {
|
||||
let mut inputs = vec![];
|
||||
for input in tx.vin.iter().skip(0) {
|
||||
let txin = hex::decode(tx.txid.to_string()).unwrap();
|
||||
let txin = hex::decode(input.txid.unwrap().to_string()).unwrap();
|
||||
|
||||
inputs.push((
|
||||
[txin[0], txin[1], txin[2], txin[3]],
|
||||
@@ -171,15 +185,33 @@ pub fn retrieve_compacted_block_from_index(
|
||||
let mut stmt = storage_conn
|
||||
.prepare("SELECT compacted_bytes FROM blocks WHERE id = ?1")
|
||||
.unwrap();
|
||||
let block_iter = stmt
|
||||
let result_iter = stmt
|
||||
.query_map(args, |row| {
|
||||
let hex_bytes: String = row.get(0).unwrap();
|
||||
Ok(CompactedBlock::from_hex_bytes(&hex_bytes))
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
for block in block_iter {
|
||||
return Some(block.unwrap());
|
||||
for result in result_iter {
|
||||
return Some(result.unwrap());
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
pub fn scan_outpoints_to_watch_with_txin(txin: &str, storage_conn: &Connection) -> Option<String> {
|
||||
let args: &[&dyn ToSql] = &[&txin.to_sql().unwrap()];
|
||||
let mut stmt = storage_conn
|
||||
.prepare("SELECT inscription_id FROM inscriptions WHERE outpoint_to_watch = ?1")
|
||||
.unwrap();
|
||||
let result_iter = stmt
|
||||
.query_map(args, |row| {
|
||||
let inscription_id: String = row.get(0).unwrap();
|
||||
Ok(inscription_id)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
for result in result_iter {
|
||||
return Some(result.unwrap());
|
||||
}
|
||||
return None;
|
||||
}
|
||||
@@ -188,16 +220,16 @@ pub fn write_compacted_block_to_index(
|
||||
block_id: u32,
|
||||
compacted_block: &CompactedBlock,
|
||||
storage_conn: &Connection,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let serialized_compacted_block = compacted_block.to_hex_bytes();
|
||||
|
||||
if let Err(e) = storage_conn
|
||||
.execute(
|
||||
if let Err(e) = storage_conn.execute(
|
||||
"INSERT INTO blocks (id, compacted_bytes) VALUES (?1, ?2)",
|
||||
rusqlite::params![&block_id, &serialized_compacted_block],
|
||||
) {
|
||||
println!("Error: {}", e.to_string());
|
||||
}
|
||||
error!(ctx.expect_logger(), "Error: {}", e.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn scan_bitcoin_chain_with_predicate(
|
||||
@@ -343,17 +375,35 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
|
||||
let mut total_hits = vec![];
|
||||
|
||||
let (retrieve_ordinal_tx, retrieve_ordinal_rx) =
|
||||
channel::<std::option::Option<(BlockIdentifier, BitcoinTransactionData, Vec<HookAction>)>>();
|
||||
let (retrieve_ordinal_tx, retrieve_ordinal_rx) = channel::<
|
||||
std::option::Option<(BlockIdentifier, BitcoinTransactionData, Vec<HookAction>)>,
|
||||
>();
|
||||
let (process_ordinal_tx, process_ordinal_rx) = channel();
|
||||
let (cache_block_tx, cache_block_rx) = channel();
|
||||
|
||||
// use threadpool::ThreadPool;
|
||||
// use std::sync::mpsc::channel;
|
||||
// let n_workers = 4;
|
||||
// let pool = ThreadPool::new(n_workers);
|
||||
// let (tx, rx) = channel();
|
||||
|
||||
let _config = config.clone();
|
||||
let ctx_ = ctx.clone();
|
||||
let handle_1 = hiro_system_kit::thread_named("Ordinal retrieval")
|
||||
.spawn(move || {
|
||||
while let Ok(Some((block_identifier, mut transaction, actions))) = retrieve_ordinal_rx.recv() {
|
||||
println!("Retrieving satoshi point for {}", transaction.transaction_identifier.hash);
|
||||
let f = retrieve_satoshi_point_using_local_storage(&_config, &block_identifier, &transaction.transaction_identifier);
|
||||
while let Ok(Some((block_identifier, mut transaction, actions))) =
|
||||
retrieve_ordinal_rx.recv()
|
||||
{
|
||||
info!(
|
||||
ctx_.expect_logger(),
|
||||
"Retrieving satoshi point for {}", transaction.transaction_identifier.hash
|
||||
);
|
||||
let f = retrieve_satoshi_point_using_local_storage(
|
||||
&_config,
|
||||
&block_identifier,
|
||||
&transaction.transaction_identifier,
|
||||
&ctx_,
|
||||
);
|
||||
let (block_number, block_offset) = match hiro_system_kit::nestable_block_on(f) {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
@@ -394,16 +444,27 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
let db_file = config.get_bitcoin_block_traversal_db_path();
|
||||
let handle_3 = hiro_system_kit::thread_named("Ordinal ingestion")
|
||||
.spawn(move || {
|
||||
let conn = initialize_bitcoin_block_traversal_cache(&db_file);
|
||||
let conn = initialize_ordinal_state_storage(&db_file, &ctx_);
|
||||
while let Ok(Some((height, compacted_block))) = cache_block_rx.recv() {
|
||||
info!(ctx_.expect_logger(), "Caching block #{height}");
|
||||
write_compacted_block_to_index(height, &compacted_block, &conn);
|
||||
write_compacted_block_to_index(height, &compacted_block, &conn, &ctx_);
|
||||
}
|
||||
})
|
||||
.expect("unable to detach thread");
|
||||
|
||||
let mut pipeline_started = false;
|
||||
|
||||
use std::sync::mpsc::channel;
|
||||
use threadpool::ThreadPool;
|
||||
let n_workers = 4;
|
||||
let pool = ThreadPool::new(n_workers);
|
||||
|
||||
let bitcoin_config = BitcoinConfig {
|
||||
username: config.network.bitcoin_node_rpc_username.clone(),
|
||||
password: config.network.bitcoin_node_rpc_password.clone(),
|
||||
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
|
||||
};
|
||||
|
||||
for cursor in start_block..=end_block {
|
||||
debug!(
|
||||
ctx.expect_logger(),
|
||||
@@ -427,15 +488,8 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
}
|
||||
};
|
||||
|
||||
let block_breakdown = loop {
|
||||
match retrieve_block_full_breakdown(config, &block_hash).await {
|
||||
Ok(res) => break res,
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "Error retrieving block {}", cursor,);
|
||||
}
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(3000));
|
||||
};
|
||||
let block_breakdown =
|
||||
retrieve_full_block_breakdown_with_retry(&bitcoin_config, &block_hash, ctx).await?;
|
||||
|
||||
let _ = cache_block_tx.send(Some((
|
||||
block_breakdown.height as u32,
|
||||
@@ -444,7 +498,6 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
|
||||
let block = indexer::bitcoin::standardize_bitcoin_block(
|
||||
&config.network,
|
||||
cursor,
|
||||
block_breakdown,
|
||||
&mut bitcoin_context,
|
||||
ctx,
|
||||
@@ -462,8 +515,11 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
);
|
||||
if is_scanning_inscriptions {
|
||||
pipeline_started = true;
|
||||
let _ = retrieve_ordinal_tx
|
||||
.send(Some((block.block_identifier.clone(), tx.clone(), vec![predicate_spec.action.clone()])));
|
||||
let _ = retrieve_ordinal_tx.send(Some((
|
||||
block.block_identifier.clone(),
|
||||
tx.clone(),
|
||||
vec![predicate_spec.action.clone()],
|
||||
)));
|
||||
} else {
|
||||
hits.push(tx);
|
||||
}
|
||||
@@ -512,41 +568,6 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn retrieve_block_full_breakdown(
|
||||
config: &Config,
|
||||
block_hash: &str,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
let body = json!({
|
||||
"jsonrpc": "1.0",
|
||||
"id": "chainhook-cli",
|
||||
"method": "getblock",
|
||||
"params": [block_hash, 3]
|
||||
});
|
||||
let http_client = HttpClient::builder()
|
||||
.timeout(Duration::from_secs(20))
|
||||
.build()
|
||||
.expect("Unable to build http client");
|
||||
let raw_block = http_client
|
||||
.post(&config.network.bitcoin_node_rpc_url)
|
||||
.basic_auth(
|
||||
&config.network.bitcoin_node_rpc_username,
|
||||
Some(&config.network.bitcoin_node_rpc_password),
|
||||
)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("Host", &config.network.bitcoin_node_rpc_url[7..])
|
||||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("unable to send request ({})", e))?
|
||||
.json::<jsonrpc::Response>()
|
||||
.await
|
||||
.map_err(|e| format!("unable to parse response ({})", e))?
|
||||
.result::<indexer::bitcoin::BitcoinBlockFullBreakdown>()
|
||||
.map_err(|e| format!("unable to parse response ({})", e))?;
|
||||
|
||||
Ok(raw_block)
|
||||
}
|
||||
|
||||
pub async fn retrieve_block_hash(config: &Config, block_height: &u64) -> Result<String, String> {
|
||||
let body = json!({
|
||||
"jsonrpc": "1.0",
|
||||
@@ -579,10 +600,117 @@ pub async fn retrieve_block_hash(config: &Config, block_height: &u64) -> Result<
|
||||
Ok(block_hash)
|
||||
}
|
||||
|
||||
pub async fn build_bitcoin_traversal_local_storage(
|
||||
config: Config,
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
ctx: &Context,
|
||||
network_thread: usize,
|
||||
) -> Result<(), String> {
|
||||
let retrieve_block_hash_pool = ThreadPool::new(network_thread);
|
||||
let (block_hash_tx, block_hash_rx) = crossbeam_channel::unbounded();
|
||||
let retrieve_block_data_pool = ThreadPool::new(network_thread);
|
||||
let (block_data_tx, block_data_rx) = crossbeam_channel::unbounded();
|
||||
let compress_block_data_pool = ThreadPool::new(8);
|
||||
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let bitcoin_config = BitcoinConfig {
|
||||
username: config.network.bitcoin_node_rpc_username.clone(),
|
||||
password: config.network.bitcoin_node_rpc_password.clone(),
|
||||
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
|
||||
};
|
||||
|
||||
for block_cursor in start_block..end_block {
|
||||
let block_height = block_cursor.clone();
|
||||
let block_hash_tx = block_hash_tx.clone();
|
||||
let config = config.clone();
|
||||
retrieve_block_hash_pool.execute(move || {
|
||||
let mut err_count = 0;
|
||||
let mut rng = rand::thread_rng();
|
||||
loop {
|
||||
let future = retrieve_block_hash(&config, &block_height);
|
||||
match hiro_system_kit::nestable_block_on(future) {
|
||||
Ok(block_hash) => {
|
||||
err_count = 0;
|
||||
block_hash_tx.send(Some((block_cursor, block_hash)));
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
err_count += 1;
|
||||
let delay = (err_count + (rng.next_u64() % 3)) * 1000;
|
||||
println!("retry hash:fetch in {delay}");
|
||||
std::thread::sleep(std::time::Duration::from_millis(delay));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let db_file = config.get_bitcoin_block_traversal_db_path();
|
||||
let moved_ctx = ctx.clone();
|
||||
let handle = hiro_system_kit::thread_named("Block data retrieval").spawn(move || {
|
||||
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
|
||||
println!("fetch {block_height}:{block_hash}");
|
||||
let moved_bitcoin_config = bitcoin_config.clone();
|
||||
let block_data_tx = block_data_tx.clone();
|
||||
let moved_ctx = moved_ctx.clone();
|
||||
retrieve_block_data_pool.execute(move || {
|
||||
let future = retrieve_full_block_breakdown_with_retry(
|
||||
&moved_bitcoin_config,
|
||||
&block_hash,
|
||||
&moved_ctx,
|
||||
);
|
||||
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
|
||||
block_data_tx.send(Some(block_data));
|
||||
});
|
||||
retrieve_block_data_pool.join()
|
||||
}
|
||||
});
|
||||
|
||||
let handle = hiro_system_kit::thread_named("Block data compression").spawn(move || {
|
||||
while let Ok(Some(block_data)) = block_data_rx.recv() {
|
||||
println!("store {}:{}", block_data.height, block_data.hash);
|
||||
let block_compressed_tx = block_compressed_tx.clone();
|
||||
compress_block_data_pool.execute(move || {
|
||||
let compressed_block = CompactedBlock::from_full_block(&block_data);
|
||||
block_compressed_tx.send(Some((block_data.height as u32, compressed_block)));
|
||||
});
|
||||
|
||||
compress_block_data_pool.join()
|
||||
}
|
||||
});
|
||||
|
||||
let conn = initialize_ordinal_state_storage(&db_file, &ctx);
|
||||
|
||||
while let Ok(Some((block_height, compacted_block))) = block_compressed_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Storing block #{block_height}");
|
||||
write_compacted_block_to_index(block_height, &compacted_block, &conn, &ctx);
|
||||
}
|
||||
|
||||
retrieve_block_hash_pool.join();
|
||||
|
||||
// Pool of threads fetching block hash
|
||||
// In: block numbers
|
||||
// Out: block hash
|
||||
|
||||
// Pool of threads fetching full blocks
|
||||
// In: block hash
|
||||
// Out: full block
|
||||
|
||||
// Pool of thread compressing full blocks
|
||||
// In: full block
|
||||
// Out: compacted block
|
||||
|
||||
// Receive Compacted block, storing on disc
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn retrieve_satoshi_point_using_local_storage(
|
||||
config: &Config,
|
||||
block_identifier: &BlockIdentifier,
|
||||
transaction_identifier: &TransactionIdentifier,
|
||||
ctx: &Context,
|
||||
) -> Result<(u64, u64), String> {
|
||||
let path = config.get_bitcoin_block_traversal_db_path();
|
||||
let storage_conn = open_existing_readonly_db(&path);
|
||||
@@ -598,6 +726,14 @@ pub async fn retrieve_satoshi_point_using_local_storage(
|
||||
loop {
|
||||
let res = retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn).unwrap();
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{ordinal_block_number}:{:?}:{:?}",
|
||||
hex::encode(&res.0 .0 .0),
|
||||
hex::encode(txid)
|
||||
);
|
||||
std::thread::sleep(std::time::Duration::from_millis(300));
|
||||
|
||||
// evaluate exit condition: did we reach a coinbase transaction?
|
||||
let coinbase_txid = &res.0 .0 .0;
|
||||
if coinbase_txid.eq(&tx_cursor.0) {
|
||||
@@ -647,6 +783,8 @@ pub async fn retrieve_satoshi_point_using_local_storage(
|
||||
continue;
|
||||
}
|
||||
|
||||
info!(ctx.expect_logger(), "Evaluating {}", hex::encode(&txid));
|
||||
|
||||
let mut sats_out = 0;
|
||||
for (index, output_value) in outputs.iter().enumerate() {
|
||||
if index == tx_cursor.1 {
|
||||
@@ -662,7 +800,14 @@ pub async fn retrieve_satoshi_point_using_local_storage(
|
||||
if sats_in >= sats_out {
|
||||
ordinal_offset = sats_out - (sats_in - txin_value);
|
||||
ordinal_block_number = block_height;
|
||||
// println!("{h}: {blockhash} -> {} [in:{} , out: {}] {}/{vout} (input #{in_index}) {compounded_offset}", transaction.txid, transaction.vin.len(), transaction.vout.len(), txid);
|
||||
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
|
||||
hex::encode(&txid),
|
||||
hex::encode(&txin),
|
||||
|
||||
);
|
||||
tx_cursor = (txin, vout as usize);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ use chainhook_types::{
|
||||
OrdinalInscriptionRevealData, OrdinalOperation, PobBlockCommitmentData, PoxBlockCommitmentData,
|
||||
PoxReward, StacksBaseChainOperation, TransactionIdentifier, TransferSTXData,
|
||||
};
|
||||
use clarity_repl::clarity::util::hash::to_hex;
|
||||
use hiro_system_kit::slog;
|
||||
use rocket::serde::json::Value as JsonValue;
|
||||
use serde::Deserialize;
|
||||
@@ -138,15 +137,33 @@ pub struct RewardParticipant {
|
||||
amt: u64,
|
||||
}
|
||||
|
||||
pub async fn retrieve_full_block_breakdown(
|
||||
pub async fn retrieve_full_block_breakdown_with_retry(
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
marshalled_block: JsonValue,
|
||||
_ctx: &Context,
|
||||
) -> Result<(u64, BitcoinBlockFullBreakdown), String> {
|
||||
let partial_block: NewBitcoinBlock = serde_json::from_value(marshalled_block)
|
||||
.map_err(|e| format!("unable for parse bitcoin block: {}", e.to_string()))?;
|
||||
let block_hash = partial_block.burn_block_hash.strip_prefix("0x").unwrap();
|
||||
block_hash: &str,
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
let mut errors_count = 0;
|
||||
let block = loop {
|
||||
match retrieve_full_block_breakdown(bitcoin_config, block_hash, ctx).await {
|
||||
Ok(result) => break result,
|
||||
Err(e) => {
|
||||
errors_count += 1;
|
||||
error!(
|
||||
"unable to retrieve block #{block_hash} (attempt #{errors_count}): {}",
|
||||
e.to_string()
|
||||
);
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
async fn retrieve_full_block_breakdown(
|
||||
bitcoin_config: &BitcoinConfig,
|
||||
block_hash: &str,
|
||||
_ctx: &Context,
|
||||
) -> Result<BitcoinBlockFullBreakdown, String> {
|
||||
use reqwest::Client as HttpClient;
|
||||
let body = json!({
|
||||
"jsonrpc": "1.0",
|
||||
@@ -173,19 +190,17 @@ pub async fn retrieve_full_block_breakdown(
|
||||
.result::<BitcoinBlockFullBreakdown>()
|
||||
.map_err(|e| format!("unable to parse response ({})", e))?;
|
||||
|
||||
let block_height = partial_block.burn_block_height;
|
||||
Ok((block_height, block))
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
pub fn standardize_bitcoin_block(
|
||||
indexer_config: &IndexerConfig,
|
||||
block_height: u64,
|
||||
block: BitcoinBlockFullBreakdown,
|
||||
bitcoin_context: &mut BitcoinChainContext,
|
||||
ctx: &Context,
|
||||
) -> Result<BitcoinBlockData, String> {
|
||||
let mut transactions = vec![];
|
||||
|
||||
let block_height = block.height as u64;
|
||||
let expected_magic_bytes = get_stacks_canonical_magic_bytes(&indexer_config.bitcoin_network);
|
||||
let pox_config = get_canonical_pox_config(&indexer_config.bitcoin_network);
|
||||
|
||||
@@ -227,7 +242,7 @@ pub fn standardize_bitcoin_block(
|
||||
},
|
||||
script_sig: format!(
|
||||
"0x{}",
|
||||
to_hex(&input.script_sig.expect("not provided for coinbase txs").hex)
|
||||
hex::encode(&input.script_sig.expect("not provided for coinbase txs").hex)
|
||||
),
|
||||
sequence: input.sequence,
|
||||
witness: input
|
||||
@@ -235,7 +250,7 @@ pub fn standardize_bitcoin_block(
|
||||
.unwrap_or(vec![])
|
||||
.to_vec()
|
||||
.iter()
|
||||
.map(|w| format!("0x{}", to_hex(w)))
|
||||
.map(|w| format!("0x{}", hex::encode(w)))
|
||||
.collect::<Vec<_>>(),
|
||||
})
|
||||
}
|
||||
@@ -244,7 +259,7 @@ pub fn standardize_bitcoin_block(
|
||||
for output in tx.vout.drain(..) {
|
||||
outputs.push(TxOut {
|
||||
value: output.value.to_sat(),
|
||||
script_pubkey: format!("0x{}", to_hex(&output.script_pub_key.hex)),
|
||||
script_pubkey: format!("0x{}", hex::encode(&output.script_pub_key.hex)),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -311,7 +326,7 @@ fn try_parse_ordinal_operation(
|
||||
return Some(OrdinalOperation::InscriptionRevealed(
|
||||
OrdinalInscriptionRevealData {
|
||||
content_type: inscription.content_type().unwrap_or("unknown").to_string(),
|
||||
content_bytes: format!("0x{}", to_hex(&inscription_content_bytes)),
|
||||
content_bytes: format!("0x{}", hex::encode(&inscription_content_bytes)),
|
||||
content_length: inscription_content_bytes.len(),
|
||||
inscription_id: inscription_id.to_string(),
|
||||
inscription_authors: vec![],
|
||||
@@ -390,7 +405,7 @@ fn try_parse_stacks_operation(
|
||||
let mut rewards = vec![];
|
||||
for output in outputs[1..pox_config.rewarded_addresses_per_block].into_iter() {
|
||||
rewards.push(PoxReward {
|
||||
recipient: format!("0x{}", to_hex(&output.script_pub_key.hex)),
|
||||
recipient: format!("0x{}", hex::encode(&output.script_pub_key.hex)),
|
||||
amount: output.value.to_sat(),
|
||||
});
|
||||
}
|
||||
@@ -422,7 +437,7 @@ fn try_parse_block_commit_op(bytes: &[u8]) -> Option<BlockCommitmentData> {
|
||||
}
|
||||
|
||||
Some(BlockCommitmentData {
|
||||
stacks_block_hash: format!("0x{}", to_hex(&bytes[0..32])),
|
||||
stacks_block_hash: format!("0x{}", hex::encode(&bytes[0..32])),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -83,13 +83,11 @@ impl Indexer {
|
||||
|
||||
pub fn handle_bitcoin_block(
|
||||
&mut self,
|
||||
block_height: u64,
|
||||
block: BitcoinBlockFullBreakdown,
|
||||
ctx: &Context,
|
||||
) -> Result<Option<BitcoinChainEvent>, String> {
|
||||
let block = bitcoin::standardize_bitcoin_block(
|
||||
&self.config,
|
||||
block_height,
|
||||
block,
|
||||
&mut self.bitcoin_context,
|
||||
ctx,
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::chainhooks::stacks::{
|
||||
StacksChainhookOccurrence, StacksChainhookOccurrencePayload,
|
||||
};
|
||||
use crate::chainhooks::types::{ChainhookConfig, ChainhookSpecification};
|
||||
use crate::indexer::bitcoin::retrieve_full_block_breakdown;
|
||||
use crate::indexer::bitcoin::{retrieve_full_block_breakdown_with_retry, NewBitcoinBlock};
|
||||
use crate::indexer::ordinals::{indexing::updater::OrdinalIndexUpdater, initialize_ordinal_index};
|
||||
use crate::indexer::{self, Indexer, IndexerConfig};
|
||||
use crate::utils::{send_request, Context};
|
||||
@@ -948,11 +948,11 @@ pub fn handle_ping(ctx: &State<Context>) -> Json<JsonValue> {
|
||||
}
|
||||
|
||||
#[openapi(skip)]
|
||||
#[post("/new_burn_block", format = "json", data = "<marshalled_block>")]
|
||||
#[post("/new_burn_block", format = "json", data = "<bitcoin_block>")]
|
||||
pub async fn handle_new_bitcoin_block(
|
||||
indexer_rw_lock: &State<Arc<RwLock<Indexer>>>,
|
||||
bitcoin_config: &State<BitcoinConfig>,
|
||||
marshalled_block: Json<JsonValue>,
|
||||
bitcoin_block: Json<NewBitcoinBlock>,
|
||||
background_job_tx: &State<Arc<Mutex<Sender<ObserverCommand>>>>,
|
||||
ctx: &State<Context>,
|
||||
) -> Json<JsonValue> {
|
||||
@@ -961,10 +961,9 @@ pub async fn handle_new_bitcoin_block(
|
||||
// kind of update that this new block would imply, taking
|
||||
// into account the last 7 blocks.
|
||||
|
||||
let (block_height, block) =
|
||||
match retrieve_full_block_breakdown(bitcoin_config, marshalled_block.into_inner(), ctx)
|
||||
.await
|
||||
{
|
||||
let block_hash = bitcoin_block.burn_block_hash.strip_prefix("0x").unwrap();
|
||||
let block =
|
||||
match retrieve_full_block_breakdown_with_retry(bitcoin_config, block_hash, ctx).await {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
@@ -982,7 +981,7 @@ pub async fn handle_new_bitcoin_block(
|
||||
};
|
||||
|
||||
let chain_update = match indexer_rw_lock.inner().write() {
|
||||
Ok(mut indexer) => indexer.handle_bitcoin_block(block_height, block, &ctx),
|
||||
Ok(mut indexer) => indexer.handle_bitcoin_block(block, &ctx),
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::warn!(
|
||||
|
||||
Reference in New Issue
Block a user