feat: drafting lazy deserialization

This commit is contained in:
Ludo Galabru
2023-04-20 21:55:33 -04:00
parent a8f1926ce2
commit eaa2f71fce

View File

@@ -4,13 +4,13 @@ use std::{
};
use chainhook_types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, StacksBlockData,
TransactionIdentifier,
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
};
use hiro_system_kit::slog;
use rocksdb::DB;
use rusqlite::{Connection, OpenFlags, ToSql};
use std::io::Cursor;
use threadpool::ThreadPool;
use crate::{
@@ -325,21 +325,53 @@ impl CompactedBlock {
Ok(())
}
/// Writes this block to `fd` in the "lazy" layout consumed by `LazyBlock`.
///
/// Layout (all integers big-endian):
/// 1. transaction count as `u16`;
/// 2. for every transaction, one byte for its input count and one byte for
///    its output count;
/// 3. the coinbase entry (`self.0 .0`): txid bytes followed by its value;
/// 4. for every transaction: txid, then each input as
///    `(txid, block, vout, value)`, then each output value.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidInput` (before anything is written) when the
/// transaction count does not fit in a `u16`, or when a transaction has more
/// inputs or outputs than fit in a `u8`: silently truncating those counts
/// would produce a header that disagrees with the body. Any error reported
/// by the writer is propagated as is.
fn serialize_to_lazy_format<W: Write>(&self, fd: &mut W) -> std::io::Result<()> {
    let ((coinbase_txid, coinbase_value), transactions) = &self.0;

    // Validate every count up front so a failure never leaves a partial record.
    if transactions.len() > usize::from(u16::MAX) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "too many transactions for the lazy block format",
        ));
    }
    for (_, inputs, outputs) in transactions.iter() {
        if inputs.len() > usize::from(u8::MAX) || outputs.len() > usize::from(u8::MAX) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "too many inputs or outputs for the lazy block format",
            ));
        }
    }

    // Header: transaction count, then the (inputs, outputs) count of each one.
    // `write_all` is required everywhere: `write` may perform a short write.
    fd.write_all(&(transactions.len() as u16).to_be_bytes())?;
    for (_, inputs, outputs) in transactions.iter() {
        fd.write_all(&[inputs.len() as u8, outputs.len() as u8])?;
    }

    // Coinbase entry.
    fd.write_all(coinbase_txid)?;
    fd.write_all(&coinbase_value.to_be_bytes())?;

    // Transaction bodies, in the same order as the header entries.
    for (id, inputs, outputs) in transactions.iter() {
        fd.write_all(id)?;
        for (txid, block, vout, value) in inputs.iter() {
            fd.write_all(txid)?;
            fd.write_all(&block.to_be_bytes())?;
            // NOTE(review): the lazy reader consumes exactly one byte for
            // `vout` (INPUT_SIZE = 8 + 4 + 1 + 8) — confirm `vout` is a `u8` here.
            fd.write_all(&vout.to_be_bytes())?;
            fd.write_all(&value.to_be_bytes())?;
        }
        for value in outputs.iter() {
            fd.write_all(&value.to_be_bytes())?;
        }
    }
    Ok(())
}
fn deserialize<R: Read>(fd: &mut R) -> std::io::Result<CompactedBlock> {
let mut ci = [0u8; 8];
fd.read_exact(&mut ci)?;
let mut cv = [0u8; 8];
fd.read_exact(&mut cv)?;
let mut tx_len = [0u8; 8];
fd.read_exact(&mut tx_len)?;
let mut txs = vec![];
for _ in 0..usize::from_be_bytes(tx_len) {
let tx_len = {
let mut bytes = [0u8; 8];
fd.read_exact(&mut bytes).expect("corrupted data");
usize::from_be_bytes(bytes)
};
let mut txs = Vec::with_capacity(tx_len);
for _ in 0..tx_len {
let mut txid = [0u8; 8];
fd.read_exact(&mut txid)?;
let mut inputs_len = [0u8; 8];
fd.read_exact(&mut inputs_len)?;
let mut inputs = vec![];
for _ in 0..usize::from_be_bytes(inputs_len) {
let inputs_len = {
let mut bytes = [0u8; 8];
fd.read_exact(&mut bytes).expect("corrupted data");
usize::from_be_bytes(bytes)
};
let mut inputs = Vec::with_capacity(inputs_len);
for _ in 0..inputs_len {
let mut txin = [0u8; 8];
fd.read_exact(&mut txin)?;
let mut block = [0u8; 4];
@@ -355,10 +387,13 @@ impl CompactedBlock {
u64::from_be_bytes(value),
))
}
let mut outputs_len = [0u8; 8];
fd.read_exact(&mut outputs_len)?;
let mut outputs = vec![];
for _ in 0..usize::from_be_bytes(outputs_len) {
let outputs_len = {
let mut bytes = [0u8; 8];
fd.read_exact(&mut bytes).expect("corrupted data");
usize::from_be_bytes(bytes)
};
let mut outputs = Vec::with_capacity(outputs_len);
for _ in 0..outputs_len {
let mut v = [0u8; 8];
fd.read_exact(&mut v)?;
outputs.push(u64::from_be_bytes(v))
@@ -457,6 +492,13 @@ pub fn find_block_at_block_height(block_height: u32, blocks_db: &DB) -> Option<C
}
}
/// Looks up the serialized block stored under the big-endian encoding of
/// `block_height` and wraps it in a `LazyBlock`.
///
/// Returns `None` both when the key is absent and when the database lookup
/// reports an error.
///
/// # Panics
///
/// `LazyBlock::new` indexes the first two bytes of the stored value, so a
/// stored value shorter than two bytes panics.
pub fn find_lazy_block_at_block_height(block_height: u32, blocks_db: &DB) -> Option<LazyBlock> {
    let key = block_height.to_be_bytes();
    blocks_db.get(key).ok().flatten().map(LazyBlock::new)
}
pub fn remove_entry_from_blocks(block_height: u32, blocks_db_rw: &DB, ctx: &Context) {
if let Err(e) = blocks_db_rw.delete(block_height.to_be_bytes()) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
@@ -472,6 +514,10 @@ pub fn delete_blocks_in_block_range(
for block_height in start_block..=end_block {
remove_entry_from_blocks(block_height, blocks_db_rw, ctx);
}
let start_block_bytes = (start_block - 1).to_be_bytes();
blocks_db_rw
.put(b"metadata::last_insert", start_block_bytes)
.expect("unable to insert metadata");
}
pub fn delete_blocks_in_block_range_sqlite(
@@ -1115,3 +1161,341 @@ pub fn retrieve_satoshi_point_using_local_storage(
transfers: hops,
})
}
/// Walks the chain backwards from the first satoshi of output 0 of
/// `transaction_identifier` (in block `block_identifier`) until it reaches the
/// coinbase that minted it, and derives the satoshi's ordinal number.
///
/// Blocks are fetched from `blocks_db` in their lazy representation. At each
/// hop the offset of the tracked satoshi inside the current transaction is
/// mapped to the input that carried it, and the walk moves to the block and
/// transaction referenced by that input.
///
/// # Errors
///
/// Returns `Err` when:
/// - the transaction hash cannot be decoded into at least 8 bytes after its
///   first two characters are skipped;
/// - a block required by the traversal is not in `blocks_db`;
/// - the tracked transaction is not present in its block;
/// - no input (or fee range) accounts for the tracked satoshi, which would
///   otherwise leave the traversal stuck on the same position forever.
pub fn retrieve_satoshi_point_using_lazy_storage(
    blocks_db: &DB,
    block_identifier: &BlockIdentifier,
    transaction_identifier: &TransactionIdentifier,
    inscription_number: u64,
    ctx: &Context,
) -> Result<TraversalResult, String> {
    ctx.try_log(|logger| {
        slog::info!(
            logger,
            "Computing ordinal number for Satoshi point {}:0:0 (block #{})",
            transaction_identifier.hash,
            block_identifier.index
        )
    });

    let mut ordinal_offset: u64 = 0;
    let mut ordinal_block_number = block_identifier.index as u32;

    // Transactions are keyed by the first 8 bytes of their id. The first two
    // characters of the hash are skipped (presumably a "0x" prefix).
    let txid: [u8; 8] = {
        let hash = &transaction_identifier.hash;
        let hex_part = hash
            .get(2..)
            .ok_or_else(|| format!("invalid transaction hash {hash}"))?;
        let bytes = hex::decode(hex_part)
            .map_err(|e| format!("invalid transaction hash {hash}: {e}"))?;
        let prefix = bytes
            .get(..8)
            .ok_or_else(|| format!("transaction hash {hash} is too short"))?;
        let mut txid = [0u8; 8];
        txid.copy_from_slice(prefix);
        txid
    };

    // (txid prefix, output index) currently being traced.
    let mut tx_cursor = (txid, 0usize);
    let mut hops: u32 = 0;

    loop {
        hops += 1;

        let lazy_block = match find_lazy_block_at_block_height(ordinal_block_number, blocks_db) {
            Some(block) => block,
            None => {
                return Err(format!("block #{ordinal_block_number} not in database"));
            }
        };

        let coinbase_txid = lazy_block.get_coinbase_txid();
        let txid = tx_cursor.0;
        // Set once the cursor has been moved to an earlier transaction.
        let mut moved = false;

        // evaluate exit condition: did we reach the **final** coinbase transaction
        if coinbase_txid == &txid[..] {
            let coinbase_value = lazy_block.get_coinbase_sats();
            if ordinal_offset < coinbase_value {
                // The satoshi was minted by this block's subsidy: done.
                break;
            }

            // Otherwise the satoshi came from the fees collected by the
            // coinbase: loop over the transaction fees to detect the right range.
            let cut_off = ordinal_offset - coinbase_value;
            let mut accumulated_fees: u64 = 0;
            for tx in lazy_block.iter_tx() {
                let total_in: u64 = tx.inputs.iter().map(|input| input.txin_value).sum();
                let total_out: u64 = tx.outputs.iter().sum();

                let fee = total_in - total_out;
                accumulated_fees += fee;
                if accumulated_fees > cut_off {
                    // We are looking at the right transaction.
                    // Retraverse the inputs to select the index to be picked.
                    let mut sats_in = 0;
                    for input in tx.inputs.into_iter() {
                        sats_in += input.txin_value;
                        if sats_in >= total_out {
                            ordinal_offset = total_out - (sats_in - input.txin_value);
                            ordinal_block_number = input.block_height;
                            tx_cursor = (input.txin, input.vout as usize);
                            moved = true;
                            break;
                        }
                    }
                    break;
                }
            }
        } else {
            // isolate the target transaction
            let lazy_tx = match lazy_block.find_and_serialize_transaction_with_txid(&txid) {
                Some(entry) => entry,
                None => {
                    return Err(format!(
                        "transaction {} not found in block #{ordinal_block_number}",
                        hex::encode(&txid)
                    ));
                }
            };

            // Position of the tracked satoshi among the transaction's outputs:
            // everything paid to the outputs preceding the cursor, plus the
            // offset inside the cursor's own output.
            let preceding_outputs: u64 = lazy_tx.outputs.iter().take(tx_cursor.1).sum();
            let sats_out = preceding_outputs + ordinal_offset;

            // Find the input whose value range contains that position.
            let mut sats_in = 0;
            for input in lazy_tx.inputs.into_iter() {
                sats_in += input.txin_value;
                if sats_out < sats_in {
                    ordinal_offset = sats_out - (sats_in - input.txin_value);
                    ordinal_block_number = input.block_height;
                    tx_cursor = (input.txin, input.vout as usize);
                    moved = true;
                    break;
                }
            }
        }

        if !moved {
            // Nothing changed: the next iteration would replay this one forever.
            return Err(format!(
                "unable to trace satoshi through transaction {} in block #{ordinal_block_number}",
                hex::encode(&txid)
            ));
        }
    }

    let height = Height(ordinal_block_number.into());
    let ordinal_number = height.starting_sat().0 + ordinal_offset;

    Ok(TraversalResult {
        inscription_number,
        ordinal_number,
        transfers: hops,
    })
}
/// A block kept in its serialized "lazy" form and decoded on demand.
///
/// Layout of `bytes` as read by the accessors below (integers are big-endian):
/// - 2 bytes: number of transactions (`tx_len`);
/// - `tx_len` byte pairs: input count and output count of each transaction;
/// - coinbase entry: `TXID_LEN` bytes of txid, then `SATS_LEN` bytes of value;
/// - for each transaction: `TXID_LEN` bytes of txid, its inputs
///   (`INPUT_SIZE` bytes each), then its outputs (`OUTPUT_SIZE` bytes each).
#[derive(Debug)]
pub struct LazyBlock {
    /// Raw serialized block.
    pub bytes: Vec<u8>,
    /// Number of transactions, decoded from the first two bytes of `bytes`.
    pub tx_len: u16,
}

/// A transaction decoded from a `LazyBlock`.
#[derive(Debug)]
pub struct LazyBlockTransaction {
    /// 8-byte transaction identifier as stored in the block.
    pub txid: [u8; 8],
    /// Decoded inputs, in stored order.
    pub inputs: Vec<LazyBlockTransactionInput>,
    /// Output values, in stored order.
    pub outputs: Vec<u64>,
}

/// A transaction input decoded from a `LazyBlock`.
#[derive(Debug)]
pub struct LazyBlockTransactionInput {
    /// 8-byte identifier of the transaction being spent.
    pub txin: [u8; 8],
    /// Height of the block holding the spent transaction.
    pub block_height: u32,
    /// Index of the spent output (stored on a single byte).
    pub vout: u8,
    /// Value carried by the spent output.
    pub txin_value: u64,
}

/// Size in bytes of a stored transaction identifier.
const TXID_LEN: usize = 8;
/// Size in bytes of a stored value.
const SATS_LEN: usize = 8;
/// Size in bytes of a stored input: txid (8) + block height (4) + vout (1) + value (8).
const INPUT_SIZE: usize = 8 + 4 + 1 + 8;
/// Size in bytes of a stored output value.
const OUTPUT_SIZE: usize = 8;

impl LazyBlock {
    /// Size in bytes of the leading transaction counter.
    const TX_COUNT_LEN: usize = 2;
    /// Size in bytes of one (inputs, outputs) entry of the format table.
    const TX_FORMAT_LEN: usize = 2;

    /// Wraps serialized block `bytes`, decoding the transaction count.
    ///
    /// # Panics
    ///
    /// Panics when `bytes` holds fewer than two bytes.
    pub fn new(bytes: Vec<u8>) -> LazyBlock {
        let tx_len = u16::from_be_bytes([bytes[0], bytes[1]]);
        LazyBlock { bytes, tx_len }
    }

    /// Offset of the coinbase entry: right after the counter and format table.
    pub fn get_coinbase_data_pos(&self) -> usize {
        // Widen before multiplying: the product does not fit in a `u16` once
        // the block holds 32767 transactions or more.
        Self::TX_COUNT_LEN + usize::from(self.tx_len) * Self::TX_FORMAT_LEN
    }

    /// Decodes the big-endian `u64` stored at `pos`.
    ///
    /// # Panics
    ///
    /// Panics when fewer than 8 bytes are available at `pos`.
    pub fn get_u64_at_pos(&self, pos: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&self.bytes[pos..pos + 8]);
        u64::from_be_bytes(raw)
    }

    /// Returns the stored coinbase transaction identifier.
    pub fn get_coinbase_txid(&self) -> &[u8] {
        let pos = self.get_coinbase_data_pos();
        &self.bytes[pos..pos + TXID_LEN]
    }

    /// Returns the value stored next to the coinbase transaction identifier.
    pub fn get_coinbase_sats(&self) -> u64 {
        self.get_u64_at_pos(self.get_coinbase_data_pos() + TXID_LEN)
    }

    /// Offset of the first transaction body, right after the coinbase entry.
    pub fn get_transactions_data_pos(&self) -> usize {
        self.get_coinbase_data_pos() + TXID_LEN + SATS_LEN
    }

    /// Returns `(inputs, outputs, size)` for the transaction at `index`, where
    /// `size` is the number of bytes its body occupies (txid included).
    ///
    /// # Panics
    ///
    /// Panics when the format table entry lies outside `bytes`.
    pub fn get_transaction_format(&self, index: u16) -> (u8, u8, usize) {
        // Widened for the same reason as in `get_coinbase_data_pos`.
        let inputs_len_pos = Self::TX_COUNT_LEN + usize::from(index) * Self::TX_FORMAT_LEN;
        let inputs = self.bytes[inputs_len_pos];
        let outputs = self.bytes[inputs_len_pos + 1];
        let size = TXID_LEN + usize::from(inputs) * INPUT_SIZE + usize::from(outputs) * OUTPUT_SIZE;
        (inputs, outputs, size)
    }

    /// Reads exactly `N` bytes from `cursor`, panicking on truncated data.
    fn read_array<const N: usize>(cursor: &mut Cursor<&Vec<u8>>) -> [u8; N] {
        let mut raw = [0u8; N];
        cursor.read_exact(&mut raw).expect("data corrupted");
        raw
    }

    /// Decodes `inputs_len` inputs followed by `outputs_len` outputs from
    /// `cursor`, which must be positioned right after the transaction's txid.
    ///
    /// # Panics
    ///
    /// Panics with "data corrupted" when the data ends before everything
    /// announced by `inputs_len` / `outputs_len` has been read.
    pub fn get_lazy_transaction_at_pos(
        &self,
        cursor: &mut Cursor<&Vec<u8>>,
        txid: [u8; 8],
        inputs_len: u8,
        outputs_len: u8,
    ) -> LazyBlockTransaction {
        let mut inputs = Vec::with_capacity(usize::from(inputs_len));
        for _ in 0..inputs_len {
            // Fields are read in their stored order.
            let txin = Self::read_array::<TXID_LEN>(cursor);
            let block_height = u32::from_be_bytes(Self::read_array::<4>(cursor));
            let [vout] = Self::read_array::<1>(cursor);
            let txin_value = u64::from_be_bytes(Self::read_array::<SATS_LEN>(cursor));
            inputs.push(LazyBlockTransactionInput {
                txin,
                block_height,
                vout,
                txin_value,
            });
        }

        let mut outputs = Vec::with_capacity(usize::from(outputs_len));
        for _ in 0..outputs_len {
            outputs.push(u64::from_be_bytes(Self::read_array::<OUTPUT_SIZE>(cursor)));
        }

        LazyBlockTransaction {
            txid,
            inputs,
            outputs,
        }
    }

    /// Scans the transactions in stored order and decodes the first one whose
    /// txid equals `searched_txid`.
    ///
    /// Only txids are compared while scanning; bodies of non-matching
    /// transactions are skipped using the format table. Returns `None` when no
    /// transaction matches, including when the block holds no transaction or
    /// ends before a txid can be read.
    pub fn find_and_serialize_transaction_with_txid(
        &self,
        searched_txid: &[u8],
    ) -> Option<LazyBlockTransaction> {
        let mut cursor = Cursor::new(&self.bytes);
        let mut pos = self.get_transactions_data_pos();
        for index in 0..self.tx_len {
            let (inputs_len, outputs_len, size) = self.get_transaction_format(index);
            cursor.set_position(pos as u64);
            let mut txid = [0u8; TXID_LEN];
            if cursor.read_exact(&mut txid).is_err() {
                // Truncated data: no further transaction can be read either.
                return None;
            }
            if searched_txid == &txid[..] {
                return Some(self.get_lazy_transaction_at_pos(
                    &mut cursor,
                    txid,
                    inputs_len,
                    outputs_len,
                ));
            }
            pos += size;
        }
        None
    }

    /// Returns an iterator decoding the block's transactions in stored order.
    pub fn iter_tx(&self) -> LazyBlockTransactionIterator<'_> {
        LazyBlockTransactionIterator::new(self)
    }
}

/// Iterator decoding the transactions of a `LazyBlock` one at a time.
pub struct LazyBlockTransactionIterator<'a> {
    lazy_block: &'a LazyBlock,
    /// Index of the next transaction to decode.
    tx_index: u16,
    /// Total size in bytes of the transaction bodies already consumed.
    cumulated_offset: usize,
}

