fix: only create brc-20 db connection and cache if required (#357)

* fix: unify db create

* fix: test

* fix: create on drop

* fix: build

* chore: remove warnings

* fix: add some logs

* fix: blocking thread
Rafael Cárdenas authored on 2024-09-04 08:59:21 -06:00; committed by GitHub
parent 7a65fdf107
commit 5692426e4b
8 changed files with 111 additions and 136 deletions
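
Taken together, the diffs below replace unconditional BRC-20 setup with two helpers, brc20_new_cache and brc20_new_rw_db_conn, that return Option values driven by the config. A minimal sketch of the resulting call-site pattern, assuming the ordhook crate as a dependency and that both modules are exported under the paths shown in the diffs (the wrapper function itself is illustrative, not part of this commit):

// Illustrative sketch only: setup_brc20 is a hypothetical wrapper around the
// two helpers introduced by this commit.
use ordhook::chainhook_sdk::utils::Context;
use ordhook::config::Config;
use ordhook::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCache};
use ordhook::core::meta_protocols::brc20::db::brc20_new_rw_db_conn;
use rusqlite::Connection;

fn setup_brc20(config: &Config, ctx: &Context) -> (Option<Brc20MemoryCache>, Option<Connection>) {
    // Both helpers return None when config.meta_protocols.brc20 is disabled,
    // so no brc20.sqlite file is created and no LRU cache is allocated.
    let brc20_cache = brc20_new_cache(config);
    let brc20_db_conn_rw = brc20_new_rw_db_conn(config, ctx);
    (brc20_cache, brc20_db_conn_rw)
}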

View File

@@ -17,7 +17,7 @@ use ordhook::chainhook_sdk::utils::BlockHeights;
use ordhook::chainhook_sdk::utils::Context;
use ordhook::config::Config;
use ordhook::core::meta_protocols::brc20::db::{
get_brc20_operations_on_block, open_readwrite_brc20_db_conn,
brc20_new_rw_db_conn, get_brc20_operations_on_block,
};
use ordhook::core::new_traversals_lazy_cache;
use ordhook::core::pipeline::download_and_pipeline_blocks;
@@ -42,10 +42,10 @@ use reqwest::Client as HttpClient;
use std::collections::HashSet;
use std::io::{BufReader, Read};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::{process, u64};
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
@@ -729,8 +729,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let maintenance_enabled =
std::env::var("ORDHOOK_MAINTENANCE").unwrap_or("0".into());
if maintenance_enabled.eq("1") {
info!(ctx.expect_logger(), "Entering maintenance mode (default duration = 7 days). Unset ORDHOOK_MAINTENANCE and reboot to resume operations");
sleep(Duration::from_secs(3600 * 24 * 7))
try_info!(ctx, "Entering maintenance mode. Unset ORDHOOK_MAINTENANCE and reboot to resume operations");
sleep(Duration::from_secs(u64::MAX))
}
let config = ConfigFile::default(
@@ -745,7 +745,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let last_known_block =
find_latest_inscription_block_height(&db_connections.ordhook, ctx)?;
if last_known_block.is_none() {
// Create rocksdb
open_ordhook_db_conn_rocks_db_loop(
true,
&config.expected_cache_path(),
@@ -756,12 +755,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
}
let ordhook_config = config.get_ordhook_config();
let version = env!("GIT_COMMIT");
info!(
ctx.expect_logger(),
"Starting service (git_commit = {})...", version
);
let start_block = match cmd.start_at_block {
Some(entry) => entry,
None => match last_known_block {
@@ -961,14 +954,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
config.resources.memory_available,
&ctx,
)?;
let brc_20_db_conn_rw = if config.meta_protocols.brc20 {
Some(open_readwrite_brc20_db_conn(
&config.expected_cache_path(),
ctx,
)?)
} else {
None
};
let brc_20_db_conn_rw = brc20_new_rw_db_conn(&config, ctx);
delete_data_in_ordhook_db(
cmd.start_block,

View File

@@ -7,7 +7,7 @@ use chainhook_sdk::{
use lru::LruCache;
use rusqlite::{Connection, Transaction};
use crate::core::meta_protocols::brc20::db::get_unsent_token_transfer;
use crate::{config::Config, core::meta_protocols::brc20::db::get_unsent_token_transfer};
use super::{
db::{
@@ -17,6 +17,15 @@ use super::{
verifier::{VerifiedBrc20BalanceData, VerifiedBrc20TokenDeployData, VerifiedBrc20TransferData},
};
/// If the given `config` has BRC-20 enabled, returns a BRC-20 memory cache.
pub fn brc20_new_cache(config: &Config) -> Option<Brc20MemoryCache> {
if config.meta_protocols.brc20 {
Some(Brc20MemoryCache::new(config.resources.brc20_lru_cache_size))
} else {
None
}
}
/// Keeps BRC20 DB rows before they're inserted into SQLite. Use `flush` to insert.
pub struct Brc20DbCache {
ledger_rows: Vec<Brc20DbLedgerRow>,

View File

@@ -1,11 +1,9 @@
use std::{collections::HashMap, path::PathBuf};
use crate::{
db::{
create_or_open_readwrite_db, open_existing_readonly_db, perform_query_one,
perform_query_set,
},
try_warn,
config::Config,
db::{create_or_open_readwrite_db, perform_query_one, perform_query_set},
try_error, try_warn,
};
use chainhook_sdk::{
types::{
@@ -44,6 +42,21 @@ pub struct Brc20DbLedgerRow {
pub operation: String,
}
/// If the given `config` has BRC-20 enabled, returns a read/write DB connection for BRC-20.
pub fn brc20_new_rw_db_conn(config: &Config, ctx: &Context) -> Option<Connection> {
if config.meta_protocols.brc20 {
match open_readwrite_brc20_db_conn(&config.expected_cache_path(), &ctx) {
Ok(db) => Some(db),
Err(e) => {
try_error!(ctx, "Unable to open readwrite brc20 connection: {e}");
None
}
}
} else {
None
}
}
pub fn get_default_brc20_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
destination_path.push("brc20.sqlite");
@@ -132,24 +145,12 @@ pub fn initialize_brc20_db(base_dir: Option<&PathBuf>, ctx: &Context) -> Connect
conn
}
pub fn open_readwrite_brc20_db_conn(
base_dir: &PathBuf,
ctx: &Context,
) -> Result<Connection, String> {
fn open_readwrite_brc20_db_conn(base_dir: &PathBuf, ctx: &Context) -> Result<Connection, String> {
let db_path = get_default_brc20_db_file_path(&base_dir);
let conn = create_or_open_readwrite_db(Some(&db_path), ctx);
Ok(conn)
}
pub fn open_readonly_brc20_db_conn(
base_dir: &PathBuf,
ctx: &Context,
) -> Result<Connection, String> {
let db_path = get_default_brc20_db_file_path(&base_dir);
let conn = open_existing_readonly_db(&db_path, ctx);
Ok(conn)
}
pub fn delete_activity_in_block_range(
start_block: u32,
end_block: u32,
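
The new helper logs and returns None on failure instead of propagating an error, so callers degrade to inscription-only indexing rather than aborting. Stripped of ordhook types, the behavior amounts to the following standalone sketch (Brc20Settings and eprintln! are stand-ins for Config and try_error!, not the actual ordhook code):

// Standalone illustration of the "create only if enabled" helper.
use rusqlite::Connection;
use std::path::PathBuf;

struct Brc20Settings {
    enabled: bool,
    cache_path: PathBuf,
}

fn optional_brc20_conn(settings: &Brc20Settings) -> Option<Connection> {
    if !settings.enabled {
        // BRC-20 disabled: nothing is created on disk.
        return None;
    }
    let db_path = settings.cache_path.join("brc20.sqlite");
    match Connection::open(&db_path) {
        Ok(conn) => Some(conn),
        Err(e) => {
            // Report the failure and fall back to running without BRC-20.
            eprintln!("Unable to open readwrite brc20 connection: {e}");
            None
        }
    }
}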

View File

@@ -19,7 +19,10 @@ use std::hash::BuildHasherDefault;
use crate::{
core::{
meta_protocols::brc20::{cache::Brc20MemoryCache, db::open_readwrite_brc20_db_conn},
meta_protocols::brc20::{
cache::{brc20_new_cache, Brc20MemoryCache},
db::brc20_new_rw_db_conn,
},
pipeline::processors::block_archiving::store_compacted_blocks,
protocol::{
inscription_parsing::{
@@ -79,6 +82,9 @@ pub fn start_inscription_indexing_processor(
open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx).unwrap();
let mut sequence_cursor = SequenceCursor::new(&inscriptions_db_conn);
let mut brc20_cache = brc20_new_cache(&config);
let mut brc20_db_conn_rw = brc20_new_rw_db_conn(&config, &ctx);
loop {
let (compacted_blocks, mut blocks) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
@@ -133,6 +139,8 @@ pub fn start_inscription_indexing_processor(
&mut sequence_cursor,
&cache_l2,
&mut inscriptions_db_conn_rw,
&mut brc20_cache,
&mut brc20_db_conn_rw,
&ordhook_config,
&post_processor,
&ctx,
@@ -169,26 +177,18 @@ pub fn process_blocks(
sequence_cursor: &mut SequenceCursor,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_conn_rw: &mut Connection,
brc20_cache: &mut Option<Brc20MemoryCache>,
brc20_db_conn_rw: &mut Option<Connection>,
ordhook_config: &OrdhookConfig,
post_processor: &Option<Sender<BitcoinBlockData>>,
ctx: &Context,
) -> Vec<BitcoinBlockData> {
let mut cache_l1 = BTreeMap::new();
let mut updated_blocks = vec![];
let mut brc20_db_conn_rw = match open_readwrite_brc20_db_conn(&ordhook_config.db_path, &ctx) {
Ok(dbs) => dbs,
Err(e) => {
panic!("Unable to open readwrite connection: {e}");
}
};
let mut brc20_cache = Brc20MemoryCache::new(ordhook_config.resources.brc20_lru_cache_size);
for _cursor in 0..next_blocks.len() {
let inscriptions_db_tx: rusqlite::Transaction<'_> =
inscriptions_db_conn_rw.transaction().unwrap();
let brc20_db_tx = brc20_db_conn_rw.transaction().unwrap();
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();
let brc20_db_tx = brc20_db_conn_rw.as_mut().map(|c| c.transaction().unwrap());
let mut block = next_blocks.remove(0);
@@ -214,8 +214,8 @@ pub fn process_blocks(
&mut cache_l1,
cache_l2,
&inscriptions_db_tx,
Some(&brc20_db_tx),
&mut brc20_cache,
brc20_db_tx.as_ref(),
brc20_cache.as_mut(),
ordhook_config,
ctx,
);
@@ -242,21 +242,20 @@ pub fn process_blocks(
block.block_identifier.index,
);
let _ = inscriptions_db_tx.rollback();
let _ = brc20_db_tx.rollback();
let _ = brc20_db_tx.map(|t| t.rollback());
} else {
match inscriptions_db_tx.commit() {
Ok(_) => match brc20_db_tx.commit() {
Ok(_) => {}
Err(_) => {
// delete_data_in_ordhook_db(
// block.block_identifier.index,
// block.block_identifier.index,
// ordhook_config,
// ctx,
// );
todo!()
Ok(_) => {
if let Some(brc20_db_tx) = brc20_db_tx {
match brc20_db_tx.commit() {
Ok(_) => {}
Err(_) => {
// TODO: Synchronize rollbacks and commits between BRC-20 and inscription DBs.
todo!()
}
}
}
},
}
Err(e) => {
try_error!(
ctx,
@@ -284,7 +283,7 @@ pub fn process_block(
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_tx: &Transaction,
brc20_db_tx: Option<&Transaction>,
brc20_cache: &mut Brc20MemoryCache,
brc20_cache: Option<&mut Brc20MemoryCache>,
ordhook_config: &OrdhookConfig,
ctx: &Context,
) -> Result<(), String> {
@@ -322,14 +321,15 @@ pub fn process_block(
// Handle transfers
let _ = augment_block_with_ordinals_transfer_data(block, inscriptions_db_tx, true, &inner_ctx);
if let Some(brc20_db_tx) = brc20_db_tx {
write_brc20_block_operations(
match (brc20_db_tx, brc20_cache) {
(Some(brc20_db_tx), Some(brc20_cache)) => write_brc20_block_operations(
block,
&mut brc20_operation_map,
brc20_cache,
&brc20_db_tx,
brc20_db_tx,
&ctx,
);
),
_ => {}
}
Ok(())
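
With the connection now optional, process_blocks only opens, commits, or rolls back a BRC-20 transaction when one exists, and write_brc20_block_operations is reached only when both the transaction and the cache are present. A simplified sketch of that Option-threading, with the actual indexing elided and block_failed standing in for the real error path:

// Simplified sketch of driving an optional SQLite transaction alongside a
// mandatory one, mirroring the pattern used in process_blocks.
use rusqlite::Connection;

fn index_one_block(
    inscriptions_db_conn_rw: &mut Connection,
    brc20_db_conn_rw: &mut Option<Connection>,
    block_failed: bool,
) -> rusqlite::Result<()> {
    let inscriptions_db_tx = inscriptions_db_conn_rw.transaction()?;
    // Only open a BRC-20 transaction when BRC-20 is enabled.
    let brc20_db_tx = match brc20_db_conn_rw.as_mut() {
        Some(conn) => Some(conn.transaction()?),
        None => None,
    };

    // ... index the block, passing brc20_db_tx.as_ref() and the optional cache
    // down to the code that writes BRC-20 operations ...

    if block_failed {
        let _ = inscriptions_db_tx.rollback();
        let _ = brc20_db_tx.map(|t| t.rollback());
    } else {
        inscriptions_db_tx.commit()?;
        if let Some(tx) = brc20_db_tx {
            tx.commit()?;
        }
    }
    Ok(())
}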

View File

@@ -560,7 +560,7 @@ pub fn update_ordinals_db_with_block(
let (tx, output_index, offset) =
parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
let insertion_res = locations_to_insert.insert(
let _ = locations_to_insert.insert(
(inscription_data.ordinal_number, outpoint_to_watch),
OrdinalLocation {
offset,
@@ -568,21 +568,13 @@ pub fn update_ordinals_db_with_block(
tx_index: inscription_data.tx_index,
},
);
if let Some(prev_location) = insertion_res {
try_warn!(
ctx,
"Ignoring location insertion from inscriptions: {}, {:?}",
inscription_data.ordinal_number,
prev_location
);
}
}
for transfer_data in get_inscriptions_transferred_in_block(&block).iter() {
let (tx, output_index, offset) =
parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
let insertion_res = locations_to_insert.insert(
let _ = locations_to_insert.insert(
(transfer_data.ordinal_number, outpoint_to_watch),
OrdinalLocation {
offset,
@@ -590,14 +582,6 @@ pub fn update_ordinals_db_with_block(
tx_index: transfer_data.tx_index,
},
);
if let Some(prev_location) = insertion_res {
try_warn!(
ctx,
"Ignoring location insertion from transfers: {}, {:?}",
transfer_data.ordinal_number,
prev_location
);
}
}
for ((ordinal_number, outpoint_to_watch), location_data) in locations_to_insert {
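
The dropped warning branches relied on insert returning any value previously stored under the key; that previous value is now discarded on purpose, so a repeated location simply overwrites the earlier one. For reference, the standard-library behavior being leaned on:

// Map::insert returns the previous value for an existing key; binding it to
// `_` silently accepts the overwrite instead of logging it.
use std::collections::HashMap;

fn main() {
    let mut locations: HashMap<(u64, String), u64> = HashMap::new();
    locations.insert((42, "txid:0".to_string()), 100);

    // Second insert for the same key overwrites the first; Some(100) is dropped.
    let _ = locations.insert((42, "txid:0".to_string()), 200);

    assert_eq!(locations[&(42, "txid:0".to_string())], 200);
}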

View File

@@ -54,10 +54,7 @@ pub async fn start_observers_http_server(
let _ = hiro_system_kit::thread_named("observers_api-events").spawn(move || loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
try_error!(&moved_ctx, "Error: broken channel {}", e.to_string());
break;
}
Err(_) => break
};
match event {
ObserverEvent::PredicateRegistered(spec) => {
@@ -136,10 +133,6 @@ pub async fn start_observers_http_server(
)
}
}
ObserverEvent::Terminate => {
try_info!(&moved_ctx, "Terminating runloop");
break;
}
_ => {}
}
});
@@ -155,6 +148,7 @@ async fn build_server(
let PredicatesApi::On(ref api_config) = config.http_api else {
unreachable!();
};
try_info!(ctx, "Listening on port {} for chainhook predicate registrations", api_config.http_port);
let moved_config = config.clone();
let moved_ctx = ctx.clone();
let moved_observer_commands_tx = observer_command_tx.clone();

View File

@@ -4,9 +4,9 @@ mod runloops;
use crate::config::{Config, PredicatesApi};
use crate::core::meta_protocols::brc20::brc20_activation_height;
use crate::core::meta_protocols::brc20::cache::Brc20MemoryCache;
use crate::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCache};
use crate::core::meta_protocols::brc20::db::{
open_readwrite_brc20_db_conn, write_augmented_block_to_brc20_db,
brc20_new_rw_db_conn, write_augmented_block_to_brc20_db,
};
use crate::core::meta_protocols::brc20::parser::ParsedBrc20Operation;
use crate::core::meta_protocols::brc20::verifier::{
@@ -97,8 +97,8 @@ impl Service {
// Catch-up with chain tip
self.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor)
.await?;
info!(
self.ctx.expect_logger(),
try_info!(
self.ctx,
"Database up to date, service will start streaming blocks"
);
@@ -206,6 +206,7 @@ impl Service {
Ok((observer_command_tx, observer_event_rx))
}
// TODO: Deprecated? Only used by ordhook-sdk-js.
pub fn start_main_runloop(
&self,
_observer_command_tx: &std::sync::mpsc::Sender<ObserverCommand>,
@@ -242,6 +243,8 @@ impl Service {
Ok(())
}
/// Starts the predicates HTTP server and the main Bitcoin processing runloop that will wait for ZMQ messages to arrive in
/// order to index blocks. This function will block the main thread indefinitely.
pub fn start_main_runloop_with_dynamic_predicates(
&self,
observer_command_tx: &std::sync::mpsc::Sender<ObserverCommand>,
@@ -269,17 +272,36 @@ impl Service {
let moved_config = self.config.clone();
let moved_ctx = self.ctx.clone();
let moved_observer_commands_tx = observer_command_tx.clone();
let moved_observer_event_rx = observer_event_rx.clone();
let _ = hiro_system_kit::thread_named("HTTP Observers API").spawn(move || {
let _ = hiro_system_kit::nestable_block_on(start_observers_http_server(
&moved_config,
&moved_observer_commands_tx,
observer_event_rx,
moved_observer_event_rx,
bitcoin_scan_op_tx,
&moved_ctx,
));
});
}
// Block the main thread indefinitely until the chainhook-sdk channel is closed.
loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
try_error!(self.ctx, "Error: broken channel {}", e.to_string());
break;
}
};
match event {
ObserverEvent::Terminate => {
try_info!(&self.ctx, "Terminating runloop");
break;
}
_ => {}
}
}
Ok(())
}
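
The loop above keeps the main thread parked on the observer event channel until Terminate arrives or the channel closes, while the observers HTTP server runs in its spawned thread (presumably the "fix: blocking thread" item in the commit message). A self-contained sketch of the pattern, with ObserverEvent as a simplified stand-in for the chainhook-sdk type:

// The main thread blocks on recv() until it sees Terminate or the sender side
// hangs up; the worker thread stands in for the observers HTTP server.
use std::sync::mpsc::channel;
use std::thread;

enum ObserverEvent {
    Terminate,
    PredicateRegistered,
}

fn main() {
    let (observer_event_tx, observer_event_rx) = channel::<ObserverEvent>();

    let _server = thread::spawn(move || {
        let _ = observer_event_tx.send(ObserverEvent::PredicateRegistered);
        let _ = observer_event_tx.send(ObserverEvent::Terminate);
    });

    // Without this loop the function would return and the process could exit
    // while the worker is still running.
    loop {
        match observer_event_rx.recv() {
            Ok(ObserverEvent::Terminate) | Err(_) => break,
            Ok(_) => {}
        }
    }
}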
@@ -322,6 +344,7 @@ impl Service {
bitcoin_chain_event_notifier: Some(chain_event_notifier_tx),
};
let cache_l2 = Arc::new(new_traversals_lazy_cache(100_000));
let mut brc20_cache = brc20_new_cache(&self.config);
let ctx = self.ctx.clone();
let config = self.config.clone();
@@ -333,6 +356,7 @@ impl Service {
&mut blocks_to_mutate,
&blocks_ids_to_rollback,
&cache_l2,
&mut brc20_cache,
&config,
&ctx,
);
@@ -527,17 +551,7 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct
return;
}
};
let brc_20_db_conn_rw = if config.meta_protocols.brc20 {
match open_readwrite_brc20_db_conn(&config.expected_cache_path(), ctx) {
Ok(dbs) => Some(dbs),
Err(e) => {
try_error!(ctx, "Unable to open readwrite brc20 connection: {e}");
return;
}
}
} else {
None
};
let brc20_db_conn_rw = brc20_new_rw_db_conn(config, ctx);
match command {
HandleBlock::UndoBlock(block) => {
@@ -551,7 +565,7 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct
block.block_identifier.index,
&inscriptions_db_conn_rw,
&blocks_db_rw,
&brc_20_db_conn_rw,
&brc20_db_conn_rw,
ctx,
);
@@ -591,7 +605,7 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct
update_sequence_metadata_with_block(&block, &inscriptions_db_conn_rw, &ctx);
if let Some(brc20_conn_rw) = brc_20_db_conn_rw {
if let Some(brc20_conn_rw) = brc20_db_conn_rw {
write_augmented_block_to_brc20_db(&block, &brc20_conn_rw, ctx);
}
}
@@ -637,6 +651,7 @@ pub fn chainhook_sidecar_mutate_blocks(
blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
blocks_ids_to_rollback: &Vec<BlockIdentifier>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
brc20_cache: &mut Option<Brc20MemoryCache>,
config: &Config,
ctx: &Context,
) {
@@ -654,19 +669,7 @@ pub fn chainhook_sidecar_mutate_blocks(
return;
}
};
let mut brc_20_db_conn_rw = if config.meta_protocols.brc20 {
match open_readwrite_brc20_db_conn(&config.expected_cache_path(), ctx) {
Ok(db) => Some(db),
Err(e) => {
try_error!(ctx, "Unable to open readwrite brc20 connection: {e}");
return;
}
}
} else {
None
};
let mut brc20_cache = Brc20MemoryCache::new(config.resources.brc20_lru_cache_size);
let mut brc20_db_conn_rw = brc20_new_rw_db_conn(config, ctx);
for block_id_to_rollback in blocks_ids_to_rollback.iter() {
if let Err(e) = delete_data_in_ordhook_db(
@@ -674,7 +677,7 @@ pub fn chainhook_sidecar_mutate_blocks(
block_id_to_rollback.index,
&inscriptions_db_conn_rw,
&blocks_db_rw,
&brc_20_db_conn_rw,
&brc20_db_conn_rw,
&ctx,
) {
try_error!(
@@ -685,11 +688,7 @@ pub fn chainhook_sidecar_mutate_blocks(
}
}
let brc20_db_tx = if let Some(ref mut db_conn) = brc_20_db_conn_rw {
Some(db_conn.transaction().unwrap())
} else {
None
};
let brc20_db_tx = brc20_db_conn_rw.as_mut().map(|c| c.transaction().unwrap());
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();
let ordhook_config = config.get_ordhook_config();
@@ -736,7 +735,7 @@ pub fn chainhook_sidecar_mutate_blocks(
&cache_l2,
&inscriptions_db_tx,
brc20_db_tx.as_ref(),
&mut brc20_cache,
brc20_cache.as_mut(),
&ordhook_config,
&ctx,
);
@@ -766,6 +765,8 @@ pub fn chainhook_sidecar_mutate_blocks(
}
}
/// Writes BRC-20 data already included in the augmented `BitcoinBlockData` onto the BRC-20 database. Only called if BRC-20 is
/// enabled.
pub fn write_brc20_block_operations(
block: &mut BitcoinBlockData,
brc20_operation_map: &mut HashMap<String, ParsedBrc20Operation>,

View File

@@ -13,7 +13,7 @@ use crate::{
service::observers::{
open_readwrite_observers_db_conn_or_panic, update_observer_streaming_enabled,
},
try_error,
try_error, try_info,
};
pub fn start_bitcoin_scan_runloop(
@@ -22,8 +22,8 @@ pub fn start_bitcoin_scan_runloop(
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
try_info!(ctx, "Starting bitcoin scan runloop");
let bitcoin_scan_pool = ThreadPool::new(config.resources.expected_observers_count);
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();