mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 00:22:19 +08:00
fix: initial flow (#178)
* chore: update chainhook-sdk + cascade changes * fix: update archive url * feat: only create rocksdb if sqlite present * fix: use crossbeam channel instead of std * fix: improve error message * doc: update README * fix: build warnings * fix: block_archiving expiration * fix: archive url * fix: read content len from http header * chore: untar sqlite file * chore: bump versions
This commit is contained in:
1301
Cargo.lock
generated
1301
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -4,5 +4,5 @@ members = [
|
||||
"components/ordhook-core",
|
||||
"components/ordhook-sdk-js"
|
||||
]
|
||||
|
||||
default-members = ["components/ordhook-cli"]
|
||||
resolver = "2"
|
||||
|
||||
11
README.md
11
README.md
@@ -112,4 +112,15 @@ will spin up a HTTP API for managing events destinations.
|
||||
|
||||
A comprehensive OpenAPI specification explaining how to interact with this HTTP REST API can be found [here](https://github.com/hirosystems/chainhook/blob/develop/docs/chainhook-openapi.json).
|
||||
|
||||
---
|
||||
### Troubleshooting: Performance and System Requirements
|
||||
|
||||
The Ordinals Theory protocol is resource-intensive, demanding significant CPU, memory, and disk capabilities. As we continue to refine and optimize, keep in mind the following system requirements and recommendations to ensure optimal performance:
|
||||
|
||||
CPU: The ordhook tool efficiently utilizes multiple cores when detected at runtime, parallelizing tasks to boost performance.
|
||||
|
||||
Memory: A minimum of 16GB RAM is recommended.
|
||||
|
||||
Disk: To enhance I/O performance, SSD or NVMe storage is suggested.
|
||||
|
||||
OS Requirements: Ensure your system allows for a minimum of 4096 open file descriptors. Configuration may vary based on your operating system. On certain systems, this can be adjusted using the `ulimit` command or the `launchctl limit` command.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "ordhook-cli"
|
||||
version = "1.0.1"
|
||||
version = "1.0.2"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -15,7 +15,7 @@ serde = "1"
|
||||
serde_json = "1"
|
||||
serde_derive = "1"
|
||||
reqwest = { version = "0.11", features = ["stream", "json"] }
|
||||
hiro-system-kit = "0.1.0"
|
||||
hiro-system-kit = "0.3.1"
|
||||
clap = { version = "3.2.23", features = ["derive"], optional = true }
|
||||
clap_generate = { version = "3.0.3", optional = true }
|
||||
toml = { version = "0.5.6", features = ["preserve_order"], optional = true }
|
||||
|
||||
@@ -37,7 +37,7 @@ max_caching_memory_size_mb = 32000
|
||||
# Disable the following section if the state
|
||||
# must be built locally
|
||||
[bootstrap]
|
||||
download_url = "https://archive.hiro.so/mainnet/chainhooks/hord.sqlite"
|
||||
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"
|
||||
|
||||
[logs]
|
||||
ordinals_internals = true
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "ordhook"
|
||||
version = "0.4.0"
|
||||
version = "0.5.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
@@ -12,9 +12,9 @@ redis = "0.21.5"
|
||||
serde-redis = "0.12.0"
|
||||
hex = "0.4.3"
|
||||
rand = "0.8.5"
|
||||
chainhook-sdk = { version = "=0.9.0", default-features = false, features = ["zeromq", "log"] }
|
||||
chainhook-sdk = { version = "=0.9.5", default-features = false, features = ["zeromq", "log"] }
|
||||
# chainhook-sdk = { version = "=0.9.0", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq", "log"] }
|
||||
hiro-system-kit = "0.1.0"
|
||||
hiro-system-kit = "0.3.1"
|
||||
reqwest = { version = "0.11", features = ["stream", "json"] }
|
||||
tokio = { version = "=1.24", features = ["full"] }
|
||||
futures-util = "0.3.24"
|
||||
|
||||
@@ -7,7 +7,7 @@ use chainhook_sdk::types::{
|
||||
use std::path::PathBuf;
|
||||
|
||||
const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str =
|
||||
"https://archive.hiro.so/mainnet/chainhooks/hord.sqlite";
|
||||
"https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest";
|
||||
|
||||
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
|
||||
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
|
||||
@@ -155,7 +155,7 @@ impl Config {
|
||||
}
|
||||
|
||||
pub fn expected_remote_ordinals_sqlite_url(&self) -> String {
|
||||
format!("{}.gz", self.expected_remote_ordinals_sqlite_base_url())
|
||||
format!("{}.tar.gz", self.expected_remote_ordinals_sqlite_base_url())
|
||||
}
|
||||
|
||||
pub fn devnet_default() -> Config {
|
||||
|
||||
@@ -14,12 +14,12 @@ use chainhook_sdk::{
|
||||
|
||||
use crate::{
|
||||
config::{Config, LogConfig},
|
||||
db::find_lazy_block_at_block_height,
|
||||
db::{find_lazy_block_at_block_height, open_readwrite_ordhook_db_conn_rocks_db},
|
||||
};
|
||||
|
||||
use crate::db::{
|
||||
find_last_block_inserted, find_latest_inscription_block_height, initialize_ordhook_db,
|
||||
open_readonly_ordhook_db_conn, open_readonly_ordhook_db_conn_rocks_db,
|
||||
open_readonly_ordhook_db_conn,
|
||||
};
|
||||
|
||||
use crate::db::LazyBlockTransaction;
|
||||
@@ -94,6 +94,26 @@ pub fn compute_next_satpoint_data(
|
||||
SatPosition::Output((output_index, (offset_cross_inputs - offset_intra_outputs)))
|
||||
}
|
||||
|
||||
pub fn should_sync_rocks_db(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<Option<(u64, u64)>, String> {
|
||||
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
|
||||
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
|
||||
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
|
||||
Some(last_indexed_block) => last_indexed_block,
|
||||
None => 0
|
||||
};
|
||||
|
||||
let res = if last_compressed_block < last_indexed_block {
|
||||
Some((last_compressed_block, last_indexed_block))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn should_sync_ordhook_db(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
@@ -110,7 +130,7 @@ pub fn should_sync_ordhook_db(
|
||||
}
|
||||
};
|
||||
|
||||
let blocks_db = open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
|
||||
let mut start_block = find_last_block_inserted(&blocks_db) as u64;
|
||||
|
||||
if start_block == 0 {
|
||||
|
||||
@@ -1,18 +1,16 @@
|
||||
use std::{
|
||||
sync::mpsc::Sender,
|
||||
thread::{sleep, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crossbeam_channel::{Sender, TryRecvError};
|
||||
use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
|
||||
use crossbeam_channel::TryRecvError;
|
||||
use rocksdb::DB;
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
|
||||
db::{
|
||||
insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop,
|
||||
insert_entry_in_blocks,
|
||||
open_readwrite_ordhook_db_conn_rocks_db, LazyBlock,
|
||||
},
|
||||
};
|
||||
@@ -30,10 +28,9 @@ pub fn start_block_archiving_processor(
|
||||
let ctx = ctx.clone();
|
||||
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
|
||||
.spawn(move || {
|
||||
let mut blocks_db_rw =
|
||||
let blocks_db_rw =
|
||||
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
|
||||
.unwrap();
|
||||
let mut empty_cycles = 0;
|
||||
let mut processed_blocks = 0;
|
||||
|
||||
loop {
|
||||
@@ -49,16 +46,7 @@ pub fn start_block_archiving_processor(
|
||||
}
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
if empty_cycles == 30 {
|
||||
warn!(ctx.expect_logger(), "Block processor reached expiration");
|
||||
let _ = events_tx.send(PostProcessorEvent::Expired);
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
if empty_cycles > 120 {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
@@ -71,11 +59,6 @@ pub fn start_block_archiving_processor(
|
||||
|
||||
if processed_blocks % 10_000 == 0 {
|
||||
let _ = blocks_db_rw.flush_wal(true);
|
||||
blocks_db_rw = open_ordhook_db_conn_rocks_db_loop(
|
||||
true,
|
||||
&config.expected_cache_path(),
|
||||
&ctx,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -228,7 +228,7 @@ pub fn open_readonly_ordhook_db_conn_rocks_db(
|
||||
opts.set_disable_auto_compactions(true);
|
||||
opts.set_max_background_jobs(0);
|
||||
let db = DB::open_for_read_only(&opts, path, false)
|
||||
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
|
||||
.map_err(|e| format!("unable to open hord.rocksdb: {}", e.to_string()))?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
@@ -276,7 +276,7 @@ pub fn open_readwrite_ordhook_db_conn_rocks_db(
|
||||
let path = get_default_ordhook_db_file_path_rocks_db(&base_dir);
|
||||
let opts = rocks_db_default_options();
|
||||
let db = DB::open(&opts, path)
|
||||
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
|
||||
.map_err(|e| format!("unable to open hord.rocksdb: {}", e.to_string()))?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use flate2::read::GzDecoder;
|
||||
use futures_util::StreamExt;
|
||||
use progressing::mapping::Bar as MappingBar;
|
||||
use progressing::Baring;
|
||||
use tar::Archive;
|
||||
use std::fs;
|
||||
use std::io::{self, Cursor};
|
||||
use std::io::{Read, Write};
|
||||
@@ -19,7 +20,7 @@ pub fn default_sqlite_sha_file_path(_network: &BitcoinNetwork) -> String {
|
||||
}
|
||||
|
||||
pub async fn download_sqlite_file(config: &Config, _ctx: &Context) -> Result<(), String> {
|
||||
let mut destination_path = config.expected_cache_path();
|
||||
let destination_path = config.expected_cache_path();
|
||||
std::fs::create_dir_all(&destination_path).unwrap_or_else(|e| {
|
||||
println!("{}", e.to_string());
|
||||
});
|
||||
@@ -46,22 +47,21 @@ pub async fn download_sqlite_file(config: &Config, _ctx: &Context) -> Result<(),
|
||||
|
||||
// Download chunks
|
||||
let (tx, rx) = flume::bounded(0);
|
||||
destination_path.push(default_sqlite_file_path(&config.network.bitcoin_network));
|
||||
|
||||
let decoder_thread = std::thread::spawn(move || {
|
||||
let input = ChannelRead::new(rx);
|
||||
let mut decoder = GzDecoder::new(input);
|
||||
let mut content = Vec::new();
|
||||
let _ = decoder.read_to_end(&mut content);
|
||||
let mut file = fs::File::create(&destination_path).unwrap();
|
||||
if let Err(e) = file.write_all(&content[..]) {
|
||||
let mut archive = Archive::new(&content[..]);
|
||||
if let Err(e) = archive.unpack(&destination_path) {
|
||||
println!("unable to write file: {}", e.to_string());
|
||||
std::process::exit(1);
|
||||
}
|
||||
});
|
||||
|
||||
if res.status() == reqwest::StatusCode::OK {
|
||||
let limit = 5_400_000_000;
|
||||
let limit = res.content_length().unwrap_or(10_000_000_000) as i64;
|
||||
let mut progress_bar = MappingBar::with_range(0i64, limit);
|
||||
progress_bar.set_len(60);
|
||||
let mut stdout = std::io::stdout();
|
||||
|
||||
@@ -234,14 +234,14 @@ pub async fn process_block_with_predicates(
|
||||
predicates: &Vec<&BitcoinChainhookSpecification>,
|
||||
event_observer_config: &EventObserverConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<u32, ()> {
|
||||
) -> Result<u32, String> {
|
||||
let chain_event =
|
||||
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
|
||||
new_blocks: vec![block],
|
||||
confirmed_blocks: vec![],
|
||||
});
|
||||
|
||||
let (predicates_triggered, _predicates_evaluated) =
|
||||
let (predicates_triggered, _predicates_evaluated, _) =
|
||||
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, predicates, ctx);
|
||||
|
||||
execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await
|
||||
@@ -251,7 +251,7 @@ pub async fn execute_predicates_action<'a>(
|
||||
hits: Vec<BitcoinTriggerChainhook<'a>>,
|
||||
config: &EventObserverConfig,
|
||||
ctx: &Context,
|
||||
) -> Result<u32, ()> {
|
||||
) -> Result<u32, String> {
|
||||
let mut actions_triggered = 0;
|
||||
let mut proofs = HashMap::new();
|
||||
for trigger in hits.into_iter() {
|
||||
|
||||
@@ -99,7 +99,7 @@ fn handle_get_predicates(
|
||||
|
||||
let serialized_predicates = predicates
|
||||
.iter()
|
||||
.map(|(p, _)| p.into_serialized_json())
|
||||
.map(|(p, s)| serialized_predicate_with_status(p, s))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Json(json!({
|
||||
@@ -311,3 +311,27 @@ pub fn load_predicates_from_redis(
|
||||
.map_err(|e| format!("unable to connect to redis: {}", e.to_string()))?;
|
||||
get_entries_from_predicates_db(&mut predicate_db_conn, ctx)
|
||||
}
|
||||
|
||||
fn serialized_predicate_with_status(
|
||||
predicate: &ChainhookSpecification,
|
||||
status: &PredicateStatus,
|
||||
) -> JsonValue {
|
||||
match (predicate, status) {
|
||||
(ChainhookSpecification::Stacks(spec), status) => json!({
|
||||
"chain": "stacks",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
(ChainhookSpecification::Bitcoin(spec), status) => json!({
|
||||
"chain": "bitcoin",
|
||||
"uuid": spec.uuid,
|
||||
"network": spec.network,
|
||||
"predicate": spec.predicate,
|
||||
"status": status,
|
||||
"enabled": spec.enabled,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ mod runloops;
|
||||
|
||||
use crate::config::{Config, PredicatesApi};
|
||||
use crate::core::pipeline::download_and_pipeline_blocks;
|
||||
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
|
||||
use crate::core::pipeline::processors::inscription_indexing::process_block;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
|
||||
@@ -11,7 +12,7 @@ use crate::core::protocol::inscription_parsing::{
|
||||
get_inscriptions_revealed_in_block, parse_inscriptions_in_standardized_block,
|
||||
};
|
||||
use crate::core::protocol::inscription_sequencing::SequenceCursor;
|
||||
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db};
|
||||
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
|
||||
use crate::db::{
|
||||
delete_data_in_ordhook_db, insert_entry_in_blocks,
|
||||
update_inscriptions_with_block, update_locations_with_block,
|
||||
@@ -465,6 +466,38 @@ impl Service {
|
||||
&self,
|
||||
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
|
||||
) -> Result<(), String> {
|
||||
// First, make sure that rocksdb and sqlite are aligned.
|
||||
// If rocksdb.chain_tip.height <= sqlite.chain_tip.height
|
||||
// Perform some block compression until that height.
|
||||
if let Some((start_block, end_block)) = should_sync_rocks_db(&self.config, &self.ctx)? {
|
||||
let blocks_post_processor = start_block_archiving_processor(
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
false,
|
||||
block_post_processor.clone(),
|
||||
);
|
||||
|
||||
self.ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"Compressing blocks (from #{start_block} to #{end_block})"
|
||||
)
|
||||
});
|
||||
|
||||
let ordhook_config = self.config.get_ordhook_config();
|
||||
let first_inscription_height = ordhook_config.first_inscription_height;
|
||||
let blocks = BlockHeights::BlockRange(start_block, end_block).get_sorted_entries();
|
||||
download_and_pipeline_blocks(
|
||||
&self.config,
|
||||
blocks.into(),
|
||||
first_inscription_height,
|
||||
Some(&blocks_post_processor),
|
||||
10_000,
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Start predicate processor
|
||||
let mut last_block_processed = 0;
|
||||
while let Some((start_block, end_block, speed)) =
|
||||
|
||||
@@ -248,6 +248,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
|
||||
blocks: None,
|
||||
start_block: None,
|
||||
end_block: None,
|
||||
expired_at: None,
|
||||
expire_after_occurrence: None,
|
||||
predicate: chainhook_sdk::chainhooks::types::BitcoinPredicateType::OrdinalsProtocol(
|
||||
chainhook_sdk::chainhooks::types::OrdinalOperations::InscriptionFeed,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "ordhook-sdk-js"
|
||||
version = "0.4.0"
|
||||
version = "0.5.0"
|
||||
edition = "2021"
|
||||
exclude = ["index.node"]
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ max_caching_memory_size_mb = 32000
|
||||
# Disable the following section if the state
|
||||
# must be built locally
|
||||
[bootstrap]
|
||||
download_url = "https://archive.hiro.so/mainnet/chainhooks/hord.sqlite"
|
||||
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"
|
||||
|
||||
[logs]
|
||||
ordinals_internals = true
|
||||
|
||||
Reference in New Issue
Block a user