feat: handle stacks unconfirmed state scans

This commit is contained in:
Ludo Galabru
2023-06-05 22:33:08 -04:00
parent 158633ca7e
commit f6d050fbce
4 changed files with 224 additions and 96 deletions

View File

@@ -147,9 +147,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
let mut last_block_scanned = BlockIdentifier::default();
let mut err_count = 0;
for cursor in start_block..=end_block {
let block_data = match get_stacks_block_at_block_height(cursor, 3, stacks_db_conn) {
let block_data = match get_stacks_block_at_block_height(cursor, true, 3, stacks_db_conn) {
Ok(Some(block)) => block,
Ok(None) => unimplemented!(),
Ok(None) => match get_stacks_block_at_block_height(cursor, false, 3, stacks_db_conn) {
Ok(Some(block)) => block,
Ok(None) => unimplemented!(),
Err(_) => unimplemented!(),
},
Err(_) => unimplemented!(),
};
last_block_scanned = block_data.block_identifier.clone();

View File

@@ -1,14 +1,18 @@
mod http_api;
mod runloops;
use crate::config::{Config, PredicatesApi};
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv,
scan_stacks_chainstate_via_rocksdb_using_predicate,
};
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::{
insert_entries_in_stacks_blocks, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks,
insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn,
open_readwrite_stacks_db_conn,
};
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
@@ -135,102 +139,33 @@ impl Service {
// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let stacks_scan_pool =
ThreadPool::new(self.config.limits.max_number_of_concurrent_stacks_scans);
let ctx = self.ctx.clone();
let config = self.config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
.spawn(move || {
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx_moved.clone();
stacks_scan_pool.execute(move || {
let stacks_db_conn = match open_readonly_stacks_db_conn(
&moved_config.expected_cache_path(),
&moved_ctx,
) {
Ok(db_conn) => db_conn,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
unimplemented!()
}
};
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
&stacks_db_conn,
&moved_config,
&moved_ctx,
);
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_scanned) => last_block_scanned,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}",
last_block_scanned.index
);
predicate_spec.end_block = Some(last_block_scanned.index);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
));
});
}
let res = stacks_scan_pool.join();
res
start_stacks_scan_runloop(
&config,
stacks_scan_op_rx,
observer_command_tx_moved,
&ctx,
);
})
.expect("unable to spawn thread");
// Bitcoin scan operation threadpool
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
let bitcoin_scan_pool =
ThreadPool::new(self.config.limits.max_number_of_concurrent_bitcoin_scans);
let ctx = self.ctx.clone();
let config = self.config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = moved_observer_command_tx.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_http_using_predicate(
&predicate_spec,
&moved_config,
&moved_ctx,
);
match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
return;
}
};
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(predicate_spec),
));
});
}
let res = bitcoin_scan_pool.join();
res
start_bitcoin_scan_runloop(
&config,
bitcoin_scan_op_rx,
observer_command_tx_moved,
&ctx,
);
})
.expect("unable to spawn thread");
@@ -358,18 +293,28 @@ impl Service {
match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
insert_entries_in_stacks_blocks(
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.new_blocks,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
insert_entries_in_stacks_blocks(
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.blocks_to_apply,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}

View File

@@ -0,0 +1,112 @@
use std::sync::mpsc::Sender;
use chainhook_event_observer::{
chainhooks::types::{
BitcoinChainhookSpecification, ChainhookSpecification, StacksChainhookSpecification,
},
observer::ObserverCommand,
utils::Context,
};
use threadpool::ThreadPool;
use crate::{
config::{Config, PredicatesApiConfig},
scan::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
},
storage::open_readonly_stacks_db_conn,
};
pub fn start_stacks_scan_runloop(
config: &Config,
stacks_scan_op_rx: crossbeam_channel::Receiver<StacksChainhookSpecification>,
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans);
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
stacks_scan_pool.execute(move || {
let stacks_db_conn =
match open_readonly_stacks_db_conn(&moved_config.expected_cache_path(), &moved_ctx)
{
Ok(db_conn) => db_conn,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
unimplemented!()
}
};
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
&stacks_db_conn,
&moved_config,
&moved_ctx,
);
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_scanned) => last_block_scanned,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", last_block_scanned.index
);
predicate_spec.end_block = Some(last_block_scanned.index);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
));
});
}
let res = stacks_scan_pool.join();
res
}
pub fn start_bitcoin_scan_runloop(
config: &Config,
bitcoin_scan_op_rx: crossbeam_channel::Receiver<BitcoinChainhookSpecification>,
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans);
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
&moved_config,
&moved_ctx,
);
match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
return;
}
};
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(predicate_spec),
));
});
}
let res = bitcoin_scan_pool.join();
}

