feat: migrate stacks scans to rocksdb

This commit is contained in:
Ludo Galabru
2023-06-04 23:58:05 -04:00
parent bc313fad5c
commit 4408b1e7ec
6 changed files with 382 additions and 203 deletions

27
Cargo.lock generated
View File

@@ -239,9 +239,9 @@ checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72"
[[package]]
name = "bindgen"
version = "0.64.0"
version = "0.65.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
dependencies = [
"bitflags 1.3.2",
"cexpr",
@@ -249,12 +249,13 @@ dependencies = [
"lazy_static",
"lazycell",
"peeking_take_while",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 1.0.105",
"syn 2.0.18",
]
[[package]]
@@ -461,7 +462,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chainhook"
version = "0.13.0"
version = "0.14.0"
dependencies = [
"ansi_term",
"atty",
@@ -2124,9 +2125,9 @@ dependencies = [
[[package]]
name = "librocksdb-sys"
version = "0.10.0+7.9.2"
version = "0.11.0+8.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b"
checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e"
dependencies = [
"bindgen",
"bzip2-sys",
@@ -2870,6 +2871,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
dependencies = [
"proc-macro2",
"syn 2.0.18",
]
[[package]]
name = "prettytable-rs"
version = "0.10.0"
@@ -3375,9 +3386,9 @@ dependencies = [
[[package]]
name = "rocksdb"
version = "0.20.1"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99"
checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe"
dependencies = [
"libc",
"librocksdb-sys",

View File

@@ -195,35 +195,41 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
Ok(response) => response.bytes().await,
Err(e) => Err(e),
};
match (local_sha_file, remote_sha_file) {
let should_download = match (local_sha_file, remote_sha_file) {
(Ok(local), Ok(remote_response)) => {
println!("{:?}", local);
println!("{:?}", remote_response);
}
(Ok(local), _) => {
// println!("Local: {:?}", local)
let cache_invalidated = remote_response.starts_with(&local[0..32]) == false;
if cache_invalidated {
info!(
ctx.expect_logger(),
"More recent Stacks archive file detected"
);
}
cache_invalidated
}
(_, _) => {
// We will download the latest file
println!("error reading local / remote");
info!(
ctx.expect_logger(),
"Unable to retrieve Stacks archive file locally"
);
true
}
}
if !tsv_file_path.exists() {
info!(ctx.expect_logger(), "Downloading {}", url);
match download_tsv_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
}
} else {
};
if !should_download {
info!(
ctx.expect_logger(),
"Building in-memory chainstate from file {}",
tsv_file_path.display()
"Stacks archive file already up to date"
);
config.add_local_stacks_tsv_source(&tsv_file_path);
return false;
}
info!(ctx.expect_logger(), "Downloading {}", url);
match download_tsv_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
}
config.add_local_stacks_tsv_source(&tsv_file_path);
}

View File

@@ -4,11 +4,14 @@ use crate::{
archive::download_stacks_dataset_if_required,
block::{Record, RecordKind},
config::Config,
storage::{
insert_entry_in_stacks_blocks, is_stacks_block_present, open_readwrite_stacks_db_conn, get_last_block_height_inserted, get_stacks_block_at_block_height,
},
};
use chainhook_event_observer::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
utils::Context,
utils::Context, rocksdb::DB,
};
use chainhook_event_observer::{
chainhooks::{
@@ -19,6 +22,186 @@ use chainhook_event_observer::{
};
use chainhook_types::BlockIdentifier;
pub async fn get_canonical_fork_from_tsv(
config: &mut Config,
ctx: &Context,
) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
let (record_tx, record_rx) = std::sync::mpsc::channel();
let start_block = 0;
let parsing_handle = hiro_system_kit::thread_named("Stacks chainstate CSV parsing")
.spawn(move || {
let mut reader_builder = csv::ReaderBuilder::default()
.has_headers(false)
.delimiter(b'\t')
.buffer_capacity(8 * (1 << 10))
.from_path(&seed_tsv_path)
.expect("unable to create csv reader");
for result in reader_builder.deserialize() {
let record: Record = result.unwrap();
match &record.kind {
RecordKind::StacksBlockReceived => match record_tx.send(Some(record)) {
Err(_e) => {
break;
}
_ => {}
},
_ => {}
};
}
let _ = record_tx.send(None);
})
.expect("unable to spawn thread");
let canonical_fork = {
let mut cursor = BlockIdentifier::default();
let mut dump = HashMap::new();
while let Ok(Some(mut record)) = record_rx.recv() {
let (block_identifier, parent_block_identifier) = match (&record.kind, &record.blob) {
(RecordKind::StacksBlockReceived, Some(blob)) => {
match standardize_stacks_serialized_block_header(&blob) {
Ok(data) => data,
Err(e) => {
error!(ctx.expect_logger(), "{e}");
continue;
}
}
}
_ => unreachable!(),
};
if start_block > block_identifier.index {
continue;
}
if block_identifier.index > cursor.index {
cursor = block_identifier.clone(); // todo(lgalabru)
}
dump.insert(
block_identifier,
(parent_block_identifier, record.blob.take().unwrap()),
);
}
let mut canonical_fork = VecDeque::new();
while cursor.index > 0 {
let (block_identifer, (parent_block_identifier, blob)) =
match dump.remove_entry(&cursor) {
Some(entry) => entry,
None => break,
};
cursor = parent_block_identifier.clone(); // todo(lgalabru)
canonical_fork.push_front((block_identifer, parent_block_identifier, blob));
}
canonical_fork
};
let _ = parsing_handle.join();
Ok(canonical_fork)
}
/// Evaluates a Stacks predicate against the blocks stored in the local
/// RocksDB database and triggers the predicate's action for every hit.
///
/// The scan covers `start_block..=end_block`. `start_block` must be set on the
/// specification; `end_block` falls back to the height returned by
/// `get_last_block_height_inserted` when the specification does not set it.
///
/// `config` is currently not read by this function.
///
/// Returns the identifier of the last block scanned (the default identifier
/// when the range is empty).
///
/// # Errors
///
/// Returns an error when:
/// - `start_block` is missing from the specification,
/// - `end_block` is missing and no block height can be read from storage,
/// - a block of the range is absent from storage or cannot be retrieved,
/// - the triggered action fails 3 times in a row.
pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
    predicate_spec: &StacksChainhookSpecification,
    stacks_db_conn: &DB,
    config: &Config,
    ctx: &Context,
) -> Result<BlockIdentifier, String> {
    let start_block = predicate_spec.start_block.ok_or_else(|| {
        "Chainhook specification must include fields 'start_block' when using the scan command"
            .to_string()
    })?;

    let end_block = match predicate_spec.end_block {
        Some(end_block) => end_block,
        None => get_last_block_height_inserted(stacks_db_conn, ctx).ok_or_else(|| {
            "Chainhook specification must include field 'end_block' when no Stacks block is available in local storage"
                .to_string()
        })?,
    };

    let proofs = HashMap::new();
    let mut actions_triggered = 0;
    let mut blocks_scanned = 0;
    info!(
        ctx.expect_logger(),
        "Starting predicate evaluation on Stacks blocks"
    );

    let mut last_block_scanned = BlockIdentifier::default();
    let mut err_count = 0;

    for cursor in start_block..=end_block {
        // A missing block or a storage failure aborts the scan with an error
        // instead of panicking inside the scan worker.
        let block_data = match get_stacks_block_at_block_height(cursor, 3, stacks_db_conn) {
            Ok(Some(block)) => block,
            Ok(None) => {
                return Err(format!(
                    "Scan aborted (Stacks block #{cursor} missing from local storage)"
                ));
            }
            Err(_) => {
                return Err(format!(
                    "Scan aborted (unable to retrieve Stacks block #{cursor} from local storage)"
                ));
            }
        };
        last_block_scanned = block_data.block_identifier.clone();
        blocks_scanned += 1;

        let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data];
        let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, predicate_spec, ctx);
        if hits_per_blocks.is_empty() {
            continue;
        }

        let trigger = StacksTriggerChainhook {
            chainhook: predicate_spec,
            apply: hits_per_blocks,
            rollback: vec![],
        };
        match handle_stacks_hook_action(trigger, &proofs, ctx) {
            Err(e) => {
                error!(ctx.expect_logger(), "unable to handle action {}", e);
            }
            Ok(action) => {
                actions_triggered += 1;
                let res = match action {
                    StacksChainhookOccurrence::Http(request) => {
                        send_request(request, 3, 1, ctx).await
                    }
                    StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, ctx),
                    StacksChainhookOccurrence::Data(_payload) => unreachable!(),
                };
                // Only consecutive failures count: any success resets the counter.
                if res.is_err() {
                    err_count += 1;
                } else {
                    err_count = 0;
                }
            }
        }
        // We abort after 3 consecutive errors
        if err_count >= 3 {
            return Err("Scan aborted (consecutive action errors >= 3)".to_string());
        }
    }

    info!(
        ctx.expect_logger(),
        "{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
    );
    Ok(last_block_scanned)
}
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec: &StacksChainhookSpecification,
config: &mut Config,
@@ -36,92 +219,10 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
let _ = download_stacks_dataset_if_required(config, ctx).await;
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
let (record_tx, record_rx) = std::sync::mpsc::channel();
let _parsing_handle = std::thread::spawn(move || {
let mut reader_builder = csv::ReaderBuilder::default()
.has_headers(false)
.delimiter(b'\t')
.buffer_capacity(8 * (1 << 10))
.from_path(&seed_tsv_path)
.expect("unable to create csv reader");
for result in reader_builder.deserialize() {
// Notice that we need to provide a type hint for automatic
// deserialization.
let record: Record = result.unwrap();
match &record.kind {
RecordKind::StacksBlockReceived => {
match record_tx.send(Some(record)) {
Err(_e) => {
// Abort the traversal once the receiver is closed
break;
}
_ => {}
}
}
// RecordKind::BitcoinBlockReceived => {
// let _ = bitcoin_record_tx.send(Some(record));
// }
// RecordKind::StacksMicroblockReceived => {
// let _ = stacks_record_tx.send(Some(record));
// },
_ => {}
};
}
let _ = record_tx.send(None);
});
let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
let mut indexer = Indexer::new(config.network.clone());
let mut canonical_fork = {
let mut cursor = BlockIdentifier::default();
let mut dump = HashMap::new();
while let Ok(Some(record)) = record_rx.recv() {
let (block_identifier, parent_block_identifier) = match &record.kind {
RecordKind::StacksBlockReceived => {
match standardize_stacks_serialized_block_header(&record.raw_log) {
Ok(data) => data,
Err(e) => {
error!(ctx.expect_logger(), "{e}");
continue;
}
}
}
_ => unreachable!(),
};
if start_block > block_identifier.index {
continue;
}
if let Some(end_block) = predicate_spec.end_block {
if block_identifier.index > end_block {
break;
}
}
if block_identifier.index > cursor.index {
cursor = block_identifier.clone(); // todo(lgalabru)
}
dump.insert(block_identifier, (parent_block_identifier, record.raw_log));
}
let mut canonical_fork = VecDeque::new();
while cursor.index > 0 {
let (block_identifer, (parent_block_identifier, blob)) =
match dump.remove_entry(&cursor) {
Some(entry) => entry,
None => break,
};
cursor = parent_block_identifier.clone(); // todo(lgalabru)
canonical_fork.push_front((block_identifer, parent_block_identifier, blob));
}
canonical_fork
};
let proofs = HashMap::new();
let mut actions_triggered = 0;
@@ -192,3 +293,67 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
Ok(last_block_scanned)
}
pub async fn consolidate_local_stacks_chainstate_using_csv(
config: &mut Config,
ctx: &Context,
) -> Result<(), String> {
info!(
ctx.expect_logger(),
"Building local chainstate from Stacks archive file"
);
let new_archive_to_process = download_stacks_dataset_if_required(config, ctx).await;
// Nothing to do - early return
if !new_archive_to_process {
return Ok(());
}
let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
blocks_read += 1;
// If blocks already stored, move on
if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
continue;
}
blocks_inserted += 1;
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
&blob,
&mut indexer.stacks_context,
ctx,
) {
Ok(block) => block,
Err(e) => {
error!(&ctx.expect_logger(), "{e}");
continue;
}
};
// TODO: return a result
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx);
if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks: {}/{}", blocks_read, blocks_to_insert
);
let _ = stacks_db_rw.flush();
}
}
let _ = stacks_db_rw.flush();
info!(
ctx.expect_logger(),
"{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
);
Ok(())
}

