feat: introduce rocksdb storage for Stacks

This commit is contained in:
Ludo Galabru
2023-06-04 23:43:56 -04:00
parent 825f2c9cbe
commit 4564e8818a
2 changed files with 156 additions and 0 deletions

View File

@@ -15,6 +15,7 @@ pub mod cli;
pub mod config;
pub mod scan;
pub mod service;
pub mod storage;
fn main() {
cli::main();

View File

@@ -0,0 +1,155 @@
use std::path::PathBuf;
use chainhook_event_observer::{rocksdb::Options, rocksdb::DB, utils::Context};
use chainhook_types::{BlockIdentifier, StacksBlockData};
fn get_db_default_options() -> Options {
let mut opts = Options::default();
opts.create_if_missing(true);
// opts.prepare_for_bulk_load();
// opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
// opts.set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
// opts.increase_parallelism(parallelism)
// Per rocksdb's documentation:
// If cache_index_and_filter_blocks is false (which is default),
// the number of index/filter blocks is controlled by option max_open_files.
// If you are certain that your ulimit will always be bigger than number of files in the database,
// we recommend setting max_open_files to -1, which means infinity.
// This option will preload all filter and index blocks and will not need to maintain LRU of files.
// Setting max_open_files to -1 will get you the best possible performance.
opts.set_max_open_files(2048);
opts
}
fn get_default_stacks_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
destination_path.push("stacks.rocksdb");
destination_path
}
pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> {
let path = get_default_stacks_db_file_path(&base_dir);
let opts = get_db_default_options();
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to open stacks.rocksdb: {}", e.to_string()))?;
Ok(db)
}
pub fn open_readwrite_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> {
let path = get_default_stacks_db_file_path(&base_dir);
let opts = get_db_default_options();
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to open stacks.rocksdb: {}", e.to_string()))?;
Ok(db)
}
fn get_default_bitcoin_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
destination_path.push("bitcoin.rocksdb");
destination_path
}
pub fn open_readonly_bitcoin_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> {
let path = get_default_bitcoin_db_file_path(&base_dir);
let opts = get_db_default_options();
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to open bitcoin.rocksdb: {}", e.to_string()))?;
Ok(db)
}
pub fn open_readwrite_bitcoin_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> {
let path = get_default_bitcoin_db_file_path(&base_dir);
let opts = get_db_default_options();
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to open bitcoin.rocksdb: {}", e.to_string()))?;
Ok(db)
}
fn get_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] {
let mut key = [0u8; 12];
key[..2].copy_from_slice(b"b:");
key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes());
key[10..].copy_from_slice(b":d");
key
}
fn get_last_insert_key() -> [u8; 3] {
*b"m:t"
}
pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) {
let key = get_block_key(&block.block_identifier);
let block_bytes = json!(block);
stacks_db_rw
.put(&key, &block_bytes.to_string().as_bytes())
.expect("unable to insert blocks");
stacks_db_rw
.put(
get_last_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
}
pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option<u64> {
stacks_db
.get(get_last_insert_key())
.unwrap_or(None)
.and_then(|bytes| Some(u64::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7]])))
}
pub fn insert_entries_in_stacks_blocks(
blocks: &Vec<StacksBlockData>,
stacks_db_rw: &DB,
ctx: &Context,
) {
for block in blocks.iter() {
insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx);
}
}
pub fn get_stacks_block_at_block_height(
block_height: u64,
retry: u8,
stacks_db: &DB,
) -> Result<Option<StacksBlockData>, String> {
let mut attempt = 0;
loop {
match stacks_db.get(get_block_key(&BlockIdentifier { hash: "".to_string(), index: block_height })) {
Ok(Some(entry)) => return Ok(Some({
let spec: StacksBlockData = serde_json::from_slice(&entry[..])
.map_err(|e| format!("unable to deserialize Stacks chainhook {}", e.to_string()))?;
spec
})),
Ok(None) => return Ok(None),
_ => {
attempt += 1;
std::thread::sleep(std::time::Duration::from_secs(2));
if attempt > retry {
return Ok(None); // TODO
}
}
}
}
}
pub fn is_stacks_block_present(
block_identifier: &BlockIdentifier,
retry: u8,
stacks_db: &DB,
) -> bool {
let mut attempt = 0;
loop {
match stacks_db.get(get_block_key(block_identifier)) {
Ok(Some(_)) => return true,
Ok(None) => return false,
_ => {
attempt += 1;
std::thread::sleep(std::time::Duration::from_secs(2));
if attempt > retry {
return false;
}
}
}
}
}