feat: introduce migration script

This commit is contained in:
Ludo Galabru
2023-07-07 06:45:39 -04:00
parent a13351d46a
commit 8c2b16cc48
3 changed files with 124 additions and 51 deletions

View File

@@ -6,7 +6,7 @@ use crate::db::{
delete_data_in_hord_db, find_last_block_inserted, find_lazy_block_at_block_height,
find_watched_satpoint_for_inscription, initialize_hord_db, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_lazy_storage,
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_lazy_storage, find_all_inscriptions_in_block, remove_entry_from_inscriptions, insert_entry_in_locations,
};
use crate::hord::{
self, new_traversals_lazy_cache, retrieve_inscribed_satoshi_points_from_block,
@@ -26,6 +26,7 @@ use std::io::{BufReader, Read};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use std::sync::mpsc::channel;
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
@@ -238,6 +239,8 @@ struct UpdateHordDbCommand {
/// Load config file path
#[clap(long = "config-path")]
pub config_path: Option<String>,
/// Transfers only
pub transfers_only: bool,
}
#[derive(Parser, PartialEq, Clone, Debug)]
@@ -507,31 +510,69 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
Command::Db(HordDbCommand::Rewrite(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
// Delete data, if any
{
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
if cmd.transfers_only {
let mut inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
delete_data_in_hord_db(
for cursor in cmd.start_block..cmd.end_block {
let inscriptions = find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
let transaction = inscriptions_db_conn_rw.transaction().unwrap();
for (_, entry) in inscriptions.iter() {
remove_entry_from_inscriptions(&entry.get_inscription_id(), &transaction, &ctx);
insert_entry_in_locations(&entry.get_inscription_id(), cursor,&entry.transfer_data, &transaction, &ctx)
}
transaction.commit().unwrap();
}
let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};
let (tx, rx) = channel();
for cursor in cmd.start_block..cmd.end_block {
let block = fetch_and_standardize_block(cursor, &bitcoin_config, &ctx).await.unwrap();
let _ = tx.send(block);
}
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let mut storage = Storage::Sqlite(&inscriptions_db_conn_rw);
while let Ok(mut block) = rx.recv() {
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data(&mut block, &mut storage, &ctx).unwrap();
}
} else {
// Delete data, if any
{
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
delete_data_in_hord_db(
cmd.start_block,
cmd.end_block,
&blocks_db_rw,
&inscriptions_db_conn_rw,
&ctx,
)?;
}
// Update data
hord::perform_hord_db_update(
cmd.start_block,
cmd.end_block,
&blocks_db_rw,
&inscriptions_db_conn_rw,
&config.get_hord_config(),
&config,
None,
&ctx,
)?;
)
.await?;
}
// Update data
hord::perform_hord_db_update(
cmd.start_block,
cmd.end_block,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
}
Command::Db(HordDbCommand::Check(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;

View File

@@ -15,7 +15,7 @@ use hiro_system_kit::slog;
use rand::{thread_rng, Rng};
use rocksdb::DB;
use rusqlite::{Connection, OpenFlags, ToSql};
use rusqlite::{Connection, OpenFlags, ToSql, Transaction};
use std::io::Cursor;
use std::io::{Read, Write};
use threadpool::ThreadPool;
@@ -487,21 +487,24 @@ pub fn find_initial_inscription_transfer_data(
inscription_id: &str,
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> Result<Option<(TransactionIdentifier, usize, u64)>, String> {
) -> Result<Option<TransferData>, String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT outpoint_to_watch, offset FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1")
.prepare("SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let outpoint_to_watch: String = row.get(0).unwrap();
let (transaction_identifier, output_index) = parse_outpoint_to_watch(&outpoint_to_watch);
let (transaction_identifier_location, output_index) =
parse_outpoint_to_watch(&outpoint_to_watch);
let inscription_offset_intra_output: u64 = row.get(1).unwrap();
return Ok(Some((
transaction_identifier,
let tx_index: u64 = row.get(2).unwrap();
return Ok(Some(TransferData {
transaction_identifier_location,
output_index,
inscription_offset_intra_output,
)));
tx_index,
}));
}
Ok(None)
}
@@ -534,6 +537,7 @@ pub struct TransferData {
pub inscription_offset_intra_output: u64,
pub transaction_identifier_location: TransactionIdentifier,
pub output_index: usize,
pub tx_index: u64,
}
pub fn find_all_transfers_in_block(
@@ -551,12 +555,14 @@ pub fn find_all_transfers_in_block(
let inscription_id: String = row.get(0).unwrap();
let inscription_offset_intra_output: u64 = row.get(1).unwrap();
let outpoint_to_watch: String = row.get(2).unwrap();
let tx_index: u64 = row.get(3).unwrap();
let (transaction_identifier_location, output_index) =
parse_outpoint_to_watch(&outpoint_to_watch);
let transfer = TransferData {
inscription_offset_intra_output,
transaction_identifier_location,
output_index,
tx_index,
};
results
.entry(inscription_id)
@@ -650,7 +656,7 @@ pub fn find_inscription_with_id(
};
let mut rows = stmt.query(args).unwrap();
if let Some((transaction_identifier_location, output_index, inscription_offset_intra_output)) =
if let Some(transfer_data) =
find_initial_inscription_transfer_data(inscription_id, inscriptions_db_conn, ctx)?
{
while let Ok(Some(row)) = rows.next() {
@@ -665,11 +671,7 @@ pub fn find_inscription_with_id(
inscription_input_index,
transaction_identifier_inscription,
transfers: 0,
transfer_data: TransferData {
inscription_offset_intra_output,
transaction_identifier_location,
output_index,
},
transfer_data,
};
return Ok(Some(traversal));
}
@@ -681,12 +683,12 @@ pub fn find_all_inscriptions_in_block(
block_height: &u64,
inscriptions_db_conn: &Connection,
ctx: &Context,
) -> BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> {
) -> Vec<(TransactionIdentifier, TraversalResult)> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
.unwrap();
let mut results: BTreeMap<u64, Vec<(TransactionIdentifier, TraversalResult)>> = BTreeMap::new();
let mut results = vec![];
let mut rows = stmt.query(args).unwrap();
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx);
@@ -710,15 +712,7 @@ pub fn find_all_inscriptions_in_block(
transaction_identifier_inscription: transaction_identifier_inscription.clone(),
transfer_data: transfer_data,
};
results
.entry(*block_height)
.and_modify(|v| {
v.push((
transaction_identifier_inscription.clone(),
traversal.clone(),
))
})
.or_insert(vec![(transaction_identifier_inscription, traversal)]);
results.push((transaction_identifier_inscription, traversal));
}
return results;
}
@@ -827,6 +821,38 @@ pub fn remove_entry_from_inscriptions(
}
}
pub fn remove_entry_from_locations(
inscription_id: &str,
inscriptions_db_rw_conn: &Transaction,
ctx: &Context,
) {
if let Err(e) = inscriptions_db_rw_conn.execute(
"DELETE FROM locations WHERE inscription_id = ?1",
rusqlite::params![&inscription_id],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn insert_entry_in_locations(
inscription_id: &str,
block_height: u64,
transfer_data: &TransferData,
inscriptions_db_rw_conn: &Transaction,
ctx: &Context,
) {
let outpoint_to_watch = format_outpoint_to_watch(
&transfer_data.transaction_identifier_location,
transfer_data.output_index,
);
if let Err(e) = inscriptions_db_rw_conn.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_id, &outpoint_to_watch, &transfer_data.inscription_offset_intra_output, &block_height, &transfer_data.tx_index],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn delete_data_in_hord_db(
start_block: u64,
end_block: u64,
@@ -1090,6 +1116,14 @@ impl TraversalResult {
let sat = Sat(self.ordinal_number);
self.ordinal_number - sat.height().starting_sat().n()
}
pub fn get_inscription_id(&self) -> String {
format!(
"{}i{}",
self.transaction_identifier_inscription.get_hash_bytes_str(),
self.inscription_input_index
)
}
}
pub fn format_satpoint_to_watch(
@@ -1280,6 +1314,7 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
});
}
@@ -1404,6 +1439,7 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
});
}
@@ -1423,6 +1459,7 @@ pub fn retrieve_satoshi_point_using_lazy_storage(
inscription_offset_intra_output,
transaction_identifier_location: transaction_identifier.clone(),
output_index: inscription_output_index,
tx_index: 0,
},
})
}

View File

@@ -128,12 +128,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
if let Some(ref inscriptions_db_conn) = inscriptions_db_conn {
// Evaluating every single block is required for also keeping track of transfers.
let local_traverals =
match find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx)
.remove(&cursor)
{
Some(entry) => entry,
None => vec![],
};
find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
traversals.insert(
(