feat: ability to download hord.sqlite

This commit is contained in:
Ludo Galabru
2023-04-25 22:08:51 -04:00
parent 57cd5b770f
commit 3dafa53ac0
7 changed files with 311 additions and 117 deletions

View File

@@ -1,5 +1,6 @@
use crate::config::Config;
use chainhook_types::StacksNetwork;
use chainhook_event_observer::utils::Context;
use chainhook_types::{BitcoinNetwork, StacksNetwork};
use clarinet_files::FileLocation;
use flate2::read::GzDecoder;
use futures_util::StreamExt;
@@ -15,13 +16,21 @@ pub fn default_tsv_sha_file_path(network: &StacksNetwork) -> String {
format!("{:?}-stacks-events.sha256", network).to_lowercase()
}
pub fn default_sqlite_file_path(_network: &BitcoinNetwork) -> String {
format!("hord.sqlite").to_lowercase()
}
pub fn default_sqlite_sha_file_path(_network: &BitcoinNetwork) -> String {
format!("hord.sqlite.sha256").to_lowercase()
}
pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
let mut destination_path = config.expected_cache_path();
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
println!("{}", e.to_string());
});
let remote_sha_url = config.expected_remote_tsv_sha256();
let remote_sha_url = config.expected_remote_stacks_tsv_sha256();
let res = reqwest::get(&remote_sha_url)
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
@@ -32,11 +41,10 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
let mut local_sha_file_path = destination_path.clone();
local_sha_file_path.push(default_tsv_sha_file_path(&config.network.stacks_network));
println!("1");
let local_sha_file = FileLocation::from_path(local_sha_file_path);
let _ = local_sha_file.write_content(&res.to_vec());
let file_url = config.expected_remote_tsv_url();
let file_url = config.expected_remote_stacks_tsv_url();
let res = reqwest::get(&file_url)
.await
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;
@@ -76,6 +84,68 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
Ok(())
}
pub async fn download_sqlite_file(config: &Config) -> Result<(), String> {
let mut destination_path = config.expected_cache_path();
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
println!("{}", e.to_string());
});
let remote_sha_url = config.expected_remote_ordinals_sqlite_sha256();
let res = reqwest::get(&remote_sha_url)
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
.bytes()
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?;
let mut local_sha_file_path = destination_path.clone();
local_sha_file_path.push(default_sqlite_sha_file_path(
&config.network.bitcoin_network,
));
let local_sha_file = FileLocation::from_path(local_sha_file_path);
let _ = local_sha_file.write_content(&res.to_vec());
let file_url = config.expected_remote_ordinals_sqlite_url();
let res = reqwest::get(&file_url)
.await
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;
// Download chunks
let (tx, rx) = flume::bounded(0);
destination_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let mut decoder = GzDecoder::new(input);
let mut content = Vec::new();
let _ = decoder.read_to_end(&mut content);
let mut file = fs::File::create(&destination_path).unwrap();
if let Err(e) = file.write_all(&content[..]) {
println!("unable to write file: {}", e.to_string());
std::process::exit(1);
}
});
if res.status() == reqwest::StatusCode::OK {
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item.or(Err(format!("Error while downloading file")))?;
tx.send_async(chunk.to_vec())
.await
.map_err(|e| format!("unable to download stacks event: {}", e.to_string()))?;
}
drop(tx);
}
tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap();
Ok(())
}
// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
rx: flume::Receiver<Vec<u8>>,
@@ -105,3 +175,129 @@ impl Read for ChannelRead {
self.current.read(buf)
}
}
pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() {
let url = config.expected_remote_stacks_tsv_url();
let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
let mut tsv_sha_file_path = config.expected_cache_path();
tsv_sha_file_path.push(default_tsv_sha_file_path(&config.network.stacks_network));
// Download archive if not already present in cache
// Load the local
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
let sha_url = config.expected_remote_stacks_tsv_sha256();
let remote_sha_file = match reqwest::get(&sha_url).await {
Ok(response) => response.bytes().await,
Err(e) => Err(e),
};
match (local_sha_file, remote_sha_file) {
(Ok(local), Ok(remote_response)) => {
println!("{:?}", local);
println!("{:?}", remote_response);
}
(Ok(local), _) => {
// println!("Local: {:?}", local)
}
(_, _) => {
// We will download the latest file
println!("error reading local / remote");
}
}
if !tsv_file_path.exists() {
info!(ctx.expect_logger(), "Downloading {}", url);
match download_tsv_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
}
} else {
info!(
ctx.expect_logger(),
"Building in-memory chainstate from file {}",
tsv_file_path.display()
);
}
config.add_local_stacks_tsv_source(&tsv_file_path);
}
true
} else {
info!(
ctx.expect_logger(),
"Streaming blocks from stacks-node {}", config.network.stacks_node_rpc_url
);
false
}
}
pub async fn download_ordinals_dataset_if_required(config: &Config, ctx: &Context) -> bool {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_ordinals_sqlite()
&& config.should_download_remote_ordinals_sqlite()
{
let url = config.expected_remote_ordinals_sqlite_url();
let mut sqlite_file_path = config.expected_cache_path();
sqlite_file_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
let mut tsv_sha_file_path = config.expected_cache_path();
tsv_sha_file_path.push(default_sqlite_sha_file_path(
&config.network.bitcoin_network,
));
// Download archive if not already present in cache
// Load the local
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
let sha_url = config.expected_remote_ordinals_sqlite_sha256();
let remote_sha_file = match reqwest::get(&sha_url).await {
Ok(response) => response.bytes().await,
Err(e) => Err(e),
};
match (local_sha_file, remote_sha_file) {
(Ok(local), Ok(remote_response)) => {
println!("{:?}", local);
println!("{:?}", remote_response);
}
(Ok(local), _) => {
// println!("Local: {:?}", local)
}
(_, _) => {
// We will download the latest file
println!("error reading local / remote");
}
}
if !sqlite_file_path.exists() {
info!(ctx.expect_logger(), "Downloading {}", url);
match download_sqlite_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
}
} else {
info!(
ctx.expect_logger(),
"Basing ordinals evaluation on database {}",
sqlite_file_path.display()
);
}
// config.add_local_ordinals_sqlite_source(&sqlite_file_path);
}
true
} else {
info!(
ctx.expect_logger(),
"Streaming blocks from bitcoind {}", config.network.stacks_node_rpc_url
);
false
}
}

