fix: tweak sqlite connections (#217)

* fix: attempt to optimize sqlite connections

* chore: add logs

* feat: introduce sequence metadata table
This commit is contained in:
Ludo Galabru
2023-11-27 17:55:36 -05:00
committed by GitHub
parent a84c9517cf
commit 334565ce13
6 changed files with 206 additions and 100 deletions

View File

@@ -26,9 +26,9 @@ use ordhook::db::{
delete_data_in_ordhook_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
get_default_ordhook_db_file_path, initialize_ordhook_db, open_readonly_ordhook_db_conn,
open_readonly_ordhook_db_conn_rocks_db, open_readwrite_ordhook_db_conn, open_ordhook_db_conn_rocks_db_loop,
get_default_ordhook_db_file_path, initialize_ordhook_db, open_ordhook_db_conn_rocks_db_loop,
open_readonly_ordhook_db_conn, open_readonly_ordhook_db_conn_rocks_db,
open_readwrite_ordhook_db_conn,
};
use ordhook::download::download_ordinals_dataset_if_required;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
@@ -790,25 +790,21 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
)
.await?;
if let Some(true) = cmd.debug {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(false, &config.get_ordhook_config().db_path, ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
false,
&config.get_ordhook_config().db_path,
ctx,
);
for i in cmd.get_blocks().into_iter() {
let block = find_lazy_block_at_block_height(i as u32, 10, false, &blocks_db, ctx).expect("unable to retrieve block {i}");
info!(
ctx.expect_logger(),
"--------------------"
);
info!(
ctx.expect_logger(),
"Block: {i}"
);
let block =
find_lazy_block_at_block_height(i as u32, 10, false, &blocks_db, ctx)
.expect("unable to retrieve block {i}");
info!(ctx.expect_logger(), "--------------------");
info!(ctx.expect_logger(), "Block: {i}");
for tx in block.iter_tx() {
info!(
ctx.expect_logger(),
"Tx: {}",
ordhook::hex::encode(tx.txid)
);
info!(ctx.expect_logger(), "Tx: {}", ordhook::hex::encode(tx.txid));
}
}
}
}
}
RepairCommand::Inscriptions(cmd) => {
@@ -886,7 +882,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::Drop(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?;

View File

@@ -9,7 +9,7 @@ use std::{
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, LazyBlock, open_ordhook_db_conn_rocks_db_loop},
db::{insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop, LazyBlock},
};
pub fn start_block_archiving_processor(

View File

@@ -12,6 +12,7 @@ use chainhook_sdk::{
},
utils::Context,
};
use crossbeam_channel::bounded;
use dashmap::DashMap;
use fxhash::FxHasher;
use rusqlite::{Connection, Transaction};
@@ -71,6 +72,13 @@ pub fn parallelize_inscription_data_computations(
ordhook_config: &OrdhookConfig,
ctx: &Context,
) -> Result<bool, String> {
ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} started", block.block_identifier.index
)
});
let (mut transactions_ids, l1_cache_hits) =
get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx);
@@ -90,7 +98,7 @@ pub fn parallelize_inscription_data_computations(
}
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
let (traversal_tx, traversal_rx) = bounded(64);
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
@@ -207,6 +215,7 @@ pub fn parallelize_inscription_data_computations(
});
}
}
if traversals_received == expected_traversals {
break;
}
@@ -244,6 +253,12 @@ pub fn parallelize_inscription_data_computations(
}
}
}
ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} collected", block.block_identifier.index
)
});
// Collect eventual results for incoming blocks
for tx in tx_thread_pool.iter() {
@@ -273,10 +288,21 @@ pub fn parallelize_inscription_data_computations(
let _ = tx.send(None);
}
let ctx_moved = ctx.clone();
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation started",));
for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation ended",));
});
ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} ended", block.block_identifier.index
)
});
Ok(has_transactions_to_process)
@@ -373,28 +399,27 @@ impl<'a> SequenceCursor<'a> {
self.current_block_height = 0;
}
pub fn pick_next(&mut self, cursed: bool, block_height: u64) -> i64 {
pub fn pick_next(&mut self, cursed: bool, block_height: u64, ctx: &Context) -> i64 {
if block_height < self.current_block_height {
self.reset();
}
self.current_block_height = block_height;
match cursed {
true => self.pick_next_cursed(),
false => self.pick_next_blessed(),
true => self.pick_next_cursed(ctx),
false => self.pick_next_blessed(ctx),
}
}
fn pick_next_blessed(&mut self) -> i64 {
fn pick_next_blessed(&mut self, ctx: &Context) -> i64 {
match self.blessed {
None => {
match find_latest_inscription_number_at_block_height(
&self.current_block_height,
&None,
&self.inscriptions_db_conn,
&Context::empty(),
&ctx,
) {
Ok(Some(inscription_number)) => {
Some(inscription_number) => {
self.blessed = Some(inscription_number);
inscription_number + 1
}
@@ -405,16 +430,15 @@ impl<'a> SequenceCursor<'a> {
}
}
fn pick_next_cursed(&mut self) -> i64 {
fn pick_next_cursed(&mut self, ctx: &Context) -> i64 {
match self.cursed {
None => {
match find_latest_cursed_inscription_number_at_block_height(
&self.current_block_height,
&None,
&self.inscriptions_db_conn,
&Context::empty(),
&ctx,
) {
Ok(Some(inscription_number)) => {
Some(inscription_number) => {
self.cursed = Some(inscription_number);
inscription_number - 1
}
@@ -425,12 +449,12 @@ impl<'a> SequenceCursor<'a> {
}
}
pub fn increment_cursed(&mut self) {
self.cursed = Some(self.pick_next_cursed());
pub fn increment_cursed(&mut self, ctx: &Context) {
self.cursed = Some(self.pick_next_cursed(ctx));
}
pub fn increment_blessed(&mut self) {
self.blessed = Some(self.pick_next_blessed())
pub fn increment_blessed(&mut self, ctx: &Context) {
self.blessed = Some(self.pick_next_blessed(ctx))
}
}
@@ -523,13 +547,14 @@ pub fn augment_block_with_ordinals_inscriptions_data(
continue;
};
let is_curse = inscription_data.curse_type.is_some();
let inscription_number = sequence_cursor.pick_next(is_curse, block.block_identifier.index);
let inscription_number =
sequence_cursor.pick_next(is_curse, block.block_identifier.index, &ctx);
inscription_data.inscription_number = inscription_number;
if is_curse {
sequence_cursor.increment_cursed();
sequence_cursor.increment_cursed(ctx);
} else {
sequence_cursor.increment_blessed();
sequence_cursor.increment_blessed(ctx);
};
ctx.try_log(|logger| {
@@ -594,7 +619,8 @@ fn augment_transaction_with_ordinals_inscriptions_data(
};
// Do we need to curse the inscription?
let mut inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
let mut inscription_number =
sequence_cursor.pick_next(is_cursed, block_identifier.index, ctx);
let mut curse_type_override = None;
if !is_cursed {
// Is this inscription re-inscribing an existing blessed inscription?
@@ -612,7 +638,8 @@ fn augment_transaction_with_ordinals_inscriptions_data(
});
is_cursed = true;
inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
inscription_number =
sequence_cursor.pick_next(is_cursed, block_identifier.index, ctx);
curse_type_override = Some(OrdinalInscriptionCurseType::Reinscription)
}
};
@@ -683,9 +710,9 @@ fn augment_transaction_with_ordinals_inscriptions_data(
});
if is_cursed {
sequence_cursor.increment_cursed();
sequence_cursor.increment_cursed(ctx);
} else {
sequence_cursor.increment_blessed();
sequence_cursor.increment_blessed(ctx);
}
}
any_event

View File

@@ -244,7 +244,7 @@ pub fn compute_satoshi_number(
)
});
std::process::exit(1);
},
}
};
let mut sats_out = 0;

View File

@@ -1,7 +1,9 @@
use std::{
collections::BTreeMap,
io::{Read, Write},
path::PathBuf, thread::sleep, time::Duration,
path::PathBuf,
thread::sleep,
time::Duration,
};
use rand::{thread_rng, Rng};
@@ -127,6 +129,30 @@ pub fn initialize_ordhook_db(path: &PathBuf, ctx: &Context) -> Connection {
}
}
if let Err(e) = conn.execute(
"CREATE TABLE IF NOT EXISTS sequence_metadata (
block_height INTEGER NOT NULL,
latest_cursed_inscription_number INTEGER NOT NULL,
latest_inscription_number INTEGER NOT NULL
)",
[],
) {
ctx.try_log(|logger| {
warn!(
logger,
"Unable to create table sequence_metadata: {}",
e.to_string()
)
});
} else {
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS sequence_metadata_indexed_on_block_height ON sequence_metadata(block_height);",
[],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
}
conn
}
@@ -161,15 +187,7 @@ pub fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Conne
};
std::thread::sleep(std::time::Duration::from_secs(1));
};
// db.profile(Some(trace_profile));
conn.busy_timeout(std::time::Duration::from_secs(300))
.expect("unable to set db timeout");
// let mmap_size: i64 = 256 * 1024 * 1024;
// let page_size: i64 = 16384;
// conn.pragma_update(None, "mmap_size", mmap_size).unwrap();
// conn.pragma_update(None, "page_size", page_size).unwrap();
// conn.pragma_update(None, "synchronous", &"NORMAL").unwrap();
conn
connection_with_defaults_pragma(conn)
}
fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
@@ -198,9 +216,19 @@ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
};
std::thread::sleep(std::time::Duration::from_secs(1));
};
connection_with_defaults_pragma(conn)
}
fn connection_with_defaults_pragma(conn: Connection) -> Connection {
conn.busy_timeout(std::time::Duration::from_secs(300))
.expect("unable to set db timeout");
return conn;
conn.pragma_update(None, "mmap_size", 512 * 1024 * 1024)
.expect("unable to enable mmap_size");
conn.pragma_update(None, "cache_size", 512 * 1024 * 1024)
.expect("unable to enable cache_size");
conn.pragma_update(None, "journal_mode", &"WAL")
.expect("unable to enable wal");
conn
}
fn get_default_ordhook_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf {
@@ -472,6 +500,36 @@ pub fn update_locations_with_block(
}
}
/// Records the latest blessed and cursed inscription sequence numbers as of
/// `block` into the `sequence_metadata` table, so later lookups can read a
/// single indexed row instead of scanning the `inscriptions` table.
///
/// Seeds the counters from the previously persisted metadata (falling back to
/// 0 when none exists), folds in every inscription revealed in `block`, then
/// inserts one row for this block height. The insert is retried forever on
/// error — presumably to ride out transient SQLITE_BUSY contention; TODO
/// confirm a permanent failure (e.g. schema mismatch) cannot loop here.
pub fn update_sequence_metadata_with_block(
block: &BitcoinBlockData,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
// Highest blessed (positive) number known strictly before this block.
let mut latest_blessed = find_latest_inscription_number_at_block_height(
&block.block_identifier.index,
inscriptions_db_conn_rw,
ctx,
)
.unwrap_or(0);
// Lowest cursed (negative) number known strictly before this block.
let mut latest_cursed = find_latest_cursed_inscription_number_at_block_height(
&block.block_identifier.index,
inscriptions_db_conn_rw,
ctx,
)
.unwrap_or(0);
// Blessed numbers grow upward (max), cursed numbers grow downward (min).
for inscription_data in get_inscriptions_revealed_in_block(&block).iter() {
latest_blessed = latest_blessed.max(inscription_data.inscription_number);
latest_cursed = latest_cursed.min(inscription_data.inscription_number);
}
// Retry loop: log the failure and back off 1s before trying again.
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO sequence_metadata (block_height, latest_inscription_number, latest_cursed_inscription_number) VALUES (?1, ?2, ?3)",
rusqlite::params![&block.block_identifier.index, latest_blessed, latest_cursed],
) {
ctx.try_log(|logger| warn!(logger, "unable to update sequence_metadata: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn insert_new_inscriptions_from_block_in_locations(
block: &BitcoinBlockData,
inscriptions_db_conn_rw: &Connection,
@@ -805,56 +863,70 @@ pub fn find_all_inscription_transfers(
pub fn find_latest_inscription_number_at_block_height(
block_height: &u64,
latest_blessed_inscription_heigth: &Option<u64>,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<i64>, String> {
let (query, hint) = match latest_blessed_inscription_heigth {
Some(hint) => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height = ? ORDER BY inscription_number DESC LIMIT 1",
*hint
)
}
None => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number DESC LIMIT 1",
*block_height
)
}
};
let entry = perform_query_one(query, &[&hint.to_sql().unwrap()], db_conn, ctx, |row| {
) -> Option<i64> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let query = "SELECT latest_inscription_number FROM sequence_metadata WHERE block_height < ? ORDER BY block_height DESC LIMIT 1";
perform_query_one(query, args, db_conn, ctx, |row| {
let inscription_number: i64 = row.get(0).unwrap();
inscription_number
});
Ok(entry)
})
.or_else(|| compute_latest_inscription_number_at_block_height(block_height, db_conn, ctx))
}
pub fn find_latest_cursed_inscription_number_at_block_height(
block_height: &u64,
latest_cursed_inscription_heigth: &Option<u64>,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<i64>, String> {
let (query, hint) = match latest_cursed_inscription_heigth {
Some(hint) => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height = ? ORDER BY inscription_number ASC LIMIT 1",
*hint
)
}
None => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number ASC LIMIT 1",
*block_height
)
}
};
let entry = perform_query_one(query, &[&hint.to_sql().unwrap()], db_conn, ctx, |row| {
) -> Option<i64> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let query = "SELECT latest_cursed_inscription_number FROM sequence_metadata WHERE block_height < ? ORDER BY block_height DESC LIMIT 1";
perform_query_one(query, args, db_conn, ctx, |row| {
let inscription_number: i64 = row.get(0).unwrap();
inscription_number
})
.or_else(|| {
compute_latest_cursed_inscription_number_at_block_height(block_height, db_conn, ctx)
})
}
/// Slow-path fallback: scans the full `inscriptions` table for the highest
/// blessed inscription number revealed strictly before `block_height`.
///
/// Used when no `sequence_metadata` row covers the requested height (see
/// `find_latest_inscription_number_at_block_height`). Returns `None` when no
/// matching inscription exists. Logged at `warn!` level — presumably to make
/// the expensive table scan visible in production logs; TODO confirm.
pub fn compute_latest_inscription_number_at_block_height(
block_height: &u64,
db_conn: &Connection,
ctx: &Context,
) -> Option<i64> {
ctx.try_log(|logger| {
warn!(
logger,
"Start computing latest_inscription_number at block height: {block_height}"
)
});
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
// Strictly-less-than: the caller wants the state *before* this block.
let query = "SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number DESC LIMIT 1";
perform_query_one(query, args, db_conn, ctx, |row| {
let inscription_number: i64 = row.get(0).unwrap();
inscription_number
})
}
/// Slow-path fallback: scans the full `inscriptions` table for the lowest
/// (most negative) cursed inscription number revealed strictly before
/// `block_height`.
///
/// Mirror of `compute_latest_inscription_number_at_block_height`, ordering
/// ascending because cursed numbers decrease. Returns `None` when no matching
/// inscription exists.
pub fn compute_latest_cursed_inscription_number_at_block_height(
block_height: &u64,
db_conn: &Connection,
ctx: &Context,
) -> Option<i64> {
ctx.try_log(|logger| {
warn!(
logger,
"Start computing latest_cursed_inscription_number at block height: {block_height}"
)
});
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
// ASC LIMIT 1 picks the most negative (latest cursed) number before this block.
let query = "SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number ASC LIMIT 1";
perform_query_one(query, args, db_conn, ctx, |row| {
let inscription_number: i64 = row.get(0).unwrap();
inscription_number
})
}
pub fn find_blessed_inscription_with_ordinal_number(
@@ -1069,6 +1141,13 @@ pub fn delete_inscriptions_in_block_range(
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
while let Err(e) = inscriptions_db_conn_rw.execute(
"DELETE FROM sequence_metadata WHERE block_height >= ?1 AND block_height <= ?2",
rusqlite::params![&start_block, &end_block],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn remove_entry_from_inscriptions(
@@ -1438,12 +1517,12 @@ impl LazyBlock {
let u16_max = u16::MAX as usize;
for tx in block.tx.iter().skip(1) {
let inputs_len = if tx.vin.len() > u16_max {
0
0
} else {
tx.vin.len() as u16
};
let outputs_len = if tx.vout.len() > u16_max {
0
0
} else {
tx.vout.len() as u16
};
@@ -1478,12 +1557,12 @@ impl LazyBlock {
buffer.write_all(&txid)?;
let inputs_len = if tx.vin.len() > u16_max {
0
0
} else {
tx.vin.len() as usize
};
let outputs_len = if tx.vout.len() > u16_max {
0
0
} else {
tx.vout.len() as usize
};

View File

@@ -15,9 +15,10 @@ use crate::core::protocol::inscription_parsing::{
use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
use crate::db::{
delete_data_in_ordhook_db, insert_entry_in_blocks, open_readwrite_ordhook_db_conn,
open_ordhook_db_conn_rocks_db_loop, open_readwrite_ordhook_dbs,
update_inscriptions_with_block, update_locations_with_block, LazyBlock, LazyBlockTransaction,
delete_data_in_ordhook_db, insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop,
open_readwrite_ordhook_db_conn, open_readwrite_ordhook_dbs, update_inscriptions_with_block,
update_locations_with_block, update_sequence_metadata_with_block, LazyBlock,
LazyBlockTransaction,
};
use crate::scan::bitcoin::process_block_with_predicates;
use crate::service::http_api::start_predicate_api_server;
@@ -624,6 +625,8 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct
update_inscriptions_with_block(&block, &inscriptions_db_conn_rw, &ctx);
update_locations_with_block(&block, &inscriptions_db_conn_rw, &ctx);
update_sequence_metadata_with_block(&block, &inscriptions_db_conn_rw, &ctx);
}
}
}
@@ -730,6 +733,7 @@ pub fn chainhook_sidecar_mutate_blocks(
if cache.processed_by_sidecar {
update_inscriptions_with_block(&cache.block, &inscriptions_db_tx, &ctx);
update_locations_with_block(&cache.block, &inscriptions_db_tx, &ctx);
update_sequence_metadata_with_block(&cache.block, &inscriptions_db_tx, &ctx);
} else {
updated_blocks_ids.push(format!("{}", cache.block.block_identifier.index));