mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-12 16:52:57 +08:00
feat: support for latest archives, add logs
This commit is contained in:
@@ -1,14 +0,0 @@
|
||||
[storage]
|
||||
driver = "redis"
|
||||
redis_uri = "redis://localhost:6379/"
|
||||
|
||||
[chainhooks]
|
||||
max_stacks_registrations = 500
|
||||
max_bitcoin_registrations = 500
|
||||
|
||||
[network]
|
||||
mode = "devnet"
|
||||
bitcoin_node_rpc_url = "http://0.0.0.0:18443"
|
||||
bitcoin_node_rpc_username = "devnet"
|
||||
bitcoin_node_rpc_password = "devnet"
|
||||
stacks_node_rpc_url = "http://0.0.0.0:20443"
|
||||
@@ -3,16 +3,15 @@ use chainhook_types::StacksNetwork;
|
||||
use flate2::read::GzDecoder;
|
||||
use futures_util::StreamExt;
|
||||
use std::fs;
|
||||
use std::io::Read;
|
||||
use std::io::{self, Cursor};
|
||||
use tar::Archive;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
pub fn default_tsv_file_path(network: &StacksNetwork) -> String {
|
||||
format!("stacks-node-events-{:?}.tsv", network).to_lowercase()
|
||||
format!("{:?}-stacks-events.tsv", network).to_lowercase()
|
||||
}
|
||||
|
||||
pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
|
||||
let destination_path = config.expected_cache_path();
|
||||
let mut destination_path = config.expected_cache_path();
|
||||
let url = config.expected_remote_tsv_url();
|
||||
let res = reqwest::get(url)
|
||||
.await
|
||||
@@ -20,22 +19,24 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
|
||||
|
||||
// Download chunks
|
||||
let (tx, rx) = flume::bounded(0);
|
||||
destination_path.push(default_tsv_file_path(&config.network.stacks_network));
|
||||
|
||||
let mut from = destination_path.clone();
|
||||
let decoder_thread = std::thread::spawn(move || {
|
||||
let input = ChannelRead::new(rx);
|
||||
let gz = GzDecoder::new(input);
|
||||
let mut archive = Archive::new(gz);
|
||||
archive.unpack(&destination_path).unwrap();
|
||||
let mut decoder = GzDecoder::new(input);
|
||||
let mut content = Vec::new();
|
||||
let _ = decoder.read_to_end(&mut content);
|
||||
let mut file = fs::File::create(&destination_path).unwrap();
|
||||
file.write_all(&content[..]).unwrap();
|
||||
});
|
||||
|
||||
if res.status() == reqwest::StatusCode::OK {
|
||||
let mut stream = res.bytes_stream();
|
||||
while let Some(item) = stream.next().await {
|
||||
let chunk = item
|
||||
.or(Err(format!("Error while downloading file")))
|
||||
.unwrap();
|
||||
tx.send_async(chunk.to_vec()).await.unwrap();
|
||||
let chunk = item.or(Err(format!("Error while downloading file")))?;
|
||||
tx.send_async(chunk.to_vec())
|
||||
.await
|
||||
.map_err(|e| format!("unable to download stacks event: {}", e.to_string()))?;
|
||||
}
|
||||
drop(tx);
|
||||
}
|
||||
@@ -45,12 +46,6 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
from.push("stacks-node-events.tsv");
|
||||
let mut to = from.clone();
|
||||
to.pop();
|
||||
to.push(default_tsv_file_path(&config.network.stacks_network));
|
||||
let _ = fs::rename(from, to);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
use crate::block::DigestingCommand;
|
||||
use crate::config::Config;
|
||||
use crate::config::generator::generate_config;
|
||||
use crate::config::Config;
|
||||
use crate::node::Node;
|
||||
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate;
|
||||
use crate::scan::stacks::scan_stacks_chain_with_predicate;
|
||||
|
||||
use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification;
|
||||
use chainhook_event_observer::indexer::ordinals::db::{
|
||||
build_bitcoin_traversal_local_storage,
|
||||
find_inscriptions_at_wached_outpoint, initialize_ordinal_state_storage,
|
||||
open_readonly_ordinals_db_conn,
|
||||
build_bitcoin_traversal_local_storage, find_inscriptions_at_wached_outpoint,
|
||||
initialize_ordinal_state_storage, open_readonly_ordinals_db_conn,
|
||||
retrieve_satoshi_point_using_local_storage,
|
||||
};
|
||||
use chainhook_event_observer::observer::BitcoinConfig;
|
||||
@@ -318,7 +317,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
|
||||
.map_err(|e| format!("unable to write file {}\n{}", file_path.display(), e))?;
|
||||
println!("Created file Chainhook.toml");
|
||||
}
|
||||
}
|
||||
},
|
||||
Command::Predicates(subcmd) => match subcmd {
|
||||
PredicatesCommand::New(_cmd) => {
|
||||
// let manifest = clarinet_files::get_manifest_location(None);
|
||||
|
||||
@@ -3,7 +3,7 @@ pub fn generate_config() -> String {
|
||||
r#"[storage]
|
||||
driver = "redis"
|
||||
redis_uri = "redis://localhost:6379/"
|
||||
cache_path = "chainhook_cache"
|
||||
cache_path = "cache"
|
||||
|
||||
[chainhooks]
|
||||
max_stacks_registrations = 500
|
||||
@@ -15,6 +15,9 @@ bitcoin_node_rpc_url = "http://localhost:8332"
|
||||
bitcoin_node_rpc_username = "devnet"
|
||||
bitcoin_node_rpc_password = "devnet"
|
||||
stacks_node_rpc_url = "http://localhost:20443"
|
||||
|
||||
[[event_source]]
|
||||
tsv_file_url = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz"
|
||||
"#
|
||||
);
|
||||
return conf;
|
||||
|
||||
@@ -8,10 +8,10 @@ use std::fs::File;
|
||||
use std::io::{BufReader, Read};
|
||||
use std::path::PathBuf;
|
||||
|
||||
const DEFAULT_MAINNET_TSV_ARCHIVE: &str = "https://storage.googleapis.com/hirosystems-archive/mainnet/api/mainnet-blockchain-api-latest.tar.gz";
|
||||
const DEFAULT_TESTNET_TSV_ARCHIVE: &str = "https://storage.googleapis.com/hirosystems-archive/testnet/api/testnet-blockchain-api-latest.tar.gz";
|
||||
// const DEFAULT_MAINNET_TSV_ARCHIVE: &str = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz";
|
||||
// const DEFAULT_TESTNET_TSV_ARCHIVE: &str = "https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest.gz";
|
||||
const DEFAULT_MAINNET_TSV_ARCHIVE: &str =
|
||||
"https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz";
|
||||
const DEFAULT_TESTNET_TSV_ARCHIVE: &str =
|
||||
"https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest.gz";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
@@ -99,6 +99,20 @@ impl Config {
|
||||
_ => return Err("network.mode not supported".to_string()),
|
||||
};
|
||||
|
||||
let mut event_sources = vec![];
|
||||
for source in config_file.event_source.unwrap_or(vec![]).iter_mut() {
|
||||
if let Some(dst) = source.tsv_file_path.take() {
|
||||
let mut file_path = PathBuf::new();
|
||||
file_path.push(dst);
|
||||
event_sources.push(EventSourceConfig::TsvPath(TsvPathConfig { file_path }));
|
||||
continue;
|
||||
}
|
||||
if let Some(file_url) = source.tsv_file_url.take() {
|
||||
event_sources.push(EventSourceConfig::TsvUrl(TsvUrlConfig { file_url }));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let config = Config {
|
||||
storage: StorageConfig {
|
||||
driver: StorageDriver::Redis(RedisConfig {
|
||||
@@ -106,9 +120,7 @@ impl Config {
|
||||
}),
|
||||
cache_path: config_file.storage.cache_path.unwrap_or("cache".into()),
|
||||
},
|
||||
event_sources: vec![EventSourceConfig::StacksNode(StacksNodeConfig {
|
||||
host: config_file.network.stacks_node_rpc_url.to_string(),
|
||||
})],
|
||||
event_sources,
|
||||
chainhooks: ChainhooksConfig {
|
||||
max_stacks_registrations: config_file
|
||||
.chainhooks
|
||||
|
||||
@@ -153,7 +153,14 @@ pub async fn scan_stacks_chain_with_predicate(
|
||||
};
|
||||
let proofs = HashMap::new();
|
||||
|
||||
let mut actions_triggered = 0;
|
||||
let mut blocks_scanned = 0;
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Starting predicate evaluation on blocks"
|
||||
);
|
||||
for (_block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
|
||||
blocks_scanned += 1;
|
||||
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
|
||||
&indexer.config,
|
||||
&blob,
|
||||
@@ -183,15 +190,24 @@ pub async fn scan_stacks_chain_with_predicate(
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "unable to handle action {}", e);
|
||||
}
|
||||
Ok(StacksChainhookOccurrence::Http(request)) => {
|
||||
send_request(request, &ctx).await;
|
||||
Ok(action) => {
|
||||
actions_triggered += 1;
|
||||
match action {
|
||||
StacksChainhookOccurrence::Http(request) => {
|
||||
send_request(request, &ctx).await;
|
||||
}
|
||||
StacksChainhookOccurrence::File(path, bytes) => {
|
||||
file_append(path, bytes, &ctx);
|
||||
}
|
||||
StacksChainhookOccurrence::Data(_payload) => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok(StacksChainhookOccurrence::File(path, bytes)) => {
|
||||
file_append(path, bytes, &ctx);
|
||||
}
|
||||
Ok(StacksChainhookOccurrence::Data(_payload)) => unreachable!(),
|
||||
}
|
||||
}
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -215,6 +231,12 @@ async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> boo
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Building in-memory chainstate from file {}",
|
||||
destination_path.display()
|
||||
);
|
||||
}
|
||||
config.add_local_tsv_source(&destination_path);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user