View File

@@ -12,10 +12,12 @@ use std::path::PathBuf;
use crate::service::{DEFAULT_CONTROL_PORT, DEFAULT_INGESTION_PORT};
const DEFAULT_MAINNET_TSV_ARCHIVE: &str =
const DEFAULT_MAINNET_STACKS_TSV_ARCHIVE: &str =
"https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest";
const DEFAULT_TESTNET_TSV_ARCHIVE: &str =
const DEFAULT_TESTNET_STACKS_TSV_ARCHIVE: &str =
"https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest";
const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str =
"https://archive.hiro.so/mainnet/chainhooks/hord-latest.sqlite";
#[derive(Clone, Debug)]
pub struct Config {
@@ -50,17 +52,19 @@ pub struct TikvConfig {
#[derive(Clone, Debug)]
pub enum EventSourceConfig {
TsvPath(TsvPathConfig),
TsvUrl(TsvUrlConfig),
StacksTsvPath(PathConfig),
StacksTsvUrl(UrlConfig),
OrdinalsSqlitePath(PathConfig),
OrdinalsSqliteUrl(UrlConfig),
}
#[derive(Clone, Debug)]
pub struct TsvPathConfig {
pub struct PathConfig {
pub file_path: PathBuf,
}
#[derive(Clone, Debug)]
pub struct TsvUrlConfig {
pub struct UrlConfig {
pub file_url: String,
}
@@ -125,11 +129,11 @@ impl Config {
if let Some(dst) = source.tsv_file_path.take() {
let mut file_path = PathBuf::new();
file_path.push(dst);
event_sources.push(EventSourceConfig::TsvPath(TsvPathConfig { file_path }));
event_sources.push(EventSourceConfig::StacksTsvPath(PathConfig { file_path }));
continue;
}
if let Some(file_url) = source.tsv_file_url.take() {
event_sources.push(EventSourceConfig::TsvUrl(TsvUrlConfig { file_url }));
event_sources.push(EventSourceConfig::StacksTsvUrl(UrlConfig { file_url }));
continue;
}
}
@@ -174,15 +178,32 @@ impl Config {
pub fn is_initial_ingestion_required(&self) -> bool {
for source in self.event_sources.iter() {
match source {
EventSourceConfig::TsvUrl(_) | EventSourceConfig::TsvPath(_) => return true,
EventSourceConfig::StacksTsvUrl(_) | EventSourceConfig::StacksTsvPath(_) => {
return true
}
_ => {}
}
}
return false;
}
pub fn add_local_tsv_source(&mut self, file_path: &PathBuf) {
pub fn add_local_stacks_tsv_source(&mut self, file_path: &PathBuf) {
self.event_sources
.push(EventSourceConfig::TsvPath(TsvPathConfig {
.push(EventSourceConfig::StacksTsvPath(PathConfig {
file_path: file_path.clone(),
}));
}
pub fn add_ordinals_sqlite_remote_source_url(&mut self, file_url: &str) {
self.event_sources
.push(EventSourceConfig::OrdinalsSqliteUrl(UrlConfig {
file_url: file_url.to_string(),
}));
}
pub fn add_local_ordinals_sqlite_source(&mut self, file_path: &PathBuf) {
self.event_sources
.push(EventSourceConfig::OrdinalsSqlitePath(PathConfig {
file_path: file_path.clone(),
}));
}
@@ -201,9 +222,9 @@ impl Config {
}
}
pub fn expected_local_tsv_file(&self) -> &PathBuf {
pub fn expected_local_stacks_tsv_file(&self) -> &PathBuf {
for source in self.event_sources.iter() {
if let EventSourceConfig::TsvPath(config) = source {
if let EventSourceConfig::StacksTsvPath(config) = source {
return &config.file_path;
}
}
@@ -216,40 +237,80 @@ impl Config {
destination_path
}
fn expected_remote_tsv_base_url(&self) -> &String {
fn expected_remote_ordinals_sqlite_base_url(&self) -> &String {
for source in self.event_sources.iter() {
if let EventSourceConfig::TsvUrl(config) = source {
if let EventSourceConfig::OrdinalsSqliteUrl(config) = source {
return &config.file_url;
}
}
panic!("expected remote-tsv source")
}
pub fn expected_remote_tsv_sha256(&self) -> String {
format!("{}.sha256", self.expected_remote_tsv_base_url())
}
pub fn expected_remote_tsv_url(&self) -> String {
format!("{}.gz", self.expected_remote_tsv_base_url())
}
pub fn rely_on_remote_tsv(&self) -> bool {
fn expected_remote_stacks_tsv_base_url(&self) -> &String {
for source in self.event_sources.iter() {
if let EventSourceConfig::TsvUrl(_config) = source {
if let EventSourceConfig::StacksTsvUrl(config) = source {
return &config.file_url;
}
}
panic!("expected remote-tsv source")
}
pub fn expected_remote_stacks_tsv_sha256(&self) -> String {
format!("{}.sha256", self.expected_remote_stacks_tsv_base_url())
}
pub fn expected_remote_stacks_tsv_url(&self) -> String {
format!("{}.gz", self.expected_remote_stacks_tsv_base_url())
}
pub fn expected_remote_ordinals_sqlite_sha256(&self) -> String {
format!("{}.sha256", self.expected_remote_ordinals_sqlite_base_url())
}
pub fn expected_remote_ordinals_sqlite_url(&self) -> String {
format!("{}.gz", self.expected_remote_ordinals_sqlite_base_url())
}
pub fn rely_on_remote_stacks_tsv(&self) -> bool {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(_config) = source {
return true;
}
}
false
}
pub fn should_download_remote_tsv(&self) -> bool {
pub fn rely_on_remote_ordinals_sqlite(&self) -> bool {
for source in self.event_sources.iter() {
if let EventSourceConfig::OrdinalsSqliteUrl(_config) = source {
return true;
}
}
false
}
pub fn should_download_remote_stacks_tsv(&self) -> bool {
let mut rely_on_remote_tsv = false;
let mut remote_tsv_present_locally = false;
for source in self.event_sources.iter() {
if let EventSourceConfig::TsvUrl(_config) = source {
if let EventSourceConfig::StacksTsvUrl(_config) = source {
rely_on_remote_tsv = true;
}
if let EventSourceConfig::TsvPath(_config) = source {
if let EventSourceConfig::StacksTsvPath(_config) = source {
remote_tsv_present_locally = true;
}
}
rely_on_remote_tsv == true && remote_tsv_present_locally == false
}
pub fn should_download_remote_ordinals_sqlite(&self) -> bool {
let mut rely_on_remote_tsv = false;
let mut remote_tsv_present_locally = false;
for source in self.event_sources.iter() {
if let EventSourceConfig::OrdinalsSqliteUrl(_config) = source {
rely_on_remote_tsv = true;
}
if let EventSourceConfig::OrdinalsSqlitePath(_config) = source {
remote_tsv_present_locally = true;
}
}
@@ -308,8 +369,8 @@ impl Config {
}),
cache_path: default_cache_path(),
},
event_sources: vec![EventSourceConfig::TsvUrl(TsvUrlConfig {
file_url: DEFAULT_TESTNET_TSV_ARCHIVE.into(),
event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig {
file_url: DEFAULT_TESTNET_STACKS_TSV_ARCHIVE.into(),
})],
chainhooks: ChainhooksConfig {
max_stacks_registrations: 10,
@@ -338,9 +399,14 @@ impl Config {
}),
cache_path: default_cache_path(),
},
event_sources: vec![EventSourceConfig::TsvUrl(TsvUrlConfig {
file_url: DEFAULT_MAINNET_TSV_ARCHIVE.into(),
})],
event_sources: vec![
EventSourceConfig::StacksTsvUrl(UrlConfig {
file_url: DEFAULT_MAINNET_STACKS_TSV_ARCHIVE.into(),
}),
EventSourceConfig::OrdinalsSqliteUrl(UrlConfig {
file_url: DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.into(),
}),
],
chainhooks: ChainhooksConfig {
max_stacks_registrations: 10,
max_bitcoin_registrations: 10,

View File

@@ -1,3 +1,4 @@
use crate::archive::download_ordinals_dataset_if_required;
use crate::config::Config;
use chainhook_event_observer::bitcoincore_rpc::RpcApi;
use chainhook_event_observer::bitcoincore_rpc::{Auth, Client};
@@ -32,6 +33,8 @@ pub async fn scan_bitcoin_chainstate_via_http_using_predicate(
config: &Config,
ctx: &Context,
) -> Result<(), String> {
let _ = download_ordinals_dataset_if_required(config, ctx).await;
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),

View File

@@ -1,10 +1,7 @@
use std::{
collections::{HashMap, VecDeque},
process,
};
use std::collections::{HashMap, VecDeque};
use crate::{
archive,
archive::download_stacks_dataset_if_required,
block::{Record, RecordKind},
config::Config,
};
@@ -21,7 +18,6 @@ use chainhook_event_observer::{
utils::{file_append, send_request, AbstractStacksBlock},
};
use chainhook_types::BlockIdentifier;
use clarinet_files::FileLocation;
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec: &StacksChainhookSpecification,
@@ -38,9 +34,9 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
}
};
let _ = download_dataset_if_required(config, ctx).await;
let _ = download_stacks_dataset_if_required(config, ctx).await;
let seed_tsv_path = config.expected_local_tsv_file().clone();
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
let (record_tx, record_rx) = std::sync::mpsc::channel();
@@ -196,69 +192,3 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
Ok(last_block_scanned)
}
async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_tsv() && config.should_download_remote_tsv() {
let url = config.expected_remote_tsv_url();
let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(archive::default_tsv_file_path(
&config.network.stacks_network,
));
let mut tsv_sha_file_path = config.expected_cache_path();
tsv_sha_file_path.push(archive::default_tsv_sha_file_path(
&config.network.stacks_network,
));
// Download archive if not already present in cache
// Load the local
let local_sha_file = FileLocation::from_path(tsv_sha_file_path).read_content();
let sha_url = config.expected_remote_tsv_sha256();
let remote_sha_file = match reqwest::get(&sha_url).await {
Ok(response) => response.bytes().await,
Err(e) => Err(e),
};
match (local_sha_file, remote_sha_file) {
(Ok(local), Ok(remote_response)) => {
println!("{:?}", local);
println!("{:?}", remote_response);
}
(Ok(local), _) => {
// println!("Local: {:?}", local)
println!("Here 2");
}
(_, _) => {
// We will download the latest file
println!("error reading local / remote");
}
}
if !tsv_file_path.exists() {
info!(ctx.expect_logger(), "Downloading {}", url);
match archive::download_tsv_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
process::exit(1);
}
}
} else {
info!(
ctx.expect_logger(),
"Building in-memory chainstate from file {}",
tsv_file_path.display()
);
}
config.add_local_tsv_source(&tsv_file_path);
}
true
} else {
info!(
ctx.expect_logger(),
"Streaming blocks from stacks-node {}", config.network.stacks_node_rpc_url
);
false
}
}

View File

@@ -604,7 +604,8 @@ pub fn find_latest_inscription_number_at_block_height(
"SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number DESC LIMIT 1",
)
.map_err(|e| format!("unable to query inscriptions: {}", e.to_string()))?;
let mut rows = stmt.query(args)
let mut rows = stmt
.query(args)
.map_err(|e| format!("unable to query inscriptions: {}", e.to_string()))?;
while let Ok(Some(row)) = rows.next() {
let inscription_number: u64 = row.get(0).unwrap();

View File

@@ -10,7 +10,7 @@ use crate::observer::BitcoinConfig;
use crate::utils::Context;
use bitcoincore_rpc::bitcoin::hashes::hex::FromHex;
use bitcoincore_rpc::bitcoin::hashes::Hash;
use bitcoincore_rpc::bitcoin::{self, Address, Amount, BlockHash, Script};
use bitcoincore_rpc::bitcoin::{self, Address, Amount, BlockHash};
use bitcoincore_rpc_json::{
GetRawTransactionResultVinScriptSig, GetRawTransactionResultVoutScriptPubKey,
};
@@ -19,14 +19,13 @@ use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut};
use chainhook_types::{
BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork, BitcoinTransactionData,
BitcoinTransactionMetadata, BlockCommitmentData, BlockHeader, BlockIdentifier,
KeyRegistrationData, LockSTXData, PoxReward,
StacksBaseChainOperation, StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
KeyRegistrationData, LockSTXData, PoxReward, StacksBaseChainOperation,
StacksBlockCommitmentData, TransactionIdentifier, TransferSTXData,
};
use hiro_system_kit::slog;
use serde::Deserialize;
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BitcoinBlockFullBreakdown {

View File

@@ -19,7 +19,6 @@ use crate::indexer::bitcoin::{
download_and_parse_block_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
NewBitcoinBlock,
};
use crate::indexer::fork_scratch_pad::ForkScratchPad;
use crate::indexer::{self, Indexer, IndexerConfig};
use crate::utils::{send_request, Context};