View File

@@ -1,7 +1,7 @@
use std::path::PathBuf;
use chainhook_event_observer::{rocksdb::Options, rocksdb::DB, utils::Context};
use chainhook_types::{BlockIdentifier, StacksBlockData};
use chainhook_types::{BlockIdentifier, StacksBlockData, StacksBlockUpdate};
fn get_db_default_options() -> Options {
let mut opts = Options::default();
@@ -73,10 +73,22 @@ fn get_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] {
key
}
fn get_last_insert_key() -> [u8; 3] {
fn get_unconfirmed_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] {
let mut key = [0u8; 12];
key[..2].copy_from_slice(b"~:");
key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes());
key[10..].copy_from_slice(b":d");
key
}
fn get_last_confirmed_insert_key() -> [u8; 3] {
*b"m:t"
}
fn get_last_unconfirmed_insert_key() -> [u8; 3] {
*b"m:~"
}
pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) {
let key = get_block_key(&block.block_identifier);
let block_bytes = json!(block);
@@ -85,15 +97,42 @@ pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB,
.expect("unable to insert blocks");
stacks_db_rw
.put(
get_last_insert_key(),
get_last_confirmed_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
}
pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option<u64> {
pub fn insert_unconfirmed_entry_in_stacks_blocks(
block: &StacksBlockData,
stacks_db_rw: &DB,
_ctx: &Context,
) {
let key = get_unconfirmed_block_key(&block.block_identifier);
let block_bytes = json!(block);
stacks_db_rw
.put(&key, &block_bytes.to_string().as_bytes())
.expect("unable to insert blocks");
stacks_db_rw
.put(
get_last_unconfirmed_insert_key(),
block.block_identifier.index.to_be_bytes(),
)
.expect("unable to insert metadata");
}
pub fn delete_unconfirmed_entry_from_stacks_blocks(
block_identifier: &BlockIdentifier,
stacks_db_rw: &DB,
_ctx: &Context,
) {
let key = get_unconfirmed_block_key(&block_identifier);
stacks_db_rw.delete(&key).expect("unable to delete blocks");
}
pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option<u64> {
stacks_db
.get(get_last_insert_key())
.get(get_last_unconfirmed_insert_key())
.unwrap_or(None)
.and_then(|bytes| {
Some(u64::from_be_bytes([
@@ -102,27 +141,55 @@ pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option<
})
}
pub fn insert_entries_in_stacks_blocks(
pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option<u64> {
stacks_db
.get(get_last_confirmed_insert_key())
.unwrap_or(None)
.and_then(|bytes| {
Some(u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]))
})
}
pub fn confirm_entries_in_stacks_blocks(
blocks: &Vec<StacksBlockData>,
stacks_db_rw: &DB,
ctx: &Context,
) {
for block in blocks.iter() {
insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx);
delete_unconfirmed_entry_from_stacks_blocks(&block.block_identifier, stacks_db_rw, ctx);
}
}
pub fn draft_entries_in_stacks_blocks(
block_updates: &Vec<StacksBlockUpdate>,
stacks_db_rw: &DB,
ctx: &Context,
) {
for update in block_updates.iter() {
// TODO: Could be imperfect, from a microblock point of view
insert_unconfirmed_entry_in_stacks_blocks(&update.block, stacks_db_rw, ctx);
}
}
pub fn get_stacks_block_at_block_height(
block_height: u64,
confirmed: bool,
retry: u8,
stacks_db: &DB,
) -> Result<Option<StacksBlockData>, String> {
let mut attempt = 0;
loop {
match stacks_db.get(get_block_key(&BlockIdentifier {
let block_identifier = &BlockIdentifier {
hash: "".to_string(),
index: block_height,
})) {
};
match stacks_db.get(match confirmed {
true => get_block_key(block_identifier),
false => get_unconfirmed_block_key(block_identifier),
}) {
Ok(Some(entry)) => {
return Ok(Some({
let spec: StacksBlockData =