fix: async/await regression

This commit is contained in:
Ludo Galabru
2023-03-02 03:04:08 -05:00
parent 1f45ec26da
commit 676aac196d
8 changed files with 70 additions and 250 deletions

View File

@@ -158,7 +158,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
};
let ordinal_index = initialize_ordinal_index(&event_observer_config).unwrap();
match OrdinalIndexUpdater::update(&ordinal_index) {
match OrdinalIndexUpdater::update(&ordinal_index).await {
Ok(_r) => {}
Err(e) => {}
}
@@ -171,7 +171,7 @@ pub async fn scan_bitcoin_chain_with_predicate(
raw_block,
&mut bitcoin_context,
ctx,
)?;
).await?;
let mut hits = vec![];
for tx in block.transactions.iter() {

View File

@@ -106,7 +106,7 @@ pub async fn retrieve_full_block(
Ok((block_height, block))
}
pub fn standardize_bitcoin_block(
pub async fn standardize_bitcoin_block(
indexer_config: &IndexerConfig,
block_height: u64,
block: Block,
@@ -115,7 +115,7 @@ pub fn standardize_bitcoin_block(
) -> Result<BitcoinBlockData, String> {
let mut transactions = vec![];
match OrdinalIndexUpdater::update(&mut bitcoin_context.ordinal_index) {
match OrdinalIndexUpdater::update(&mut bitcoin_context.ordinal_index).await {
Ok(_) => {
ctx.try_log(|logger| {
slog::info!(

View File

@@ -85,13 +85,13 @@ impl Indexer {
block: Block,
ctx: &Context,
) -> Result<Option<BitcoinChainEvent>, String> {
let block = bitcoin::standardize_bitcoin_block(
let block = hiro_system_kit::nestable_block_on(bitcoin::standardize_bitcoin_block(
&self.config,
block_height,
block,
&mut self.bitcoin_context,
ctx,
)?;
))?;
let event = self.bitcoin_blocks_pool.process_block(block, ctx);
event
}

View File

@@ -367,8 +367,8 @@ impl OrdinalIndex {
Ok(info)
}
pub fn update(&self) -> Result {
OrdinalIndexUpdater::update(self)
pub async fn update(&self) -> Result {
OrdinalIndexUpdater::update(self).await
}
pub fn is_reorged(&self) -> bool {

View File

@@ -50,7 +50,7 @@ pub struct OrdinalIndexUpdater {
}
impl OrdinalIndexUpdater {
pub fn update(index: &OrdinalIndex) -> Result {
pub async fn update(index: &OrdinalIndex) -> Result {
let wtx = index.begin_write()?;
let height = wtx
@@ -80,10 +80,10 @@ impl OrdinalIndexUpdater {
outputs_traversed: 0,
};
updater.update_index(index, wtx)
updater.update_index(index, wtx).await
}
fn update_index<'index>(
async fn update_index<'index>(
&mut self,
index: &'index OrdinalIndex,
mut wtx: WriteTransaction<'index>,
@@ -109,7 +109,7 @@ impl OrdinalIndexUpdater {
&mut wtx,
block,
&mut value_cache,
)?;
).await?;
uncommitted += 1;
@@ -305,12 +305,12 @@ impl OrdinalIndexUpdater {
Ok((outpoint_sender, value_receiver))
}
fn index_block(
async fn index_block(
&mut self,
index: &OrdinalIndex,
outpoint_sender: &mut Sender<OutPoint>,
value_receiver: &mut Receiver<u64>,
wtx: &mut WriteTransaction,
wtx: &mut WriteTransaction<'_>,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
) -> Result<()> {
@@ -351,7 +351,7 @@ impl OrdinalIndexUpdater {
continue;
}
// We don't know the value of this tx input. Send this outpoint to background thread to be fetched
outpoint_sender.blocking_send(prev_output)?;
let _ = outpoint_sender.send(prev_output).await?;
}
}
}
@@ -409,7 +409,7 @@ impl OrdinalIndexUpdater {
)?;
for (tx, txid) in block.txdata.iter().skip(1).chain(block.txdata.first()) {
lost_sats += inscription_updater.index_transaction_inscriptions(tx, *txid, None)?;
lost_sats += inscription_updater.index_transaction_inscriptions(tx, *txid, None).await?;
}
statistic_to_count.insert(&Statistic::LostSats.key(), &lost_sats)?;
@@ -427,68 +427,68 @@ impl OrdinalIndexUpdater {
Ok(())
}
fn index_transaction_sats(
&mut self,
tx: &Transaction,
txid: Txid,
sat_to_satpoint: &mut Table<u64, &SatPointValue>,
input_sat_ranges: &mut VecDeque<(u64, u64)>,
sat_ranges_written: &mut u64,
outputs_traversed: &mut u64,
inscription_updater: &mut InscriptionUpdater,
) -> Result {
inscription_updater.index_transaction_inscriptions(tx, txid, Some(input_sat_ranges))?;
// fn index_transaction_sats(
// &mut self,
// tx: &Transaction,
// txid: Txid,
// sat_to_satpoint: &mut Table<u64, &SatPointValue>,
// input_sat_ranges: &mut VecDeque<(u64, u64)>,
// sat_ranges_written: &mut u64,
// outputs_traversed: &mut u64,
// inscription_updater: &mut InscriptionUpdater,
// ) -> Result {
// inscription_updater.index_transaction_inscriptions(tx, txid, Some(input_sat_ranges))?;
for (vout, output) in tx.output.iter().enumerate() {
let outpoint = OutPoint {
vout: vout.try_into().unwrap(),
txid,
};
let mut sats = Vec::new();
// for (vout, output) in tx.output.iter().enumerate() {
// let outpoint = OutPoint {
// vout: vout.try_into().unwrap(),
// txid,
// };
// let mut sats = Vec::new();
let mut remaining = output.value;
while remaining > 0 {
let range = input_sat_ranges.pop_front().ok_or_else(|| {
anyhow::anyhow!("insufficient inputs for transaction outputs")
})?;
// let mut remaining = output.value;
// while remaining > 0 {
// let range = input_sat_ranges.pop_front().ok_or_else(|| {
// anyhow::anyhow!("insufficient inputs for transaction outputs")
// })?;
if !Sat(range.0).is_common() {
sat_to_satpoint.insert(
&range.0,
&SatPoint {
outpoint,
offset: output.value - remaining,
}
.store(),
)?;
}
// if !Sat(range.0).is_common() {
// sat_to_satpoint.insert(
// &range.0,
// &SatPoint {
// outpoint,
// offset: output.value - remaining,
// }
// .store(),
// )?;
// }
let count = range.1 - range.0;
// let count = range.1 - range.0;
let assigned = if count > remaining {
self.sat_ranges_since_flush += 1;
let middle = range.0 + remaining;
input_sat_ranges.push_front((middle, range.1));
(range.0, middle)
} else {
range
};
// let assigned = if count > remaining {
// self.sat_ranges_since_flush += 1;
// let middle = range.0 + remaining;
// input_sat_ranges.push_front((middle, range.1));
// (range.0, middle)
// } else {
// range
// };
sats.extend_from_slice(&assigned.store());
// sats.extend_from_slice(&assigned.store());
remaining -= assigned.1 - assigned.0;
// remaining -= assigned.1 - assigned.0;
*sat_ranges_written += 1;
}
// *sat_ranges_written += 1;
// }
*outputs_traversed += 1;
// *outputs_traversed += 1;
self.range_cache.insert(outpoint.store(), sats);
self.outputs_inserted_since_flush += 1;
}
// self.range_cache.insert(outpoint.store(), sats);
// self.outputs_inserted_since_flush += 1;
// }
Ok(())
}
// Ok(())
// }
fn commit(&mut self, wtx: WriteTransaction, value_cache: HashMap<OutPoint, u64>) -> Result {
println!(

View File

@@ -81,7 +81,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
})
}
pub(super) fn index_transaction_inscriptions(
pub(super) async fn index_transaction_inscriptions(
&mut self,
tx: &Transaction,
txid: Txid,
@@ -114,7 +114,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
{
value.value()
} else {
self.value_receiver.blocking_recv().ok_or_else(|| {
self.value_receiver.recv().await.ok_or_else(|| {
anyhow::anyhow!(
"failed to get transaction for {}",
tx_in.previous_output.txid

View File

@@ -1,180 +0,0 @@
#[macro_use]
extern crate rocket;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate hiro_system_kit;
pub extern crate bitcoincore_rpc;
pub mod chainhooks;
pub mod indexer;
pub mod observer;
pub mod utils;
use crate::utils::Context;
use hiro_system_kit::log::setup_logger;
use hiro_system_kit::slog;
use crate::chainhooks::types::ChainhookConfig;
use clap::Parser;
use ctrlc;
use observer::{EventHandler, EventObserverConfig, ObserverCommand};
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::PathBuf;
use std::sync::mpsc::channel;
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// Path of the config to load
#[clap(short, long)]
config_path: Option<String>,
}
#[rocket::main]
async fn main() {
let context = Context {
logger: Some(setup_logger()),
tracer: false,
};
let args = Args::parse();
let config_path = get_config_path_or_exit(&args.config_path, &context);
let config = EventObserverConfig::from_path(&config_path, &context);
let (command_tx, command_rx) = channel();
let tx_terminator = command_tx.clone();
ctrlc::set_handler(move || {
tx_terminator
.send(ObserverCommand::Terminate)
.expect("Could not send signal on channel.")
})
.expect("Error setting Ctrl-C handler");
let _ = observer::start_event_observer(config, command_tx, command_rx, None, context).await;
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct EventObserverConfigFile {
pub normalization_enabled: Option<bool>,
pub grpc_server_enabled: Option<bool>,
pub hooks_enabled: Option<bool>,
pub bitcoin_rpc_proxy_enabled: Option<bool>,
pub webhooks: Option<Vec<String>>,
pub ingestion_port: Option<u16>,
pub control_port: Option<u16>,
pub bitcoin_node_username: String,
pub bitcoin_node_password: String,
pub bitcoin_node_rpc_url: String,
pub stacks_node_rpc_url: String,
pub operators: Option<Vec<String>>,
pub cache_path: Option<String>,
}
impl EventObserverConfig {
pub fn from_path(path: &PathBuf, ctx: &Context) -> EventObserverConfig {
let path = match File::open(path) {
Ok(path) => path,
Err(_e) => {
ctx.try_log(|logger| {
slog::error!(
logger,
"Error: unable to locate Clarinet.toml in current directory"
)
});
std::process::exit(1);
}
};
let mut file_reader = BufReader::new(path);
let mut file_buffer = vec![];
file_reader.read_to_end(&mut file_buffer).unwrap();
let file: EventObserverConfigFile = match toml::from_slice(&file_buffer[..]) {
Ok(s) => s,
Err(e) => {
ctx.try_log(|logger| error!(logger, "Unable to read config {}", e));
std::process::exit(1);
}
};
EventObserverConfig::from_config_file(file, ctx)
}
pub fn from_config_file(
mut config_file: EventObserverConfigFile,
_ctx: &Context,
) -> EventObserverConfig {
let event_handlers = match config_file.webhooks.take() {
Some(webhooks) => webhooks
.into_iter()
.map(|h| EventHandler::WebHook(h))
.collect::<Vec<_>>(),
None => vec![],
};
let mut operators = HashSet::new();
if let Some(operator_keys) = config_file.operators.take() {
for operator_key in operator_keys.into_iter() {
operators.insert(operator_key);
}
}
let config = EventObserverConfig {
normalization_enabled: config_file.normalization_enabled.unwrap_or(true),
grpc_server_enabled: config_file.grpc_server_enabled.unwrap_or(false),
hooks_enabled: config_file.hooks_enabled.unwrap_or(false),
chainhook_config: Some(ChainhookConfig::new()),
bitcoin_rpc_proxy_enabled: config_file.bitcoin_rpc_proxy_enabled.unwrap_or(false),
event_handlers: event_handlers,
ingestion_port: config_file
.ingestion_port
.unwrap_or(observer::DEFAULT_INGESTION_PORT),
control_port: config_file
.control_port
.unwrap_or(observer::DEFAULT_CONTROL_PORT),
bitcoin_node_username: config_file.bitcoin_node_username.clone(),
bitcoin_node_password: config_file.bitcoin_node_password.clone(),
bitcoin_node_rpc_url: config_file.bitcoin_node_rpc_url.clone(),
stacks_node_rpc_url: config_file.stacks_node_rpc_url.clone(),
operators,
display_logs: true,
cache_path: config_file.cache_path.unwrap_or("cache".into()),
bitcoin_network: chainhook_types::BitcoinNetwork::Mainnet, // todo(lgalabru)
};
config
}
}
fn get_config_path_or_exit(path: &Option<String>, ctx: &Context) -> PathBuf {
if let Some(path) = path {
let manifest_path = PathBuf::from(path);
if !manifest_path.exists() {
ctx.try_log(|logger| slog::error!(logger, "Could not find Observer.toml"));
std::process::exit(1);
}
manifest_path
} else {
let mut current_dir = std::env::current_dir().unwrap();
loop {
current_dir.push("Observer.toml");
if current_dir.exists() {
break current_dir;
}
current_dir.pop();
if !current_dir.pop() {
ctx.try_log(|logger| slog::error!(logger, "Could not find Observer.toml"));
std::process::exit(1);
}
}
}
}

View File

@@ -234,7 +234,7 @@ pub async fn start_event_observer(
});
let ordinal_index = initialize_ordinal_index(&config).unwrap();
match OrdinalIndexUpdater::update(&ordinal_index) {
match OrdinalIndexUpdater::update(&ordinal_index).await {
Ok(_r) => {}
Err(e) => {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));