feat: complete migration to lazy blocks

Author: Ludo Galabru
Date: 2023-05-11 09:19:52 -04:00
parent e8ee3ab036
commit fa5058471a
3 changed files with 151 additions and 303 deletions

View File

@@ -13,10 +13,9 @@ use chainhook_event_observer::chainhooks::types::{
StacksPrintEventBasedPredicate,
};
use chainhook_event_observer::hord::db::{
delete_blocks_in_block_range_sqlite, delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db,
find_block_at_block_height, find_block_at_block_height_sqlite, find_last_block_inserted,
find_watched_satpoint_for_inscription, initialize_hord_db, insert_entry_in_blocks,
insert_entry_in_blocks_lazy_block, open_readonly_hord_db_conn,
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db,
find_block_at_block_height, find_last_block_inserted, find_watched_satpoint_for_inscription,
initialize_hord_db, insert_entry_in_blocks, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_lazy_storage, LazyBlock,
};
@@ -214,9 +213,6 @@ enum DbCommand {
/// Check integrity
#[clap(name = "check", bin_name = "check")]
Check(CheckHordDbCommand),
/// Legacy command
#[clap(name = "init", bin_name = "init")]
Init(InitHordDbCommand),
/// Patch DB
#[clap(name = "patch", bin_name = "patch")]
Patch(PatchHordDbCommand),
@@ -741,84 +737,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
},
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
DbCommand::Init(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let sqlite_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;
// Migrate if required
if find_block_at_block_height_sqlite(1, &sqlite_db_conn_rw).is_some() {
let blocks_db =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
for i in 0..=300000 {
match find_block_at_block_height_sqlite(i, &sqlite_db_conn_rw) {
Some(block) => {
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
info!(ctx.expect_logger(), "Block #{} inserted", i);
}
None => {
error!(ctx.expect_logger(), "Block #{} missing", i);
}
}
}
let _ = blocks_db.flush();
delete_blocks_in_block_range_sqlite(0, 300000, &sqlite_db_conn_rw, &ctx);
for i in 300001..=500000 {
match find_block_at_block_height_sqlite(i, &sqlite_db_conn_rw) {
Some(block) => {
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
info!(ctx.expect_logger(), "Block #{} inserted", i);
}
None => {
info!(ctx.expect_logger(), "Block #{} missing", i);
}
}
}
let _ = blocks_db.flush();
delete_blocks_in_block_range_sqlite(300001, 500000, &sqlite_db_conn_rw, &ctx);
for i in 500001..=783986 {
match find_block_at_block_height_sqlite(i, &sqlite_db_conn_rw) {
Some(block) => {
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
info!(ctx.expect_logger(), "Block #{} inserted", i);
}
None => {
info!(ctx.expect_logger(), "Block #{} missing", i);
}
}
}
let _ = blocks_db.flush();
delete_blocks_in_block_range_sqlite(500001, 783986, &sqlite_db_conn_rw, &ctx);
}
// Sync
for _ in 0..5 {
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if start_block == 0 {
info!(
ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(start_block, end_block, 10, &config, &ctx).await?;
} else {
info!(ctx.expect_logger(), "Database hord up to date");
}
}
// Start node
let mut service = Service::new(config, ctx);
return service.run(vec![]).await;
}
DbCommand::Sync(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
@@ -910,24 +828,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
);
}
DbCommand::Patch(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let sqlite_db_conn =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;
let blocks_db =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
for i in 0..774940 {
match find_block_at_block_height_sqlite(i, &sqlite_db_conn) {
Some(block) => {
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
println!("Block #{} inserted", i);
}
None => {
println!("Block #{} missing", i)
}
}
}
unimplemented!()
}
DbCommand::Migrate(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
@@ -945,7 +846,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
.serialize_to_lazy_format(&mut bytes)
.expect("unable to convert to lazy block");
let lazy_block = LazyBlock::new(bytes);
insert_entry_in_blocks_lazy_block(i, &lazy_block, &blocks_db_rw, &ctx);
insert_entry_in_blocks(i, &lazy_block, &blocks_db_rw, &ctx);
println!("Block #{} migrated to lazy block", i);
}
None => {

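With this commit there is a single RocksDB write path for blocks: the `CompactedBlock`-based `insert_entry_in_blocks` is deleted (see the second file below) and call sites such as `DbCommand::Migrate` now pass a `LazyBlock` to a function of the same name, previously `insert_entry_in_blocks_lazy_block`. A minimal sketch of that consolidated helper, assuming the `rocksdb` crate; the `LazyBlock` wrapper shown here is illustrative, and the real function also threads a logging `Context`.

```rust
use rocksdb::DB;

// Illustrative stand-in for the crate's LazyBlock: a thin wrapper over the
// lazily decodable byte buffer produced by serialize_to_lazy_format /
// from_full_block / from_standardized_block.
pub struct LazyBlock {
    pub bytes: Vec<u8>,
}

// Sketch of the consolidated insert: key the raw lazy bytes by big-endian
// block height, then record the last height written.
pub fn insert_entry_in_blocks(block_height: u32, lazy_block: &LazyBlock, blocks_db_rw: &DB) {
    let block_height_bytes = block_height.to_be_bytes();
    blocks_db_rw
        .put(&block_height_bytes, &lazy_block.bytes)
        .expect("unable to insert block");
    blocks_db_rw
        .put(b"metadata::last_insert", block_height_bytes)
        .expect("unable to insert metadata");
}
```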
View File

@@ -167,37 +167,6 @@ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
return conn;
}
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C)]
// pub struct T {
// ci: [u8; 4],
// cv: u64,
// t: Vec<Tx>,
// }
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C, packed)]
// pub struct Tx {
// t: [u8; 4],
// i: TxIn,
// o: TxOut,
// }
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C, packed)]
// pub struct TxIn {
// i: [u8; 4],
// b: u32,
// o: u16,
// v: u64
// }
// #[derive(zerocopy::FromBytes, zerocopy::AsBytes)]
// #[repr(C, packed)]
// pub struct TxOut {
// v: u64,
// }
#[derive(Debug, Serialize, Deserialize)]
#[repr(C)]
pub struct CompactedBlock(
@@ -213,123 +182,12 @@ impl CompactedBlock {
fn empty() -> CompactedBlock {
CompactedBlock((([0, 0, 0, 0, 0, 0, 0, 0], 0), vec![]))
}
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> CompactedBlock {
let mut txs = vec![];
let mut coinbase_value = 0;
let coinbase_txid = {
let txid = hex::decode(block.tx[0].txid.to_string()).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
for coinbase_output in block.tx[0].vout.iter() {
coinbase_value += coinbase_output.value.to_sat();
}
for tx in block.tx.iter().skip(1) {
let mut inputs = vec![];
for input in tx.vin.iter() {
let txin = hex::decode(input.txid.unwrap().to_string()).unwrap();
inputs.push((
[
txin[0], txin[1], txin[2], txin[3], txin[4], txin[5], txin[6], txin[7],
],
input.prevout.as_ref().unwrap().height as u32,
input.vout.unwrap() as u16,
input.prevout.as_ref().unwrap().value.to_sat(),
));
}
let mut outputs = vec![];
for output in tx.vout.iter() {
outputs.push(output.value.to_sat());
}
let txid = hex::decode(tx.txid.to_string()).unwrap();
txs.push((
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
],
inputs,
outputs,
));
}
CompactedBlock(((coinbase_txid, coinbase_value), txs))
}
pub fn from_standardized_block(block: &BitcoinBlockData) -> CompactedBlock {
let mut txs = vec![];
let mut coinbase_value = 0;
let coinbase_txid = {
let txid =
hex::decode(&block.transactions[0].transaction_identifier.hash[2..]).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
for coinbase_output in block.transactions[0].metadata.outputs.iter() {
coinbase_value += coinbase_output.value;
}
for tx in block.transactions.iter().skip(1) {
let mut inputs = vec![];
for input in tx.metadata.inputs.iter() {
let txin = hex::decode(&input.previous_output.txid[2..]).unwrap();
inputs.push((
[
txin[0], txin[1], txin[2], txin[3], txin[4], txin[5], txin[6], txin[7],
],
input.previous_output.block_height as u32,
input.previous_output.vout as u16,
input.previous_output.value,
));
}
let mut outputs = vec![];
for output in tx.metadata.outputs.iter() {
outputs.push(output.value);
}
let txid = hex::decode(&tx.transaction_identifier.hash[2..]).unwrap();
txs.push((
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
],
inputs,
outputs,
));
}
CompactedBlock(((coinbase_txid, coinbase_value), txs))
}
pub fn from_hex_bytes(bytes: &str) -> CompactedBlock {
let bytes = hex_simd::decode_to_vec(&bytes).unwrap_or(vec![]);
let value = serde_cbor::from_slice(&bytes[..]).unwrap_or(CompactedBlock::empty());
value
}
pub fn from_cbor_bytes(bytes: &[u8]) -> CompactedBlock {
serde_cbor::from_slice(&bytes[..]).unwrap()
}
fn serialize<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
fd.write_all(&self.0 .0 .0)?;
fd.write(&self.0 .0 .1.to_be_bytes())?;
fd.write(&self.0 .1.len().to_be_bytes())?;
for (id, inputs, outputs) in self.0 .1.iter() {
fd.write_all(id)?;
fd.write(&inputs.len().to_be_bytes())?;
for (txid, block, vout, value) in inputs.iter() {
fd.write_all(txid)?;
fd.write(&block.to_be_bytes())?;
fd.write(&vout.to_be_bytes())?;
fd.write(&value.to_be_bytes())?;
}
fd.write(&outputs.len().to_be_bytes())?;
for value in outputs.iter() {
fd.write(&value.to_be_bytes())?;
}
}
Ok(())
}
pub fn serialize_to_lazy_format<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
// Number of transactions in the block (not including coinbase)
let tx_len = self.0 .1.len() as u16;
@@ -478,46 +336,7 @@ pub fn archive_hord_db_conn_rocks_db(base_dir: &PathBuf, _ctx: &Context) {
let _ = std::fs::rename(from, to);
}
// Legacy - to remove after migrations
pub fn find_block_at_block_height_sqlite(
block_height: u32,
hord_db_conn: &Connection,
) -> Option<CompactedBlock> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = hord_db_conn
.prepare("SELECT compacted_bytes FROM blocks WHERE id = ?")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let hex_bytes: String = row.get(0).unwrap();
Ok(CompactedBlock::from_hex_bytes(&hex_bytes))
})
.unwrap();
for result in result_iter {
return Some(result.unwrap());
}
return None;
}
pub fn insert_entry_in_blocks(
block_height: u32,
compacted_block: &CompactedBlock,
blocks_db_rw: &DB,
_ctx: &Context,
) {
let mut bytes = vec![];
let _ = compacted_block.serialize(&mut bytes);
let block_height_bytes = block_height.to_be_bytes();
blocks_db_rw
.put(&block_height_bytes, bytes)
.expect("unable to insert blocks");
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
.expect("unable to insert metadata");
}
pub fn insert_entry_in_blocks_lazy_block(
block_height: u32,
lazy_block: &LazyBlock,
blocks_db_rw: &DB,
@@ -609,20 +428,6 @@ pub fn delete_blocks_in_block_range(
.expect("unable to insert metadata");
}
pub fn delete_blocks_in_block_range_sqlite(
start_block: u32,
end_block: u32,
rw_hord_db_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = rw_hord_db_conn.execute(
"DELETE FROM blocks WHERE id >= ?1 AND id <= ?2",
rusqlite::params![&start_block, &end_block],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
pub fn store_new_inscription(
inscription_data: &OrdinalInscriptionRevealData,
block_identifier: &BlockIdentifier,
@@ -1000,7 +805,8 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let block_compressed_tx_moved = block_compressed_tx.clone();
let block_height = block_data.height as u64;
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
let compressed_block =
LazyBlock::from_full_block(&block_data).expect("unable to serialize block");
let block_index = block_data.height as u32;
let _ = block_compressed_tx_moved.send(Some((
block_index,
@@ -1745,6 +1551,141 @@ impl LazyBlock {
pub fn iter_tx(&self) -> LazyBlockTransactionIterator {
LazyBlockTransactionIterator::new(&self)
}
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> std::io::Result<LazyBlock> {
let mut buffer = vec![];
// Number of transactions in the block (not including coinbase)
let tx_len = block.tx.len() as u16 - 1;
buffer.write(&tx_len.to_be_bytes())?;
// For each transaction:
for tx in block.tx.iter().skip(1) {
let inputs_len = tx.vin.len() as u16;
let outputs_len = tx.vout.len() as u16;
// Number of inputs
buffer.write(&inputs_len.to_be_bytes())?;
// Number of outputs
buffer.write(&outputs_len.to_be_bytes())?;
}
// Coinbase transaction txid - 8 first bytes
let coinbase_txid = {
let txid = hex::decode(block.tx[0].txid.to_string()).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
buffer.write_all(&coinbase_txid)?;
// Coinbase transaction value
let mut coinbase_value = 0;
for coinbase_output in block.tx[0].vout.iter() {
coinbase_value += coinbase_output.value.to_sat();
}
buffer.write(&coinbase_value.to_be_bytes())?;
// For each transaction:
for tx in block.tx.iter().skip(1) {
// txid - 8 first bytes
let txid = {
let txid = hex::decode(tx.txid.to_string()).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
buffer.write_all(&txid)?;
// For each transaction input:
for input in tx.vin.iter() {
// txin - 8 first bytes
let txin = {
let txid = hex::decode(input.txid.unwrap().to_string()).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
buffer.write_all(&txin)?;
// txin's block height
let block_height = input.prevout.as_ref().unwrap().height as u32;
buffer.write(&block_height.to_be_bytes())?;
// txin's vout index
let vout = input.vout.unwrap() as u16;
buffer.write(&vout.to_be_bytes())?;
// txin's sats value
let sats = input.prevout.as_ref().unwrap().value.to_sat();
buffer.write(&sats.to_be_bytes())?;
}
// For each transaction output:
for output in tx.vout.iter() {
let sats = output.value.to_sat();
buffer.write(&sats.to_be_bytes())?;
}
}
Ok(Self::new(buffer))
}
pub fn from_standardized_block(block: &BitcoinBlockData) -> std::io::Result<LazyBlock> {
let mut buffer = vec![];
// Number of transactions in the block (not including coinbase)
let tx_len = block.transactions.len() as u16 - 1;
buffer.write(&tx_len.to_be_bytes())?;
// For each transaction:
for tx in block.transactions.iter().skip(1) {
let inputs_len = tx.metadata.inputs.len() as u16;
let outputs_len = tx.metadata.outputs.len() as u16;
// Number of inputs
buffer.write(&inputs_len.to_be_bytes())?;
// Number of outputs
buffer.write(&outputs_len.to_be_bytes())?;
}
// Coinbase transaction txid - 8 first bytes
let coinbase_txid = {
let txid =
hex::decode(&block.transactions[0].transaction_identifier.hash[2..]).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
buffer.write_all(&coinbase_txid)?;
// Coinbase transaction value
let mut coinbase_value = 0;
for coinbase_output in block.transactions[0].metadata.outputs.iter() {
coinbase_value += coinbase_output.value;
}
buffer.write(&coinbase_value.to_be_bytes())?;
// For each transaction:
for tx in block.transactions.iter().skip(1) {
// txid - 8 first bytes
let txid = {
let txid = hex::decode(&tx.transaction_identifier.hash[2..]).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
buffer.write_all(&txid)?;
// For each transaction input:
for input in tx.metadata.inputs.iter() {
// txin - 8 first bytes
let txin = {
let txid = hex::decode(&input.previous_output.txid[2..]).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
};
buffer.write_all(&txin)?;
// txin's block height
let block_height = input.previous_output.block_height as u32;
buffer.write(&block_height.to_be_bytes())?;
// txin's vout index
let vout = input.previous_output.vout as u16;
buffer.write(&vout.to_be_bytes())?;
// txin's sats value
let sats = input.previous_output.value;
buffer.write(&sats.to_be_bytes())?;
}
// For each transaction output:
for output in tx.metadata.outputs.iter() {
let sats = output.value;
buffer.write(&sats.to_be_bytes())?;
}
}
Ok(Self::new(buffer))
}
}
pub struct LazyBlockTransactionIterator<'a> {

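For reference, the buffer written by `from_full_block` / `from_standardized_block` above is laid out as: a big-endian `u16` count of non-coinbase transactions; per transaction, a `u16` input count and a `u16` output count; the 8-byte coinbase txid prefix and `u64` coinbase value; then, per transaction, an 8-byte txid prefix, each input as (8-byte prevout txid prefix, `u32` height, `u16` vout, `u64` sats) and each output as a `u64` sat value. The up-front counts are what make the block "lazy": a reader can locate any transaction's record by arithmetic alone. A hedged sketch of that offset computation, under the sizes listed above; the function name is hypothetical, and the crate itself walks records through `iter_tx` / `LazyBlockTransactionIterator`.

```rust
// Compute the byte offset of the `index`-th non-coinbase transaction record
// inside a lazy block buffer, using only the fixed-size header.
fn offset_of_transaction(buffer: &[u8], index: usize) -> usize {
    let tx_len = u16::from_be_bytes([buffer[0], buffer[1]]) as usize;
    assert!(index < tx_len, "transaction index out of range");
    // Header: u16 tx count, then (u16 inputs, u16 outputs) per transaction.
    let header_len = 2 + tx_len * 4;
    // Records start after the 8-byte coinbase txid prefix and u64 coinbase value.
    let mut offset = header_len + 8 + 8;
    for i in 0..index {
        let inputs_len = u16::from_be_bytes([buffer[2 + i * 4], buffer[3 + i * 4]]) as usize;
        let outputs_len = u16::from_be_bytes([buffer[4 + i * 4], buffer[5 + i * 4]]) as usize;
        // Each record: 8-byte txid prefix, 22 bytes per input (8 + 4 + 2 + 8),
        // 8 bytes per output.
        offset += 8 + inputs_len * 22 + outputs_len * 8;
    }
    offset
}
```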
View File

@@ -28,7 +28,7 @@ use crate::{
db::{
find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint,
insert_entry_in_blocks, retrieve_satoshi_point_using_lazy_storage,
store_new_inscription, update_transfered_inscription, CompactedBlock,
store_new_inscription, update_transfered_inscription,
},
ord::height::Height,
},
@@ -38,7 +38,7 @@ use crate::{
use self::db::{
find_inscription_with_id, find_latest_inscription_number_at_block_height,
open_readonly_hord_db_conn_rocks_db, remove_entry_from_blocks, remove_entry_from_inscriptions,
LazyBlockTransaction, TraversalResult, WatchedSatpoint,
LazyBlock, LazyBlockTransaction, TraversalResult, WatchedSatpoint,
};
use self::inscription::InscriptionParser;
use self::ord::inscription_id::InscriptionId;
@@ -308,7 +308,13 @@ pub fn update_hord_db_and_augment_bitcoin_block(
)
});
let compacted_block = CompactedBlock::from_standardized_block(&new_block);
let compacted_block = LazyBlock::from_standardized_block(&new_block).map_err(|e| {
format!(
"unable to serialize block {}: {}",
new_block.block_identifier.index,
e.to_string()
)
})?;
insert_entry_in_blocks(
new_block.block_identifier.index as u32,
&compacted_block,