fix: initial flow (#178)

* chore: update chainhook-sdk + cascade changes

* fix: update archive url

* feat: only create rocksdb if sqlite present

* fix: use crossbeam channel instead of std

* fix: improve error message

* doc: update README

* fix: build warnings

* fix: block_archiving expiration

* fix: archive url

* fix: read content len from http header

* chore: untar sqlite file

* chore: bump versions
This commit is contained in:
Ludo Galabru
2023-09-20 00:14:54 -04:00
committed by GitHub
parent ac3d4580f9
commit 8bb24beb9a
17 changed files with 671 additions and 794 deletions

1301
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,5 +4,5 @@ members = [
"components/ordhook-core",
"components/ordhook-sdk-js"
]
default-members = ["components/ordhook-cli"]
resolver = "2"

View File

@@ -112,4 +112,15 @@ will spin up a HTTP API for managing events destinations.
A comprehensive OpenAPI specification explaining how to interact with this HTTP REST API can be found [here](https://github.com/hirosystems/chainhook/blob/develop/docs/chainhook-openapi.json).
---
### Troubleshooting: Performance and System Requirements
The Ordinals Theory protocol is resource-intensive, demanding significant CPU, memory, and disk capabilities. As we continue to refine and optimize, keep in mind the following system requirements and recommendations to ensure optimal performance:
CPU: The ordhook tool efficiently utilizes multiple cores when detected at runtime, parallelizing tasks to boost performance.
Memory: A minimum of 16GB RAM is recommended.
Disk: To enhance I/O performance, SSD or NVMe storage is suggested.
OS Requirements: Ensure your system allows for a minimum of 4096 open file descriptors. Configuration may vary based on your operating system. On certain systems, this can be adjusted using the `ulimit` command or the `launchctl limit` command.

View File

@@ -1,6 +1,6 @@
[package]
name = "ordhook-cli"
version = "1.0.1"
version = "1.0.2"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -15,7 +15,7 @@ serde = "1"
serde_json = "1"
serde_derive = "1"
reqwest = { version = "0.11", features = ["stream", "json"] }
hiro-system-kit = "0.1.0"
hiro-system-kit = "0.3.1"
clap = { version = "3.2.23", features = ["derive"], optional = true }
clap_generate = { version = "3.0.3", optional = true }
toml = { version = "0.5.6", features = ["preserve_order"], optional = true }

View File

@@ -37,7 +37,7 @@ max_caching_memory_size_mb = 32000
# Disable the following section if the state
# must be built locally
[bootstrap]
download_url = "https://archive.hiro.so/mainnet/chainhooks/hord.sqlite"
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"
[logs]
ordinals_internals = true

View File

@@ -1,6 +1,6 @@
[package]
name = "ordhook"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
[dependencies]
@@ -12,9 +12,9 @@ redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.9.0", default-features = false, features = ["zeromq", "log"] }
chainhook-sdk = { version = "=0.9.5", default-features = false, features = ["zeromq", "log"] }
# chainhook-sdk = { version = "=0.9.0", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq", "log"] }
hiro-system-kit = "0.1.0"
hiro-system-kit = "0.3.1"
reqwest = { version = "0.11", features = ["stream", "json"] }
tokio = { version = "=1.24", features = ["full"] }
futures-util = "0.3.24"

View File

@@ -7,7 +7,7 @@ use chainhook_sdk::types::{
use std::path::PathBuf;
const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str =
"https://archive.hiro.so/mainnet/chainhooks/hord.sqlite";
"https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest";
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
@@ -155,7 +155,7 @@ impl Config {
}
pub fn expected_remote_ordinals_sqlite_url(&self) -> String {
format!("{}.gz", self.expected_remote_ordinals_sqlite_base_url())
format!("{}.tar.gz", self.expected_remote_ordinals_sqlite_base_url())
}
pub fn devnet_default() -> Config {

View File

@@ -14,12 +14,12 @@ use chainhook_sdk::{
use crate::{
config::{Config, LogConfig},
db::find_lazy_block_at_block_height,
db::{find_lazy_block_at_block_height, open_readwrite_ordhook_db_conn_rocks_db},
};
use crate::db::{
find_last_block_inserted, find_latest_inscription_block_height, initialize_ordhook_db,
open_readonly_ordhook_db_conn, open_readonly_ordhook_db_conn_rocks_db,
open_readonly_ordhook_db_conn,
};
use crate::db::LazyBlockTransaction;
@@ -94,6 +94,26 @@ pub fn compute_next_satpoint_data(
SatPosition::Output((output_index, (offset_cross_inputs - offset_intra_outputs)))
}
pub fn should_sync_rocks_db(
config: &Config,
ctx: &Context,
) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
Some(last_indexed_block) => last_indexed_block,
None => 0
};
let res = if last_compressed_block < last_indexed_block {
Some((last_compressed_block, last_indexed_block))
} else {
None
};
Ok(res)
}
pub fn should_sync_ordhook_db(
config: &Config,
ctx: &Context,
@@ -110,7 +130,7 @@ pub fn should_sync_ordhook_db(
}
};
let blocks_db = open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let mut start_block = find_last_block_inserted(&blocks_db) as u64;
if start_block == 0 {

View File

@@ -1,18 +1,16 @@
use std::{
sync::mpsc::Sender,
thread::{sleep, JoinHandle},
time::Duration,
};
use crossbeam_channel::{Sender, TryRecvError};
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;
use rocksdb::DB;
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{
insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop,
insert_entry_in_blocks,
open_readwrite_ordhook_db_conn_rocks_db, LazyBlock,
},
};
@@ -30,10 +28,9 @@ pub fn start_block_archiving_processor(
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let mut blocks_db_rw =
let blocks_db_rw =
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
let mut empty_cycles = 0;
let mut processed_blocks = 0;
loop {
@@ -49,16 +46,7 @@ pub fn start_block_archiving_processor(
}
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 30 {
warn!(ctx.expect_logger(), "Block processor reached expiration");
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {
break;
}
continue;
}
_ => {
@@ -71,11 +59,6 @@ pub fn start_block_archiving_processor(
if processed_blocks % 10_000 == 0 {
let _ = blocks_db_rw.flush_wal(true);
blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
&ctx,
);
}
}

View File

@@ -228,7 +228,7 @@ pub fn open_readonly_ordhook_db_conn_rocks_db(
opts.set_disable_auto_compactions(true);
opts.set_max_background_jobs(0);
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
.map_err(|e| format!("unable to open hord.rocksdb: {}", e.to_string()))?;
Ok(db)
}
@@ -276,7 +276,7 @@ pub fn open_readwrite_ordhook_db_conn_rocks_db(
let path = get_default_ordhook_db_file_path_rocks_db(&base_dir);
let opts = rocks_db_default_options();
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
.map_err(|e| format!("unable to open hord.rocksdb: {}", e.to_string()))?;
Ok(db)
}

View File

@@ -6,6 +6,7 @@ use flate2::read::GzDecoder;
use futures_util::StreamExt;
use progressing::mapping::Bar as MappingBar;
use progressing::Baring;
use tar::Archive;
use std::fs;
use std::io::{self, Cursor};
use std::io::{Read, Write};
@@ -19,7 +20,7 @@ pub fn default_sqlite_sha_file_path(_network: &BitcoinNetwork) -> String {
}
pub async fn download_sqlite_file(config: &Config, _ctx: &Context) -> Result<(), String> {
let mut destination_path = config.expected_cache_path();
let destination_path = config.expected_cache_path();
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
println!("{}", e.to_string());
});
@@ -46,22 +47,21 @@ pub async fn download_sqlite_file(config: &Config, _ctx: &Context) -> Result<(),
// Download chunks
let (tx, rx) = flume::bounded(0);
destination_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let mut decoder = GzDecoder::new(input);
let mut content = Vec::new();
let _ = decoder.read_to_end(&mut content);
let mut file = fs::File::create(&destination_path).unwrap();
if let Err(e) = file.write_all(&content[..]) {
let mut archive = Archive::new(&content[..]);
if let Err(e) = archive.unpack(&destination_path) {
println!("unable to write file: {}", e.to_string());
std::process::exit(1);
}
});
if res.status() == reqwest::StatusCode::OK {
let limit = 5_400_000_000;
let limit = res.content_length().unwrap_or(10_000_000_000) as i64;
let mut progress_bar = MappingBar::with_range(0i64, limit);
progress_bar.set_len(60);
let mut stdout = std::io::stdout();

View File

@@ -234,14 +234,14 @@ pub async fn process_block_with_predicates(
predicates: &Vec<&BitcoinChainhookSpecification>,
event_observer_config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
) -> Result<u32, String> {
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});
let (predicates_triggered, _predicates_evaluated) =
let (predicates_triggered, _predicates_evaluated, _) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, predicates, ctx);
execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await
@@ -251,7 +251,7 @@ pub async fn execute_predicates_action<'a>(
hits: Vec<BitcoinTriggerChainhook<'a>>,
config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
) -> Result<u32, String> {
let mut actions_triggered = 0;
let mut proofs = HashMap::new();
for trigger in hits.into_iter() {

View File

@@ -99,7 +99,7 @@ fn handle_get_predicates(
let serialized_predicates = predicates
.iter()
.map(|(p, _)| p.into_serialized_json())
.map(|(p, s)| serialized_predicate_with_status(p, s))
.collect::<Vec<_>>();
Json(json!({
@@ -311,3 +311,27 @@ pub fn load_predicates_from_redis(
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
get_entries_from_predicates_db(&mut predicate_db_conn, ctx)
}
fn serialized_predicate_with_status(
predicate: &ChainhookSpecification,
status: &PredicateStatus,
) -> JsonValue {
match (predicate, status) {
(ChainhookSpecification::Stacks(spec), status) => json!({
"chain": "stacks",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status,
"enabled": spec.enabled,
}),
(ChainhookSpecification::Bitcoin(spec), status) => json!({
"chain": "bitcoin",
"uuid": spec.uuid,
"network": spec.network,
"predicate": spec.predicate,
"status": status,
"enabled": spec.enabled,
}),
}
}

View File

@@ -4,6 +4,7 @@ mod runloops;
use crate::config::{Config, PredicatesApi};
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use crate::core::pipeline::processors::inscription_indexing::process_block;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
@@ -11,7 +12,7 @@ use crate::core::protocol::inscription_parsing::{
get_inscriptions_revealed_in_block, parse_inscriptions_in_standardized_block,
};
use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db};
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
use crate::db::{
delete_data_in_ordhook_db, insert_entry_in_blocks,
update_inscriptions_with_block, update_locations_with_block,
@@ -465,6 +466,38 @@ impl Service {
&self,
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
) -> Result<(), String> {
// First, make sure that rocksdb and sqlite are aligned.
// If rocksdb.chain_tip.height <= sqlite.chain_tip.height
// Perform some block compression until that height.
if let Some((start_block, end_block)) = should_sync_rocks_db(&self.config, &self.ctx)? {
let blocks_post_processor = start_block_archiving_processor(
&self.config,
&self.ctx,
false,
block_post_processor.clone(),
);
self.ctx.try_log(|logger| {
info!(
logger,
"Compressing blocks (from #{start_block} to #{end_block})"
)
});
let ordhook_config = self.config.get_ordhook_config();
let first_inscription_height = ordhook_config.first_inscription_height;
let blocks = BlockHeights::BlockRange(start_block, end_block).get_sorted_entries();
download_and_pipeline_blocks(
&self.config,
blocks.into(),
first_inscription_height,
Some(&blocks_post_processor),
10_000,
&self.ctx,
)
.await?;
}
// Start predicate processor
let mut last_block_processed = 0;
while let Some((start_block, end_block, speed)) =

View File

@@ -248,6 +248,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
blocks: None,
start_block: None,
end_block: None,
expired_at: None,
expire_after_occurrence: None,
predicate: chainhook_sdk::chainhooks::types::BitcoinPredicateType::OrdinalsProtocol(
chainhook_sdk::chainhooks::types::OrdinalOperations::InscriptionFeed,

View File

@@ -1,6 +1,6 @@
[package]
name = "ordhook-sdk-js"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
exclude = ["index.node"]

View File

@@ -82,7 +82,7 @@ max_caching_memory_size_mb = 32000
# Disable the following section if the state
# must be built locally
[bootstrap]
download_url = "https://archive.hiro.so/mainnet/chainhooks/hord.sqlite"
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"
[logs]
ordinals_internals = true