mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-15 00:49:30 +08:00
fix: async/await regression
This commit is contained in:
@@ -158,7 +158,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
};
|
||||
|
||||
let ordinal_index = initialize_ordinal_index(&event_observer_config).unwrap();
|
||||
match OrdinalIndexUpdater::update(&ordinal_index) {
|
||||
match OrdinalIndexUpdater::update(&ordinal_index).await {
|
||||
Ok(_r) => {}
|
||||
Err(e) => {}
|
||||
}
|
||||
@@ -171,7 +171,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
|
||||
raw_block,
|
||||
&mut bitcoin_context,
|
||||
ctx,
|
||||
)?;
|
||||
).await?;
|
||||
|
||||
let mut hits = vec![];
|
||||
for tx in block.transactions.iter() {
|
||||
|
||||
@@ -106,7 +106,7 @@ pub async fn retrieve_full_block(
|
||||
Ok((block_height, block))
|
||||
}
|
||||
|
||||
pub fn standardize_bitcoin_block(
|
||||
pub async fn standardize_bitcoin_block(
|
||||
indexer_config: &IndexerConfig,
|
||||
block_height: u64,
|
||||
block: Block,
|
||||
@@ -115,7 +115,7 @@ pub fn standardize_bitcoin_block(
|
||||
) -> Result<BitcoinBlockData, String> {
|
||||
let mut transactions = vec![];
|
||||
|
||||
match OrdinalIndexUpdater::update(&mut bitcoin_context.ordinal_index) {
|
||||
match OrdinalIndexUpdater::update(&mut bitcoin_context.ordinal_index).await {
|
||||
Ok(_) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::info!(
|
||||
|
||||
@@ -85,13 +85,13 @@ impl Indexer {
|
||||
block: Block,
|
||||
ctx: &Context,
|
||||
) -> Result<Option<BitcoinChainEvent>, String> {
|
||||
let block = bitcoin::standardize_bitcoin_block(
|
||||
let block = hiro_system_kit::nestable_block_on(bitcoin::standardize_bitcoin_block(
|
||||
&self.config,
|
||||
block_height,
|
||||
block,
|
||||
&mut self.bitcoin_context,
|
||||
ctx,
|
||||
)?;
|
||||
))?;
|
||||
let event = self.bitcoin_blocks_pool.process_block(block, ctx);
|
||||
event
|
||||
}
|
||||
|
||||
@@ -367,8 +367,8 @@ impl OrdinalIndex {
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
pub fn update(&self) -> Result {
|
||||
OrdinalIndexUpdater::update(self)
|
||||
pub async fn update(&self) -> Result {
|
||||
OrdinalIndexUpdater::update(self).await
|
||||
}
|
||||
|
||||
pub fn is_reorged(&self) -> bool {
|
||||
|
||||
@@ -50,7 +50,7 @@ pub struct OrdinalIndexUpdater {
|
||||
}
|
||||
|
||||
impl OrdinalIndexUpdater {
|
||||
pub fn update(index: &OrdinalIndex) -> Result {
|
||||
pub async fn update(index: &OrdinalIndex) -> Result {
|
||||
let wtx = index.begin_write()?;
|
||||
|
||||
let height = wtx
|
||||
@@ -80,10 +80,10 @@ impl OrdinalIndexUpdater {
|
||||
outputs_traversed: 0,
|
||||
};
|
||||
|
||||
updater.update_index(index, wtx)
|
||||
updater.update_index(index, wtx).await
|
||||
}
|
||||
|
||||
fn update_index<'index>(
|
||||
async fn update_index<'index>(
|
||||
&mut self,
|
||||
index: &'index OrdinalIndex,
|
||||
mut wtx: WriteTransaction<'index>,
|
||||
@@ -109,7 +109,7 @@ impl OrdinalIndexUpdater {
|
||||
&mut wtx,
|
||||
block,
|
||||
&mut value_cache,
|
||||
)?;
|
||||
).await?;
|
||||
|
||||
uncommitted += 1;
|
||||
|
||||
@@ -305,12 +305,12 @@ impl OrdinalIndexUpdater {
|
||||
Ok((outpoint_sender, value_receiver))
|
||||
}
|
||||
|
||||
fn index_block(
|
||||
async fn index_block(
|
||||
&mut self,
|
||||
index: &OrdinalIndex,
|
||||
outpoint_sender: &mut Sender<OutPoint>,
|
||||
value_receiver: &mut Receiver<u64>,
|
||||
wtx: &mut WriteTransaction,
|
||||
wtx: &mut WriteTransaction<'_>,
|
||||
block: BlockData,
|
||||
value_cache: &mut HashMap<OutPoint, u64>,
|
||||
) -> Result<()> {
|
||||
@@ -351,7 +351,7 @@ impl OrdinalIndexUpdater {
|
||||
continue;
|
||||
}
|
||||
// We don't know the value of this tx input. Send this outpoint to background thread to be fetched
|
||||
outpoint_sender.blocking_send(prev_output)?;
|
||||
let _ = outpoint_sender.send(prev_output).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -409,7 +409,7 @@ impl OrdinalIndexUpdater {
|
||||
)?;
|
||||
|
||||
for (tx, txid) in block.txdata.iter().skip(1).chain(block.txdata.first()) {
|
||||
lost_sats += inscription_updater.index_transaction_inscriptions(tx, *txid, None)?;
|
||||
lost_sats += inscription_updater.index_transaction_inscriptions(tx, *txid, None).await?;
|
||||
}
|
||||
|
||||
statistic_to_count.insert(&Statistic::LostSats.key(), &lost_sats)?;
|
||||
@@ -427,68 +427,68 @@ impl OrdinalIndexUpdater {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn index_transaction_sats(
|
||||
&mut self,
|
||||
tx: &Transaction,
|
||||
txid: Txid,
|
||||
sat_to_satpoint: &mut Table<u64, &SatPointValue>,
|
||||
input_sat_ranges: &mut VecDeque<(u64, u64)>,
|
||||
sat_ranges_written: &mut u64,
|
||||
outputs_traversed: &mut u64,
|
||||
inscription_updater: &mut InscriptionUpdater,
|
||||
) -> Result {
|
||||
inscription_updater.index_transaction_inscriptions(tx, txid, Some(input_sat_ranges))?;
|
||||
// fn index_transaction_sats(
|
||||
// &mut self,
|
||||
// tx: &Transaction,
|
||||
// txid: Txid,
|
||||
// sat_to_satpoint: &mut Table<u64, &SatPointValue>,
|
||||
// input_sat_ranges: &mut VecDeque<(u64, u64)>,
|
||||
// sat_ranges_written: &mut u64,
|
||||
// outputs_traversed: &mut u64,
|
||||
// inscription_updater: &mut InscriptionUpdater,
|
||||
// ) -> Result {
|
||||
// inscription_updater.index_transaction_inscriptions(tx, txid, Some(input_sat_ranges))?;
|
||||
|
||||
for (vout, output) in tx.output.iter().enumerate() {
|
||||
let outpoint = OutPoint {
|
||||
vout: vout.try_into().unwrap(),
|
||||
txid,
|
||||
};
|
||||
let mut sats = Vec::new();
|
||||
// for (vout, output) in tx.output.iter().enumerate() {
|
||||
// let outpoint = OutPoint {
|
||||
// vout: vout.try_into().unwrap(),
|
||||
// txid,
|
||||
// };
|
||||
// let mut sats = Vec::new();
|
||||
|
||||
let mut remaining = output.value;
|
||||
while remaining > 0 {
|
||||
let range = input_sat_ranges.pop_front().ok_or_else(|| {
|
||||
anyhow::anyhow!("insufficient inputs for transaction outputs")
|
||||
})?;
|
||||
// let mut remaining = output.value;
|
||||
// while remaining > 0 {
|
||||
// let range = input_sat_ranges.pop_front().ok_or_else(|| {
|
||||
// anyhow::anyhow!("insufficient inputs for transaction outputs")
|
||||
// })?;
|
||||
|
||||
if !Sat(range.0).is_common() {
|
||||
sat_to_satpoint.insert(
|
||||
&range.0,
|
||||
&SatPoint {
|
||||
outpoint,
|
||||
offset: output.value - remaining,
|
||||
}
|
||||
.store(),
|
||||
)?;
|
||||
}
|
||||
// if !Sat(range.0).is_common() {
|
||||
// sat_to_satpoint.insert(
|
||||
// &range.0,
|
||||
// &SatPoint {
|
||||
// outpoint,
|
||||
// offset: output.value - remaining,
|
||||
// }
|
||||
// .store(),
|
||||
// )?;
|
||||
// }
|
||||
|
||||
let count = range.1 - range.0;
|
||||
// let count = range.1 - range.0;
|
||||
|
||||
let assigned = if count > remaining {
|
||||
self.sat_ranges_since_flush += 1;
|
||||
let middle = range.0 + remaining;
|
||||
input_sat_ranges.push_front((middle, range.1));
|
||||
(range.0, middle)
|
||||
} else {
|
||||
range
|
||||
};
|
||||
// let assigned = if count > remaining {
|
||||
// self.sat_ranges_since_flush += 1;
|
||||
// let middle = range.0 + remaining;
|
||||
// input_sat_ranges.push_front((middle, range.1));
|
||||
// (range.0, middle)
|
||||
// } else {
|
||||
// range
|
||||
// };
|
||||
|
||||
sats.extend_from_slice(&assigned.store());
|
||||
// sats.extend_from_slice(&assigned.store());
|
||||
|
||||
remaining -= assigned.1 - assigned.0;
|
||||
// remaining -= assigned.1 - assigned.0;
|
||||
|
||||
*sat_ranges_written += 1;
|
||||
}
|
||||
// *sat_ranges_written += 1;
|
||||
// }
|
||||
|
||||
*outputs_traversed += 1;
|
||||
// *outputs_traversed += 1;
|
||||
|
||||
self.range_cache.insert(outpoint.store(), sats);
|
||||
self.outputs_inserted_since_flush += 1;
|
||||
}
|
||||
// self.range_cache.insert(outpoint.store(), sats);
|
||||
// self.outputs_inserted_since_flush += 1;
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
fn commit(&mut self, wtx: WriteTransaction, value_cache: HashMap<OutPoint, u64>) -> Result {
|
||||
println!(
|
||||
|
||||
@@ -81,7 +81,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn index_transaction_inscriptions(
|
||||
pub(super) async fn index_transaction_inscriptions(
|
||||
&mut self,
|
||||
tx: &Transaction,
|
||||
txid: Txid,
|
||||
@@ -114,7 +114,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
|
||||
{
|
||||
value.value()
|
||||
} else {
|
||||
self.value_receiver.blocking_recv().ok_or_else(|| {
|
||||
self.value_receiver.recv().await.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"failed to get transaction for {}",
|
||||
tx_in.previous_output.txid
|
||||
|
||||
@@ -1,180 +0,0 @@
|
||||
#[macro_use]
|
||||
extern crate rocket;
|
||||
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
|
||||
#[macro_use]
|
||||
extern crate serde_json;
|
||||
|
||||
#[macro_use]
|
||||
extern crate hiro_system_kit;
|
||||
|
||||
pub extern crate bitcoincore_rpc;
|
||||
|
||||
pub mod chainhooks;
|
||||
pub mod indexer;
|
||||
pub mod observer;
|
||||
pub mod utils;
|
||||
|
||||
use crate::utils::Context;
|
||||
use hiro_system_kit::log::setup_logger;
|
||||
use hiro_system_kit::slog;
|
||||
|
||||
use crate::chainhooks::types::ChainhookConfig;
|
||||
use clap::Parser;
|
||||
use ctrlc;
|
||||
use observer::{EventHandler, EventObserverConfig, ObserverCommand};
|
||||
use std::collections::HashSet;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Read};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
/// Simple program to greet a person
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Path of the config to load
|
||||
#[clap(short, long)]
|
||||
config_path: Option<String>,
|
||||
}
|
||||
|
||||
#[rocket::main]
|
||||
async fn main() {
|
||||
let context = Context {
|
||||
logger: Some(setup_logger()),
|
||||
tracer: false,
|
||||
};
|
||||
|
||||
let args = Args::parse();
|
||||
let config_path = get_config_path_or_exit(&args.config_path, &context);
|
||||
let config = EventObserverConfig::from_path(&config_path, &context);
|
||||
let (command_tx, command_rx) = channel();
|
||||
let tx_terminator = command_tx.clone();
|
||||
|
||||
ctrlc::set_handler(move || {
|
||||
tx_terminator
|
||||
.send(ObserverCommand::Terminate)
|
||||
.expect("Could not send signal on channel.")
|
||||
})
|
||||
.expect("Error setting Ctrl-C handler");
|
||||
|
||||
let _ = observer::start_event_observer(config, command_tx, command_rx, None, context).await;
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub struct EventObserverConfigFile {
|
||||
pub normalization_enabled: Option<bool>,
|
||||
pub grpc_server_enabled: Option<bool>,
|
||||
pub hooks_enabled: Option<bool>,
|
||||
pub bitcoin_rpc_proxy_enabled: Option<bool>,
|
||||
pub webhooks: Option<Vec<String>>,
|
||||
pub ingestion_port: Option<u16>,
|
||||
pub control_port: Option<u16>,
|
||||
pub bitcoin_node_username: String,
|
||||
pub bitcoin_node_password: String,
|
||||
pub bitcoin_node_rpc_url: String,
|
||||
pub stacks_node_rpc_url: String,
|
||||
pub operators: Option<Vec<String>>,
|
||||
pub cache_path: Option<String>,
|
||||
}
|
||||
|
||||
impl EventObserverConfig {
|
||||
pub fn from_path(path: &PathBuf, ctx: &Context) -> EventObserverConfig {
|
||||
let path = match File::open(path) {
|
||||
Ok(path) => path,
|
||||
Err(_e) => {
|
||||
ctx.try_log(|logger| {
|
||||
slog::error!(
|
||||
logger,
|
||||
"Error: unable to locate Clarinet.toml in current directory"
|
||||
)
|
||||
});
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
let mut file_reader = BufReader::new(path);
|
||||
let mut file_buffer = vec![];
|
||||
file_reader.read_to_end(&mut file_buffer).unwrap();
|
||||
|
||||
let file: EventObserverConfigFile = match toml::from_slice(&file_buffer[..]) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| error!(logger, "Unable to read config {}", e));
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
EventObserverConfig::from_config_file(file, ctx)
|
||||
}
|
||||
|
||||
pub fn from_config_file(
|
||||
mut config_file: EventObserverConfigFile,
|
||||
_ctx: &Context,
|
||||
) -> EventObserverConfig {
|
||||
let event_handlers = match config_file.webhooks.take() {
|
||||
Some(webhooks) => webhooks
|
||||
.into_iter()
|
||||
.map(|h| EventHandler::WebHook(h))
|
||||
.collect::<Vec<_>>(),
|
||||
None => vec![],
|
||||
};
|
||||
let mut operators = HashSet::new();
|
||||
if let Some(operator_keys) = config_file.operators.take() {
|
||||
for operator_key in operator_keys.into_iter() {
|
||||
operators.insert(operator_key);
|
||||
}
|
||||
}
|
||||
|
||||
let config = EventObserverConfig {
|
||||
normalization_enabled: config_file.normalization_enabled.unwrap_or(true),
|
||||
grpc_server_enabled: config_file.grpc_server_enabled.unwrap_or(false),
|
||||
hooks_enabled: config_file.hooks_enabled.unwrap_or(false),
|
||||
chainhook_config: Some(ChainhookConfig::new()),
|
||||
bitcoin_rpc_proxy_enabled: config_file.bitcoin_rpc_proxy_enabled.unwrap_or(false),
|
||||
event_handlers: event_handlers,
|
||||
ingestion_port: config_file
|
||||
.ingestion_port
|
||||
.unwrap_or(observer::DEFAULT_INGESTION_PORT),
|
||||
control_port: config_file
|
||||
.control_port
|
||||
.unwrap_or(observer::DEFAULT_CONTROL_PORT),
|
||||
bitcoin_node_username: config_file.bitcoin_node_username.clone(),
|
||||
bitcoin_node_password: config_file.bitcoin_node_password.clone(),
|
||||
bitcoin_node_rpc_url: config_file.bitcoin_node_rpc_url.clone(),
|
||||
stacks_node_rpc_url: config_file.stacks_node_rpc_url.clone(),
|
||||
operators,
|
||||
display_logs: true,
|
||||
cache_path: config_file.cache_path.unwrap_or("cache".into()),
|
||||
bitcoin_network: chainhook_types::BitcoinNetwork::Mainnet, // todo(lgalabru)
|
||||
};
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
fn get_config_path_or_exit(path: &Option<String>, ctx: &Context) -> PathBuf {
|
||||
if let Some(path) = path {
|
||||
let manifest_path = PathBuf::from(path);
|
||||
if !manifest_path.exists() {
|
||||
ctx.try_log(|logger| slog::error!(logger, "Could not find Observer.toml"));
|
||||
std::process::exit(1);
|
||||
}
|
||||
manifest_path
|
||||
} else {
|
||||
let mut current_dir = std::env::current_dir().unwrap();
|
||||
loop {
|
||||
current_dir.push("Observer.toml");
|
||||
|
||||
if current_dir.exists() {
|
||||
break current_dir;
|
||||
}
|
||||
current_dir.pop();
|
||||
|
||||
if !current_dir.pop() {
|
||||
ctx.try_log(|logger| slog::error!(logger, "Could not find Observer.toml"));
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -234,7 +234,7 @@ pub async fn start_event_observer(
|
||||
});
|
||||
|
||||
let ordinal_index = initialize_ordinal_index(&config).unwrap();
|
||||
match OrdinalIndexUpdater::update(&ordinal_index) {
|
||||
match OrdinalIndexUpdater::update(&ordinal_index).await {
|
||||
Ok(_r) => {}
|
||||
Err(e) => {
|
||||
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
|
||||
|
||||
Reference in New Issue
Block a user