View File

@@ -1,6 +1,9 @@
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate, scan_stacks_chainstate_via_rocksdb_using_predicate,
};
use crate::storage::{insert_entries_in_stacks_blocks, open_readwrite_stacks_db_conn, open_readonly_stacks_db_conn};
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
@@ -9,17 +12,12 @@ use chainhook_event_observer::observer::{
start_event_observer, ApiKey, ObserverCommand, ObserverEvent,
};
use chainhook_event_observer::utils::Context;
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
use redis::{Commands, Connection};
use chainhook_types::{BitcoinBlockSignaling, StacksChainEvent};
use redis::Commands;
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 1;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
pub struct Service {
config: Config,
ctx: Context,
@@ -103,25 +101,7 @@ impl Service {
event_observer_config.chainhook_config = Some(chainhook_config);
event_observer_config.ordinals_enabled = !hord_disabled;
info!(
self.ctx.expect_logger(),
"Listening on port {} for Stacks chain events", event_observer_config.ingestion_port
);
match event_observer_config.bitcoin_block_signaling {
BitcoinBlockSignaling::ZeroMQ(ref url) => {
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via ZeroMQ: {}", url
);
}
BitcoinBlockSignaling::Stacks(ref _url) => {
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via Stacks node"
);
}
}
// Enable HTTP Chainhook API, if required
if self.config.chainhooks.enable_http_api {
info!(
self.ctx.expect_logger(),
@@ -130,6 +110,10 @@ impl Service {
);
}
// Download and ingest a Stacks dump
let _ = consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await;
// Start chainhook event observer
let context_cloned = self.ctx.clone();
let event_observer_config_moved = event_observer_config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
@@ -146,24 +130,40 @@ impl Service {
// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
let stacks_scan_pool = ThreadPool::new(self.config.chainhooks.max_stacks_concurrent_scans);
let ctx = self.ctx.clone();
let config = self.config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
.spawn(move || {
while let Ok((predicate_spec, api_key)) = stacks_scan_op_rx.recv() {
while let Ok((mut predicate_spec, api_key)) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let mut moved_config = config.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx_moved.clone();
stacks_scan_pool.execute(move || {
let op = scan_stacks_chainstate_via_csv_using_predicate(
let stacks_db_conn = match open_readonly_stacks_db_conn(
&moved_config.expected_cache_path(),
&moved_ctx,
) {
Ok(db_conn) => db_conn,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
unimplemented!()
}
};
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
&mut moved_config,
&stacks_db_conn,
&moved_config,
&moved_ctx,
);
let last_block_in_csv = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_in_csv) => last_block_in_csv,
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_scanned) => last_block_scanned,
Err(e) => {
error!(
moved_ctx.expect_logger(),
@@ -175,8 +175,9 @@ impl Service {
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}",
last_block_in_csv.index
last_block_scanned.index
);
predicate_spec.end_block = Some(last_block_scanned.index);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
api_key,
@@ -228,6 +229,26 @@ impl Service {
})
.expect("unable to spawn thread");
info!(
self.ctx.expect_logger(),
"Listening on port {} for Stacks chain events", event_observer_config.ingestion_port
);
match event_observer_config.bitcoin_block_signaling {
BitcoinBlockSignaling::ZeroMQ(ref url) => {
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via ZeroMQ: {}", url
);
}
BitcoinBlockSignaling::Stacks(ref _url) => {
info!(
self.ctx.expect_logger(),
"Observing Bitcoin chain events via Stacks node"
);
}
}
let mut stacks_event = 0;
loop {
let event = match observer_event_rx.recv() {
Ok(cmd) => cmd,
@@ -290,24 +311,48 @@ impl Service {
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
}
ObserverEvent::StacksChainEvent(chain_event) => {
let stacks_db_conn_rw = match open_readwrite_stacks_db_conn(
&self.config.expected_cache_path(),
&self.ctx,
) {
Ok(db_conn) => db_conn,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
continue;
}
};
match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
update_storage_with_confirmed_stacks_blocks(
&mut redis_con,
stacks_event += 1;
insert_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
update_storage_with_confirmed_stacks_blocks(
&mut redis_con,
insert_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
};
// Every 32 blocks, we will check if there's a new Stacks file archive to ingest
if stacks_event > 32 {
stacks_event = 0;
let _ = consolidate_local_stacks_chainstate_using_csv(
&mut self.config,
&self.ctx,
)
.await;
}
}
ObserverEvent::Terminate => {
info!(self.ctx.expect_logger(), "Terminating runloop");
@@ -320,54 +365,6 @@ impl Service {
}
}
/// Archives confirmed Stacks blocks in Redis and advances the stored tip.
///
/// Each block is written as a hash under the key `stx:<block height>` with the
/// fields `block_identifier`, `parent_block_identifier`, `transactions` and
/// `metadata`, each serialized as JSON. The height stored under `stx:tip` is
/// then updated to the highest block of `blocks` whose height is at or above
/// the tip read at the start of the call (0 when the tip cannot be read).
///
/// Write failures are logged and do not interrupt the processing of the
/// remaining blocks.
fn update_storage_with_confirmed_stacks_blocks(
    redis_con: &mut Connection,
    blocks: &[StacksBlockData],
    ctx: &Context,
) {
    let current_tip_height: u64 = redis_con.get("stx:tip").unwrap_or(0);

    for block in blocks.iter() {
        let key = format!("stx:{}", block.block_identifier.index);
        let res: Result<(), redis::RedisError> = redis_con.hset_multiple(
            &key,
            &[
                (
                    "block_identifier",
                    json!(block.block_identifier).to_string(),
                ),
                (
                    "parent_block_identifier",
                    json!(block.parent_block_identifier).to_string(),
                ),
                ("transactions", json!(block.transactions).to_string()),
                ("metadata", json!(block.metadata).to_string()),
            ],
        );
        if let Err(error) = res {
            crit!(
                ctx.expect_logger(),
                "unable to archive block {}: {}",
                block.block_identifier,
                error.to_string()
            );
        }
    }

    // Pick the highest eligible block rather than the last one in iteration
    // order, so the tip never moves backwards within a batch.
    let new_tip = blocks
        .iter()
        .filter(|block| block.block_identifier.index >= current_tip_height)
        .max_by_key(|block| block.block_identifier.index);

    if let Some(block) = new_tip {
        info!(
            ctx.expect_logger(),
            "Archiving confirmed Stacks chain block {}", block.block_identifier
        );
        let res: Result<(), redis::RedisError> =
            redis_con.set("stx:tip", block.block_identifier.index);
        if let Err(error) = res {
            crit!(
                ctx.expect_logger(),
                "unable to update tip to block {}: {}",
                block.block_identifier,
                error.to_string()
            );
        }
    }
}
fn load_predicates_from_redis(
config: &Config,
ctx: &Context,

View File

@@ -54,7 +54,7 @@ dashmap = "5.4.0"
fxhash = "0.2.1"
[dependencies.rocksdb]
version = "0.20.1"
version = "0.21.0"
default-features = false
optional = true
features = ["lz4", "snappy"]

View File

@@ -15,4 +15,4 @@ bitcoind_rpc_password = "devnet"
stacks_node_rpc_url = "http://localhost:20443"
[[event_source]]
tsv_file_url = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz"
tsv_file_url = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest"