mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-01-13 08:40:17 +08:00
feat: ability to download hord.sqlite
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use crate::config::Config;
|
||||
use chainhook_types::StacksNetwork;
|
||||
use chainhook_event_observer::utils::Context;
|
||||
use chainhook_types::{BitcoinNetwork, StacksNetwork};
|
||||
use clarinet_files::FileLocation;
|
||||
use flate2::read::GzDecoder;
|
||||
use futures_util::StreamExt;
|
||||
@@ -15,13 +16,21 @@ pub fn default_tsv_sha_file_path(network: &StacksNetwork) -> String {
|
||||
format!("{:?}-stacks-events.sha256", network).to_lowercase()
|
||||
}
|
||||
|
||||
pub fn default_sqlite_file_path(_network: &BitcoinNetwork) -> String {
|
||||
format!("hord.sqlite").to_lowercase()
|
||||
}
|
||||
|
||||
pub fn default_sqlite_sha_file_path(_network: &BitcoinNetwork) -> String {
|
||||
format!("hord.sqlite.sha256").to_lowercase()
|
||||
}
|
||||
|
||||
pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
|
||||
let mut destination_path = config.expected_cache_path();
|
||||
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
|
||||
println!("{}", e.to_string());
|
||||
});
|
||||
|
||||
let remote_sha_url = config.expected_remote_tsv_sha256();
|
||||
let remote_sha_url = config.expected_remote_stacks_tsv_sha256();
|
||||
let res = reqwest::get(&remote_sha_url)
|
||||
.await
|
||||
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
|
||||
@@ -32,11 +41,10 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
|
||||
let mut local_sha_file_path = destination_path.clone();
|
||||
local_sha_file_path.push(default_tsv_sha_file_path(&config.network.stacks_network));
|
||||
|
||||
println!("1");
|
||||
let local_sha_file = FileLocation::from_path(local_sha_file_path);
|
||||
let _ = local_sha_file.write_content(&res.to_vec());
|
||||
|
||||
let file_url = config.expected_remote_tsv_url();
|
||||
let file_url = config.expected_remote_stacks_tsv_url();
|
||||
let res = reqwest::get(&file_url)
|
||||
.await
|
||||
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;
|
||||
@@ -76,6 +84,68 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn download_sqlite_file(config: &Config) -> Result<(), String> {
|
||||
let mut destination_path = config.expected_cache_path();
|
||||
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
|
||||
println!("{}", e.to_string());
|
||||
});
|
||||
|
||||
let remote_sha_url = config.expected_remote_ordinals_sqlite_sha256();
|
||||
let res = reqwest::get(&remote_sha_url)
|
||||
.await
|
||||
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
|
||||
.bytes()
|
||||
.await
|
||||
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?;
|
||||
|
||||
let mut local_sha_file_path = destination_path.clone();
|
||||
local_sha_file_path.push(default_sqlite_sha_file_path(
|
||||
&config.network.bitcoin_network,
|
||||
));
|
||||
|
||||
let local_sha_file = FileLocation::from_path(local_sha_file_path);
|
||||
let _ = local_sha_file.write_content(&res.to_vec());
|
||||
|
||||
let file_url = config.expected_remote_ordinals_sqlite_url();
|
||||
let res = reqwest::get(&file_url)
|
||||
.await
|
||||
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;
|
||||
|
||||
// Download chunks
|
||||
let (tx, rx) = flume::bounded(0);
|
||||
destination_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
|
||||
|
||||
let decoder_thread = std::thread::spawn(move || {
|
||||
let input = ChannelRead::new(rx);
|
||||
let mut decoder = GzDecoder::new(input);
|
||||
let mut content = Vec::new();
|
||||
let _ = decoder.read_to_end(&mut content);
|
||||
let mut file = fs::File::create(&destination_path).unwrap();
|
||||
if let Err(e) = file.write_all(&content[..]) {
|
||||
println!("unable to write file: {}", e.to_string());
|
||||
std::process::exit(1);
|
||||
}
|
||||
});
|
||||
|
||||
if res.status() == reqwest::StatusCode::OK {
|
||||
let mut stream = res.bytes_stream();
|
||||
while let Some(item) = stream.next().await {
|
||||
let chunk = item.or(Err(format!("Error while downloading file")))?;
|
||||
tx.send_async(chunk.to_vec())
|
||||
.await
|
||||
.map_err(|e| format!("unable to download stacks event: {}", e.to_string()))?;
|
||||
}
|
||||
drop(tx);
|
||||
}
|
||||
|
||||
tokio::task::spawn_blocking(|| decoder_thread.join())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Wrap a channel into something that impls `io::Read`
|
||||
struct ChannelRead {
|
||||
rx: flume::Receiver<Vec<u8>>,
|
||||
@@ -105,3 +175,129 @@ impl Read for ChannelRead {
|
||||
self.current.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
|
||||
if config.is_initial_ingestion_required() {
|
||||
// Download default tsv.
|
||||
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() {
|
||||
let url = config.expected_remote_stacks_tsv_url();
|
||||
let mut tsv_file_path = config.expected_cache_path();
|
||||
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
|
||||
let mut tsv_sha_file_path = config.expected_cache_path();
|
||||
tsv_sha_file_path.push(default_tsv_sha_file_path(&config.network.stacks_network));
|
||||
|
||||
// Download archive if not already present in cache
|
||||
// Load the local
|
||||
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
|
||||
let sha_url = config.expected_remote_stacks_tsv_sha256();
|
||||
|
||||
let remote_sha_file = match reqwest::get(&sha_url).await {
|
||||
Ok(response) => response.bytes().await,
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
match (local_sha_file, remote_sha_file) {
|
||||
(Ok(local), Ok(remote_response)) => {
|
||||
println!("{:?}", local);
|
||||
println!("{:?}", remote_response);
|
||||
}
|
||||
(Ok(local), _) => {
|
||||
// println!("Local: {:?}", local)
|
||||
}
|
||||
(_, _) => {
|
||||
// We will download the latest file
|
||||
println!("error reading local / remote");
|
||||
}
|
||||
}
|
||||
|
||||
if !tsv_file_path.exists() {
|
||||
info!(ctx.expect_logger(), "Downloading {}", url);
|
||||
match download_tsv_file(&config).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Building in-memory chainstate from file {}",
|
||||
tsv_file_path.display()
|
||||
);
|
||||
}
|
||||
config.add_local_stacks_tsv_source(&tsv_file_path);
|
||||
}
|
||||
true
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Streaming blocks from stacks-node {}", config.network.stacks_node_rpc_url
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_ordinals_dataset_if_required(config: &Config, ctx: &Context) -> bool {
|
||||
if config.is_initial_ingestion_required() {
|
||||
// Download default tsv.
|
||||
if config.rely_on_remote_ordinals_sqlite()
|
||||
&& config.should_download_remote_ordinals_sqlite()
|
||||
{
|
||||
let url = config.expected_remote_ordinals_sqlite_url();
|
||||
let mut sqlite_file_path = config.expected_cache_path();
|
||||
sqlite_file_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
|
||||
let mut tsv_sha_file_path = config.expected_cache_path();
|
||||
tsv_sha_file_path.push(default_sqlite_sha_file_path(
|
||||
&config.network.bitcoin_network,
|
||||
));
|
||||
|
||||
// Download archive if not already present in cache
|
||||
// Load the local
|
||||
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
|
||||
let sha_url = config.expected_remote_ordinals_sqlite_sha256();
|
||||
|
||||
let remote_sha_file = match reqwest::get(&sha_url).await {
|
||||
Ok(response) => response.bytes().await,
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
match (local_sha_file, remote_sha_file) {
|
||||
(Ok(local), Ok(remote_response)) => {
|
||||
println!("{:?}", local);
|
||||
println!("{:?}", remote_response);
|
||||
}
|
||||
(Ok(local), _) => {
|
||||
// println!("Local: {:?}", local)
|
||||
}
|
||||
(_, _) => {
|
||||
// We will download the latest file
|
||||
println!("error reading local / remote");
|
||||
}
|
||||
}
|
||||
|
||||
if !sqlite_file_path.exists() {
|
||||
info!(ctx.expect_logger(), "Downloading {}", url);
|
||||
match download_sqlite_file(&config).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Basing ordinals evaluation on database {}",
|
||||
sqlite_file_path.display()
|
||||
);
|
||||
}
|
||||
// config.add_local_ordinals_sqlite_source(&sqlite_file_path);
|
||||
}
|
||||
true
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Streaming blocks from bitcoind {}", config.network.stacks_node_rpc_url
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,12 @@ use std::path::PathBuf;
|
||||
|
||||
use crate::service::{DEFAULT_CONTROL_PORT, DEFAULT_INGESTION_PORT};
|
||||
|
||||
const DEFAULT_MAINNET_TSV_ARCHIVE: &str =
|
||||
const DEFAULT_MAINNET_STACKS_TSV_ARCHIVE: &str =
|
||||
"https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest";
|
||||
const DEFAULT_TESTNET_TSV_ARCHIVE: &str =
|
||||
const DEFAULT_TESTNET_STACKS_TSV_ARCHIVE: &str =
|
||||
"https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest";
|
||||
const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str =
|
||||
"https://archive.hiro.so/mainnet/chainhooks/hord-latest.sqlite";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
@@ -50,17 +52,19 @@ pub struct TikvConfig {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum EventSourceConfig {
|
||||
TsvPath(TsvPathConfig),
|
||||
TsvUrl(TsvUrlConfig),
|
||||
StacksTsvPath(PathConfig),
|
||||
StacksTsvUrl(UrlConfig),
|
||||
OrdinalsSqlitePath(PathConfig),
|
||||
OrdinalsSqliteUrl(UrlConfig),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TsvPathConfig {
|
||||
pub struct PathConfig {
|
||||
pub file_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TsvUrlConfig {
|
||||
pub struct UrlConfig {
|
||||
pub file_url: String,
|
||||
}
|
||||
|
||||
@@ -125,11 +129,11 @@ impl Config {
|
||||
if let Some(dst) = source.tsv_file_path.take() {
|
||||
let mut file_path = PathBuf::new();
|
||||
file_path.push(dst);
|
||||
event_sources.push(EventSourceConfig::TsvPath(TsvPathConfig { file_path }));
|
||||
event_sources.push(EventSourceConfig::StacksTsvPath(PathConfig { file_path }));
|
||||
continue;
|
||||
}
|
||||
if let Some(file_url) = source.tsv_file_url.take() {
|
||||
event_sources.push(EventSourceConfig::TsvUrl(TsvUrlConfig { file_url }));
|
||||
event_sources.push(EventSourceConfig::StacksTsvUrl(UrlConfig { file_url }));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -174,15 +178,32 @@ impl Config {
|
||||
pub fn is_initial_ingestion_required(&self) -> bool {
|
||||
for source in self.event_sources.iter() {
|
||||
match source {
|
||||
EventSourceConfig::TsvUrl(_) | EventSourceConfig::TsvPath(_) => return true,
|
||||
EventSourceConfig::StacksTsvUrl(_) | EventSourceConfig::StacksTsvPath(_) => {
|
||||
return true
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn add_local_tsv_source(&mut self, file_path: &PathBuf) {
|
||||
pub fn add_local_stacks_tsv_source(&mut self, file_path: &PathBuf) {
|
||||
self.event_sources
|
||||
.push(EventSourceConfig::TsvPath(TsvPathConfig {
|
||||
.push(EventSourceConfig::StacksTsvPath(PathConfig {
|
||||
file_path: file_path.clone(),
|
||||
}));
|
||||
}
|
||||
|
||||
pub fn add_ordinals_sqlite_remote_source_url(&mut self, file_url: &str) {
|
||||
self.event_sources
|
||||
.push(EventSourceConfig::OrdinalsSqliteUrl(UrlConfig {
|
||||
file_url: file_url.to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
pub fn add_local_ordinals_sqlite_source(&mut self, file_path: &PathBuf) {
|
||||
self.event_sources
|
||||
.push(EventSourceConfig::OrdinalsSqlitePath(PathConfig {
|
||||
file_path: file_path.clone(),
|
||||
}));
|
||||
}
|
||||
@@ -201,9 +222,9 @@ impl Config {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn expected_local_tsv_file(&self) -> &PathBuf {
|
||||
pub fn expected_local_stacks_tsv_file(&self) -> &PathBuf {
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::TsvPath(config) = source {
|
||||
if let EventSourceConfig::StacksTsvPath(config) = source {
|
||||
return &config.file_path;
|
||||
}
|
||||
}
|
||||
@@ -216,40 +237,80 @@ impl Config {
|
||||
destination_path
|
||||
}
|
||||
|
||||
fn expected_remote_tsv_base_url(&self) -> &String {
|
||||
fn expected_remote_ordinals_sqlite_base_url(&self) -> &String {
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::TsvUrl(config) = source {
|
||||
if let EventSourceConfig::OrdinalsSqliteUrl(config) = source {
|
||||
return &config.file_url;
|
||||
}
|
||||
}
|
||||
panic!("expected remote-tsv source")
|
||||
}
|
||||
|
||||
pub fn expected_remote_tsv_sha256(&self) -> String {
|
||||
format!("{}.sha256", self.expected_remote_tsv_base_url())
|
||||
}
|
||||
|
||||
pub fn expected_remote_tsv_url(&self) -> String {
|
||||
format!("{}.gz", self.expected_remote_tsv_base_url())
|
||||
}
|
||||
|
||||
pub fn rely_on_remote_tsv(&self) -> bool {
|
||||
fn expected_remote_stacks_tsv_base_url(&self) -> &String {
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::TsvUrl(_config) = source {
|
||||
if let EventSourceConfig::StacksTsvUrl(config) = source {
|
||||
return &config.file_url;
|
||||
}
|
||||
}
|
||||
panic!("expected remote-tsv source")
|
||||
}
|
||||
|
||||
pub fn expected_remote_stacks_tsv_sha256(&self) -> String {
|
||||
format!("{}.sha256", self.expected_remote_stacks_tsv_base_url())
|
||||
}
|
||||
|
||||
pub fn expected_remote_stacks_tsv_url(&self) -> String {
|
||||
format!("{}.gz", self.expected_remote_stacks_tsv_base_url())
|
||||
}
|
||||
|
||||
pub fn expected_remote_ordinals_sqlite_sha256(&self) -> String {
|
||||
format!("{}.sha256", self.expected_remote_ordinals_sqlite_base_url())
|
||||
}
|
||||
|
||||
pub fn expected_remote_ordinals_sqlite_url(&self) -> String {
|
||||
format!("{}.gz", self.expected_remote_ordinals_sqlite_base_url())
|
||||
}
|
||||
|
||||
pub fn rely_on_remote_stacks_tsv(&self) -> bool {
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::StacksTsvUrl(_config) = source {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn should_download_remote_tsv(&self) -> bool {
|
||||
pub fn rely_on_remote_ordinals_sqlite(&self) -> bool {
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::OrdinalsSqliteUrl(_config) = source {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn should_download_remote_stacks_tsv(&self) -> bool {
|
||||
let mut rely_on_remote_tsv = false;
|
||||
let mut remote_tsv_present_locally = false;
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::TsvUrl(_config) = source {
|
||||
if let EventSourceConfig::StacksTsvUrl(_config) = source {
|
||||
rely_on_remote_tsv = true;
|
||||
}
|
||||
if let EventSourceConfig::TsvPath(_config) = source {
|
||||
if let EventSourceConfig::StacksTsvPath(_config) = source {
|
||||
remote_tsv_present_locally = true;
|
||||
}
|
||||
}
|
||||
rely_on_remote_tsv == true && remote_tsv_present_locally == false
|
||||
}
|
||||
|
||||
pub fn should_download_remote_ordinals_sqlite(&self) -> bool {
|
||||
let mut rely_on_remote_tsv = false;
|
||||
let mut remote_tsv_present_locally = false;
|
||||
for source in self.event_sources.iter() {
|
||||
if let EventSourceConfig::OrdinalsSqliteUrl(_config) = source {
|
||||
rely_on_remote_tsv = true;
|
||||
}
|
||||
if let EventSourceConfig::OrdinalsSqlitePath(_config) = source {
|
||||
remote_tsv_present_locally = true;
|
||||
}
|
||||
}
|
||||
@@ -308,8 +369,8 @@ impl Config {
|
||||
}),
|
||||
cache_path: default_cache_path(),
|
||||
},
|
||||
event_sources: vec![EventSourceConfig::TsvUrl(TsvUrlConfig {
|
||||
file_url: DEFAULT_TESTNET_TSV_ARCHIVE.into(),
|
||||
event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig {
|
||||
file_url: DEFAULT_TESTNET_STACKS_TSV_ARCHIVE.into(),
|
||||
})],
|
||||
chainhooks: ChainhooksConfig {
|
||||
max_stacks_registrations: 10,
|
||||
@@ -338,9 +399,14 @@ impl Config {
|
||||
}),
|
||||
cache_path: default_cache_path(),
|
||||
},
|
||||
event_sources: vec![EventSourceConfig::TsvUrl(TsvUrlConfig {
|
||||
file_url: DEFAULT_MAINNET_TSV_ARCHIVE.into(),
|
||||
})],
|
||||
event_sources: vec![
|
||||
EventSourceConfig::StacksTsvUrl(UrlConfig {
|
||||
file_url: DEFAULT_MAINNET_STACKS_TSV_ARCHIVE.into(),
|
||||
}),
|
||||
EventSourceConfig::OrdinalsSqliteUrl(UrlConfig {
|
||||
file_url: DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.into(),
|
||||
}),
|
||||
],
|
||||
chainhooks: ChainhooksConfig {
|
||||
max_stacks_registrations: 10,
|
||||
max_bitcoin_registrations: 10,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::archive::download_ordinals_dataset_if_required;
|
||||
use crate::config::Config;
|
||||
use chainhook_event_observer::bitcoincore_rpc::RpcApi;
|
||||
use chainhook_event_observer::bitcoincore_rpc::{Auth, Client};
|
||||
@@ -32,6 +33,8 @@ pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
let _ = download_ordinals_dataset_if_required(config, ctx).await;
|
||||
|
||||
let auth = Auth::UserPass(
|
||||
config.network.bitcoind_rpc_username.clone(),
|
||||
config.network.bitcoind_rpc_password.clone(),
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
process,
|
||||
};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
|
||||
use crate::{
|
||||
archive,
|
||||
archive::download_stacks_dataset_if_required,
|
||||
block::{Record, RecordKind},
|
||||
config::Config,
|
||||
};
|
||||
@@ -21,7 +18,6 @@ use chainhook_event_observer::{
|
||||
utils::{file_append, send_request, AbstractStacksBlock},
|
||||
};
|
||||
use chainhook_types::BlockIdentifier;
|
||||
use clarinet_files::FileLocation;
|
||||
|
||||
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
predicate_spec: &StacksChainhookSpecification,
|
||||
@@ -38,9 +34,9 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
}
|
||||
};
|
||||
|
||||
let _ = download_dataset_if_required(config, ctx).await;
|
||||
let _ = download_stacks_dataset_if_required(config, ctx).await;
|
||||
|
||||
let seed_tsv_path = config.expected_local_tsv_file().clone();
|
||||
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
|
||||
|
||||
let (record_tx, record_rx) = std::sync::mpsc::channel();
|
||||
|
||||
@@ -196,69 +192,3 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
|
||||
Ok(last_block_scanned)
|
||||
}
|
||||
|
||||
async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
|
||||
if config.is_initial_ingestion_required() {
|
||||
// Download default tsv.
|
||||
if config.rely_on_remote_tsv() && config.should_download_remote_tsv() {
|
||||
let url = config.expected_remote_tsv_url();
|
||||
let mut tsv_file_path = config.expected_cache_path();
|
||||
tsv_file_path.push(archive::default_tsv_file_path(
|
||||
&config.network.stacks_network,
|
||||
));
|
||||
let mut tsv_sha_file_path = config.expected_cache_path();
|
||||
tsv_sha_file_path.push(archive::default_tsv_sha_file_path(
|
||||
&config.network.stacks_network,
|
||||
));
|
||||
|
||||
// Download archive if not already present in cache
|
||||
// Load the local
|
||||
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
|
||||
let sha_url = config.expected_remote_tsv_sha256();
|
||||
|
||||
let remote_sha_file = match reqwest::get(&sha_url).await {
|
||||
Ok(response) => response.bytes().await,
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
match (local_sha_file, remote_sha_file) {
|
||||
(Ok(local), Ok(remote_response)) => {
|
||||
println!("{:?}", local);
|
||||
println!("{:?}", remote_response);
|
||||
}
|
||||
(Ok(local), _) => {
|
||||
// println!("Local: {:?}", local)
|
||||
println!("Here 2");
|
||||
}
|
||||
(_, _) => {
|
||||
// We will download the latest file
|
||||
println!("error reading local / remote");
|
||||
}
|
||||
}
|
||||
|
||||
if !tsv_file_path.exists() {
|
||||
info!(ctx.expect_logger(), "Downloading {}", url);
|
||||
match archive::download_tsv_file(&config).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{}", e);
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Building in-memory chainstate from file {}",
|
||||
tsv_file_path.display()
|
||||
);
|
||||
}
|
||||
config.add_local_tsv_source(&tsv_file_path);
|
||||
}
|
||||
true
|
||||
} else {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Streaming blocks from stacks-node {}", config.network.stacks_node_rpc_url
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -604,7 +604,8 @@ pub fn find_latest_inscription_number_at_block_height(
|
||||
"SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number DESC LIMIT 1",
|
||||
)
|
||||
.map_err(|e| format!("unable to query inscriptions: {}", e.to_string()))?;
|
||||
let mut rows = stmt.query(args)
|
||||
let mut rows = stmt
|
||||
.query(args)
|
||||
.map_err(|e| format!("unable to query inscriptions: {}", e.to_string()))?;
|
||||
while let Ok(Some(row)) = rows.next() {
|
||||
let inscription_number: u64 = row.get(0).unwrap();
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::observer::BitcoinConfig;
|
||||
use crate::utils::Context;
|
||||
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
|
||||
use bitcoincore_rpc::bitcoin::hashes::Hash;
|
||||
use bitcoincore_rpc::bitcoin::{self, Address, Amount, BlockHash, Script};
|
||||
use bitcoincore_rpc::bitcoin::{self, Address, Amount, BlockHash};
|
||||
use bitcoincore_rpc_json::{
|
||||
GetRawTransactionResultVinScriptSig, GetRawTransactionResultVoutScriptPubKey,
|
||||
};
|
||||
@@ -19,14 +19,13 @@ use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut};
|
||||
use chainhook_types::{
|
||||
BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork, BitcoinTransactionData,
|
||||
BitcoinTransactionMetadata, BlockCommitmentData, BlockHeader, BlockIdentifier,
|
||||
KeyRegistrationData, LockSTXData, PoxReward,
|
||||
StacksBaseChainOperation, StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
|
||||
KeyRegistrationData, LockSTXData, PoxReward, StacksBaseChainOperation,
|
||||
StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
|
||||
};
|
||||
use hiro_system_kit::slog;
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
|
||||
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct BitcoinBlockFullBreakdown {
|
||||
|
||||
@@ -19,7 +19,6 @@ use crate::indexer::bitcoin::{
|
||||
download_and_parse_block_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
|
||||
NewBitcoinBlock,
|
||||
};
|
||||
use crate::indexer::fork_scratch_pad::ForkScratchPad;
|
||||
use crate::indexer::{self, Indexer, IndexerConfig};
|
||||
use crate::utils::{send_request, Context};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user