mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-16 18:19:17 +08:00
feat: migrate stacks scans to rocksdb
This commit is contained in:
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -239,9 +239,9 @@ checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72"
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.64.0"
|
||||
version = "0.65.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4"
|
||||
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"cexpr",
|
||||
@@ -249,12 +249,13 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"peeking_take_while",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash",
|
||||
"shlex",
|
||||
"syn 1.0.105",
|
||||
"syn 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -461,7 +462,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "chainhook"
|
||||
version = "0.13.0"
|
||||
version = "0.14.0"
|
||||
dependencies = [
|
||||
"ansi_term",
|
||||
"atty",
|
||||
@@ -2124,9 +2125,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "librocksdb-sys"
|
||||
version = "0.10.0+7.9.2"
|
||||
version = "0.11.0+8.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b"
|
||||
checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"bzip2-sys",
|
||||
@@ -2870,6 +2871,16 @@ version = "0.2.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "prettyplease"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"syn 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prettytable-rs"
|
||||
version = "0.10.0"
|
||||
@@ -3375,9 +3386,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rocksdb"
|
||||
version = "0.20.1"
|
||||
version = "0.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99"
|
||||
checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"librocksdb-sys",
|
||||
|
||||
@@ -195,35 +195,41 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
|
||||
Ok(response) => response.bytes().await,
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
match (local_sha_file, remote_sha_file) {
|
||||
let should_download = match (local_sha_file, remote_sha_file) {
|
||||
(Ok(local), Ok(remote_response)) => {
|
||||
println!("{:?}", local);
|
||||
println!("{:?}", remote_response);
|
||||
}
|
||||
(Ok(local), _) => {
|
||||
// println!("Local: {:?}", local)
|
||||
let cache_invalidated = remote_response.starts_with(&local[0..32]) == false;
|
||||
if cache_invalidated {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"More recent Stacks archive file detected"
|
||||
);
|
||||
}
|
||||
cache_invalidated
|
||||
}
|
||||
(_, _) => {
|
||||
// We will download the latest file
|
||||
println!("error reading local / remote");
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Unable to retrieve Stacks archive file locally"
|
||||
);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
if !tsv_file_path.exists() {
|
||||
info!(ctx.expect_logger(), "Downloading {}", url);
|
||||
match download_tsv_file(&config).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
};
|
||||
if !should_download {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Building in-memory chainstate from file {}",
|
||||
tsv_file_path.display()
|
||||
"Stacks archive file already up to date"
|
||||
);
|
||||
config.add_local_stacks_tsv_source(&tsv_file_path);
|
||||
return false;
|
||||
}
|
||||
|
||||
info!(ctx.expect_logger(), "Downloading {}", url);
|
||||
match download_tsv_file(&config).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
config.add_local_stacks_tsv_source(&tsv_file_path);
|
||||
}
|
||||
|
||||
@@ -4,11 +4,14 @@ use crate::{
|
||||
archive::download_stacks_dataset_if_required,
|
||||
block::{Record, RecordKind},
|
||||
config::Config,
|
||||
storage::{
|
||||
insert_entry_in_stacks_blocks, is_stacks_block_present, open_readwrite_stacks_db_conn, get_last_block_height_inserted, get_stacks_block_at_block_height,
|
||||
},
|
||||
};
|
||||
use chainhook_event_observer::{
|
||||
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
|
||||
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
|
||||
utils::Context,
|
||||
utils::Context, rocksdb::DB,
|
||||
};
|
||||
use chainhook_event_observer::{
|
||||
chainhooks::{
|
||||
@@ -19,6 +22,186 @@ use chainhook_event_observer::{
|
||||
};
|
||||
use chainhook_types::BlockIdentifier;
|
||||
|
||||
pub async fn get_canonical_fork_from_tsv(
|
||||
config: &mut Config,
|
||||
ctx: &Context,
|
||||
) -> Result<VecDeque<(BlockIdentifier, BlockIdentifier, String)>, String> {
|
||||
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
|
||||
|
||||
let (record_tx, record_rx) = std::sync::mpsc::channel();
|
||||
|
||||
let start_block = 0;
|
||||
|
||||
let parsing_handle = hiro_system_kit::thread_named("Stacks chainstate CSV parsing")
|
||||
.spawn(move || {
|
||||
let mut reader_builder = csv::ReaderBuilder::default()
|
||||
.has_headers(false)
|
||||
.delimiter(b'\t')
|
||||
.buffer_capacity(8 * (1 << 10))
|
||||
.from_path(&seed_tsv_path)
|
||||
.expect("unable to create csv reader");
|
||||
|
||||
for result in reader_builder.deserialize() {
|
||||
let record: Record = result.unwrap();
|
||||
match &record.kind {
|
||||
RecordKind::StacksBlockReceived => match record_tx.send(Some(record)) {
|
||||
Err(_e) => {
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
let _ = record_tx.send(None);
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
let canonical_fork = {
|
||||
let mut cursor = BlockIdentifier::default();
|
||||
let mut dump = HashMap::new();
|
||||
|
||||
while let Ok(Some(mut record)) = record_rx.recv() {
|
||||
let (block_identifier, parent_block_identifier) = match (&record.kind, &record.blob) {
|
||||
(RecordKind::StacksBlockReceived, Some(blob)) => {
|
||||
match standardize_stacks_serialized_block_header(&blob) {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
if start_block > block_identifier.index {
|
||||
continue;
|
||||
}
|
||||
|
||||
if block_identifier.index > cursor.index {
|
||||
cursor = block_identifier.clone(); // todo(lgalabru)
|
||||
}
|
||||
dump.insert(
|
||||
block_identifier,
|
||||
(parent_block_identifier, record.blob.take().unwrap()),
|
||||
);
|
||||
}
|
||||
|
||||
let mut canonical_fork = VecDeque::new();
|
||||
while cursor.index > 0 {
|
||||
let (block_identifer, (parent_block_identifier, blob)) =
|
||||
match dump.remove_entry(&cursor) {
|
||||
Some(entry) => entry,
|
||||
None => break,
|
||||
};
|
||||
cursor = parent_block_identifier.clone(); // todo(lgalabru)
|
||||
canonical_fork.push_front((block_identifer, parent_block_identifier, blob));
|
||||
}
|
||||
canonical_fork
|
||||
};
|
||||
let _ = parsing_handle.join();
|
||||
|
||||
Ok(canonical_fork)
|
||||
}
|
||||
|
||||
pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
|
||||
predicate_spec: &StacksChainhookSpecification,
|
||||
stacks_db_conn: &DB,
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
) -> Result<BlockIdentifier, String> {
|
||||
|
||||
let start_block = match predicate_spec.start_block {
|
||||
Some(start_block) => start_block,
|
||||
None => {
|
||||
return Err(
|
||||
"Chainhook specification must include fields 'start_block' when using the scan command"
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let end_block = match predicate_spec.end_block {
|
||||
Some(end_block) => end_block,
|
||||
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
|
||||
Some(end_block) => end_block,
|
||||
None => {
|
||||
return Err(
|
||||
"Chainhook specification must include fields 'start_block' when using the scan command"
|
||||
.into(),
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let proofs = HashMap::new();
|
||||
|
||||
let mut actions_triggered = 0;
|
||||
let mut blocks_scanned = 0;
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Starting predicate evaluation on Stacks blocks"
|
||||
);
|
||||
let mut last_block_scanned = BlockIdentifier::default();
|
||||
let mut err_count = 0;
|
||||
for cursor in start_block..=end_block {
|
||||
|
||||
let block_data = match get_stacks_block_at_block_height(cursor, 3, stacks_db_conn) {
|
||||
Ok(Some(block)) => block,
|
||||
Ok(None) => unimplemented!(),
|
||||
Err(_) => unimplemented!(),
|
||||
};
|
||||
last_block_scanned = block_data.block_identifier.clone();
|
||||
blocks_scanned += 1;
|
||||
|
||||
let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data];
|
||||
|
||||
let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &predicate_spec, ctx);
|
||||
if hits_per_blocks.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let trigger = StacksTriggerChainhook {
|
||||
chainhook: &predicate_spec,
|
||||
apply: hits_per_blocks,
|
||||
rollback: vec![],
|
||||
};
|
||||
match handle_stacks_hook_action(trigger, &proofs, &ctx) {
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "unable to handle action {}", e);
|
||||
}
|
||||
Ok(action) => {
|
||||
actions_triggered += 1;
|
||||
let res = match action {
|
||||
StacksChainhookOccurrence::Http(request) => {
|
||||
send_request(request, 3, 1, &ctx).await
|
||||
}
|
||||
StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, &ctx),
|
||||
StacksChainhookOccurrence::Data(_payload) => unreachable!(),
|
||||
};
|
||||
if res.is_err() {
|
||||
err_count += 1;
|
||||
} else {
|
||||
err_count = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
// We abort after 3 consecutive errors
|
||||
if err_count >= 3 {
|
||||
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
|
||||
}
|
||||
}
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
|
||||
);
|
||||
|
||||
Ok(last_block_scanned)
|
||||
}
|
||||
|
||||
|
||||
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
predicate_spec: &StacksChainhookSpecification,
|
||||
config: &mut Config,
|
||||
@@ -36,92 +219,10 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
|
||||
let _ = download_stacks_dataset_if_required(config, ctx).await;
|
||||
|
||||
let seed_tsv_path = config.expected_local_stacks_tsv_file().clone();
|
||||
|
||||
let (record_tx, record_rx) = std::sync::mpsc::channel();
|
||||
|
||||
let _parsing_handle = std::thread::spawn(move || {
|
||||
let mut reader_builder = csv::ReaderBuilder::default()
|
||||
.has_headers(false)
|
||||
.delimiter(b'\t')
|
||||
.buffer_capacity(8 * (1 << 10))
|
||||
.from_path(&seed_tsv_path)
|
||||
.expect("unable to create csv reader");
|
||||
|
||||
for result in reader_builder.deserialize() {
|
||||
// Notice that we need to provide a type hint for automatic
|
||||
// deserialization.
|
||||
let record: Record = result.unwrap();
|
||||
match &record.kind {
|
||||
RecordKind::StacksBlockReceived => {
|
||||
match record_tx.send(Some(record)) {
|
||||
Err(_e) => {
|
||||
// Abord the traversal once the receiver closed
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
// RecordKind::BitcoinBlockReceived => {
|
||||
// let _ = bitcoin_record_tx.send(Some(record));
|
||||
// }
|
||||
// RecordKind::StacksMicroblockReceived => {
|
||||
// let _ = stacks_record_tx.send(Some(record));
|
||||
// },
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
let _ = record_tx.send(None);
|
||||
});
|
||||
let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
|
||||
|
||||
let mut indexer = Indexer::new(config.network.clone());
|
||||
|
||||
let mut canonical_fork = {
|
||||
let mut cursor = BlockIdentifier::default();
|
||||
let mut dump = HashMap::new();
|
||||
|
||||
while let Ok(Some(record)) = record_rx.recv() {
|
||||
let (block_identifier, parent_block_identifier) = match &record.kind {
|
||||
RecordKind::StacksBlockReceived => {
|
||||
match standardize_stacks_serialized_block_header(&record.raw_log) {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
error!(ctx.expect_logger(), "{e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
if start_block > block_identifier.index {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(end_block) = predicate_spec.end_block {
|
||||
if block_identifier.index > end_block {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if block_identifier.index > cursor.index {
|
||||
cursor = block_identifier.clone(); // todo(lgalabru)
|
||||
}
|
||||
dump.insert(block_identifier, (parent_block_identifier, record.raw_log));
|
||||
}
|
||||
|
||||
let mut canonical_fork = VecDeque::new();
|
||||
while cursor.index > 0 {
|
||||
let (block_identifer, (parent_block_identifier, blob)) =
|
||||
match dump.remove_entry(&cursor) {
|
||||
Some(entry) => entry,
|
||||
None => break,
|
||||
};
|
||||
cursor = parent_block_identifier.clone(); // todo(lgalabru)
|
||||
canonical_fork.push_front((block_identifer, parent_block_identifier, blob));
|
||||
}
|
||||
canonical_fork
|
||||
};
|
||||
let proofs = HashMap::new();
|
||||
|
||||
let mut actions_triggered = 0;
|
||||
@@ -192,3 +293,67 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
|
||||
|
||||
Ok(last_block_scanned)
|
||||
}
|
||||
|
||||
pub async fn consolidate_local_stacks_chainstate_using_csv(
|
||||
config: &mut Config,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Building local chainstate from Stacks archive file"
|
||||
);
|
||||
|
||||
let new_archive_to_process = download_stacks_dataset_if_required(config, ctx).await;
|
||||
// Nothing to do - early return
|
||||
if !new_archive_to_process {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut canonical_fork = get_canonical_fork_from_tsv(config, ctx).await?;
|
||||
|
||||
let mut indexer = Indexer::new(config.network.clone());
|
||||
let mut blocks_inserted = 0;
|
||||
let mut blocks_read = 0;
|
||||
let blocks_to_insert = canonical_fork.len();
|
||||
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
|
||||
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
|
||||
blocks_read += 1;
|
||||
|
||||
// If blocks already stored, move on
|
||||
if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
|
||||
continue;
|
||||
}
|
||||
blocks_inserted += 1;
|
||||
|
||||
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
|
||||
&indexer.config,
|
||||
&blob,
|
||||
&mut indexer.stacks_context,
|
||||
ctx,
|
||||
) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
error!(&ctx.expect_logger(), "{e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: return a result
|
||||
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx);
|
||||
|
||||
if blocks_inserted % 2500 == 0 {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Importing Stacks blocks: {}/{}", blocks_read, blocks_to_insert
|
||||
);
|
||||
let _ = stacks_db_rw.flush();
|
||||
}
|
||||
}
|
||||
let _ = stacks_db_rw.flush();
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use crate::config::Config;
|
||||
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
|
||||
use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;
|
||||
use crate::scan::stacks::{
|
||||
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate, scan_stacks_chainstate_via_rocksdb_using_predicate,
|
||||
};
|
||||
use crate::storage::{insert_entries_in_stacks_blocks, open_readwrite_stacks_db_conn, open_readonly_stacks_db_conn};
|
||||
|
||||
use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
|
||||
|
||||
@@ -9,17 +12,12 @@ use chainhook_event_observer::observer::{
|
||||
start_event_observer, ApiKey, ObserverCommand, ObserverEvent,
|
||||
};
|
||||
use chainhook_event_observer::utils::Context;
|
||||
use chainhook_types::{BitcoinBlockSignaling, StacksBlockData, StacksChainEvent};
|
||||
use redis::{Commands, Connection};
|
||||
use chainhook_types::{BitcoinBlockSignaling, StacksChainEvent};
|
||||
use redis::Commands;
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
pub const DEFAULT_INGESTION_PORT: u16 = 20455;
|
||||
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
|
||||
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 1;
|
||||
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;
|
||||
|
||||
pub struct Service {
|
||||
config: Config,
|
||||
ctx: Context,
|
||||
@@ -103,25 +101,7 @@ impl Service {
|
||||
event_observer_config.chainhook_config = Some(chainhook_config);
|
||||
event_observer_config.ordinals_enabled = !hord_disabled;
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening on port {} for Stacks chain events", event_observer_config.ingestion_port
|
||||
);
|
||||
match event_observer_config.bitcoin_block_signaling {
|
||||
BitcoinBlockSignaling::ZeroMQ(ref url) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via ZeroMQ: {}", url
|
||||
);
|
||||
}
|
||||
BitcoinBlockSignaling::Stacks(ref _url) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via Stacks node"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Enable HTTP Chainhook API, if required
|
||||
if self.config.chainhooks.enable_http_api {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
@@ -130,6 +110,10 @@ impl Service {
|
||||
);
|
||||
}
|
||||
|
||||
// Download and ingest a Stacks dump
|
||||
let _ = consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await;
|
||||
|
||||
// Start chainhook event observer
|
||||
let context_cloned = self.ctx.clone();
|
||||
let event_observer_config_moved = event_observer_config.clone();
|
||||
let observer_command_tx_moved = observer_command_tx.clone();
|
||||
@@ -146,24 +130,40 @@ impl Service {
|
||||
|
||||
// Stacks scan operation threadpool
|
||||
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
|
||||
let stacks_scan_pool = ThreadPool::new(STACKS_SCAN_THREAD_POOL_SIZE);
|
||||
let stacks_scan_pool = ThreadPool::new(self.config.chainhooks.max_stacks_concurrent_scans);
|
||||
let ctx = self.ctx.clone();
|
||||
let config = self.config.clone();
|
||||
let observer_command_tx_moved = observer_command_tx.clone();
|
||||
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
|
||||
.spawn(move || {
|
||||
while let Ok((predicate_spec, api_key)) = stacks_scan_op_rx.recv() {
|
||||
while let Ok((mut predicate_spec, api_key)) = stacks_scan_op_rx.recv() {
|
||||
let moved_ctx = ctx.clone();
|
||||
let mut moved_config = config.clone();
|
||||
let moved_config = config.clone();
|
||||
let observer_command_tx = observer_command_tx_moved.clone();
|
||||
stacks_scan_pool.execute(move || {
|
||||
let op = scan_stacks_chainstate_via_csv_using_predicate(
|
||||
let stacks_db_conn = match open_readonly_stacks_db_conn(
|
||||
&moved_config.expected_cache_path(),
|
||||
&moved_ctx,
|
||||
) {
|
||||
Ok(db_conn) => db_conn,
|
||||
Err(e) => {
|
||||
error!(
|
||||
moved_ctx.expect_logger(),
|
||||
"unable to store stacks block: {}",
|
||||
e.to_string()
|
||||
);
|
||||
unimplemented!()
|
||||
}
|
||||
};
|
||||
|
||||
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
|
||||
&predicate_spec,
|
||||
&mut moved_config,
|
||||
&stacks_db_conn,
|
||||
&moved_config,
|
||||
&moved_ctx,
|
||||
);
|
||||
let last_block_in_csv = match hiro_system_kit::nestable_block_on(op) {
|
||||
Ok(last_block_in_csv) => last_block_in_csv,
|
||||
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
|
||||
Ok(last_block_scanned) => last_block_scanned,
|
||||
Err(e) => {
|
||||
error!(
|
||||
moved_ctx.expect_logger(),
|
||||
@@ -175,8 +175,9 @@ impl Service {
|
||||
info!(
|
||||
moved_ctx.expect_logger(),
|
||||
"Stacks chainstate scan completed up to block: {}",
|
||||
last_block_in_csv.index
|
||||
last_block_scanned.index
|
||||
);
|
||||
predicate_spec.end_block = Some(last_block_scanned.index);
|
||||
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
|
||||
ChainhookSpecification::Stacks(predicate_spec),
|
||||
api_key,
|
||||
@@ -228,6 +229,26 @@ impl Service {
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Listening on port {} for Stacks chain events", event_observer_config.ingestion_port
|
||||
);
|
||||
match event_observer_config.bitcoin_block_signaling {
|
||||
BitcoinBlockSignaling::ZeroMQ(ref url) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via ZeroMQ: {}", url
|
||||
);
|
||||
}
|
||||
BitcoinBlockSignaling::Stacks(ref _url) => {
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Observing Bitcoin chain events via Stacks node"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut stacks_event = 0;
|
||||
loop {
|
||||
let event = match observer_event_rx.recv() {
|
||||
Ok(cmd) => cmd,
|
||||
@@ -290,24 +311,48 @@ impl Service {
|
||||
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
|
||||
}
|
||||
ObserverEvent::StacksChainEvent(chain_event) => {
|
||||
let stacks_db_conn_rw = match open_readwrite_stacks_db_conn(
|
||||
&self.config.expected_cache_path(),
|
||||
&self.ctx,
|
||||
) {
|
||||
Ok(db_conn) => db_conn,
|
||||
Err(e) => {
|
||||
error!(
|
||||
self.ctx.expect_logger(),
|
||||
"unable to store stacks block: {}",
|
||||
e.to_string()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match &chain_event {
|
||||
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
|
||||
update_storage_with_confirmed_stacks_blocks(
|
||||
&mut redis_con,
|
||||
stacks_event += 1;
|
||||
insert_entries_in_stacks_blocks(
|
||||
&data.confirmed_blocks,
|
||||
&stacks_db_conn_rw,
|
||||
&self.ctx,
|
||||
);
|
||||
}
|
||||
StacksChainEvent::ChainUpdatedWithReorg(data) => {
|
||||
update_storage_with_confirmed_stacks_blocks(
|
||||
&mut redis_con,
|
||||
insert_entries_in_stacks_blocks(
|
||||
&data.confirmed_blocks,
|
||||
&stacks_db_conn_rw,
|
||||
&self.ctx,
|
||||
);
|
||||
}
|
||||
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
|
||||
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
|
||||
};
|
||||
// Every 32 blocks, we will check if there's a new Stacks file archive to ingest
|
||||
if stacks_event > 32 {
|
||||
stacks_event = 0;
|
||||
let _ = consolidate_local_stacks_chainstate_using_csv(
|
||||
&mut self.config,
|
||||
&self.ctx,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
ObserverEvent::Terminate => {
|
||||
info!(self.ctx.expect_logger(), "Terminating runloop");
|
||||
@@ -320,54 +365,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_storage_with_confirmed_stacks_blocks(
|
||||
redis_con: &mut Connection,
|
||||
blocks: &Vec<StacksBlockData>,
|
||||
ctx: &Context,
|
||||
) {
|
||||
let current_tip_height: u64 = redis_con.get(&format!("stx:tip")).unwrap_or(0);
|
||||
|
||||
let mut new_tip = None;
|
||||
|
||||
for block in blocks.iter() {
|
||||
let res: Result<(), redis::RedisError> = redis_con.hset_multiple(
|
||||
&format!("stx:{}", block.block_identifier.index),
|
||||
&[
|
||||
(
|
||||
"block_identifier",
|
||||
json!(block.block_identifier).to_string(),
|
||||
),
|
||||
(
|
||||
"parent_block_identifier",
|
||||
json!(block.parent_block_identifier).to_string(),
|
||||
),
|
||||
("transactions", json!(block.transactions).to_string()),
|
||||
("metadata", json!(block.metadata).to_string()),
|
||||
],
|
||||
);
|
||||
if let Err(error) = res {
|
||||
crit!(
|
||||
ctx.expect_logger(),
|
||||
"unable to archive block {}: {}",
|
||||
block.block_identifier,
|
||||
error.to_string()
|
||||
);
|
||||
}
|
||||
if block.block_identifier.index >= current_tip_height {
|
||||
new_tip = Some(block);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(block) = new_tip {
|
||||
info!(
|
||||
ctx.expect_logger(),
|
||||
"Archiving confirmed Stacks chain block {}", block.block_identifier
|
||||
);
|
||||
let _: Result<(), redis::RedisError> =
|
||||
redis_con.set(&format!("stx:tip"), block.block_identifier.index);
|
||||
}
|
||||
}
|
||||
|
||||
fn load_predicates_from_redis(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
|
||||
@@ -54,7 +54,7 @@ dashmap = "5.4.0"
|
||||
fxhash = "0.2.1"
|
||||
|
||||
[dependencies.rocksdb]
|
||||
version = "0.20.1"
|
||||
version = "0.21.0"
|
||||
default-features = false
|
||||
optional = true
|
||||
features = ["lz4", "snappy"]
|
||||
|
||||
@@ -15,4 +15,4 @@ bitcoind_rpc_password = "devnet"
|
||||
stacks_node_rpc_url = "http://localhost:20443"
|
||||
|
||||
[[event_source]]
|
||||
tsv_file_url = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz"
|
||||
tsv_file_url = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest"
|
||||
|
||||
Reference in New Issue
Block a user