impl<'a> LazyBlockTransactionIterator<'a> {
    /// Creates an iterator positioned on the first transaction of `lazy_block`.
    pub fn new(lazy_block: &'a LazyBlock) -> LazyBlockTransactionIterator<'a> {
        LazyBlockTransactionIterator {
            lazy_block,
            tx_index: 0,
            cumulated_offset: 0,
        }
    }
}

impl<'a> Iterator for LazyBlockTransactionIterator<'a> {
    type Item = LazyBlockTransaction;

    /// Decodes the next transaction, or returns `None` once `tx_len`
    /// transactions have been produced.
    ///
    /// # Panics
    ///
    /// Panics with "data corrupted" when the block ends before the
    /// transaction announced by the format table has been read.
    fn next(&mut self) -> Option<LazyBlockTransaction> {
        let block = self.lazy_block;
        if self.tx_index >= block.tx_len {
            return None;
        }

        let pos = block.get_transactions_data_pos() + self.cumulated_offset;
        let (inputs_len, outputs_len, size) = block.get_transaction_format(self.tx_index);

        let mut cursor = Cursor::new(&block.bytes);
        cursor.set_position(pos as u64);
        let mut txid = [0u8; TXID_LEN];
        cursor.read_exact(&mut txid).expect("data corrupted");

        self.cumulated_offset += size;
        self.tx_index += 1;

        Some(block.get_lazy_transaction_at_pos(&mut cursor, txid, inputs_len, outputs_len))
    }
}