mirror of
https://github.com/alexgo-io/bitcoin-indexer.git
synced 2026-06-14 08:29:31 +08:00
feat: simplify + improve coordination
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use crate::config::generator::generate_config;
|
||||
use crate::config::Config;
|
||||
use crate::core::pipeline::download_and_pipeline_blocks;
|
||||
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
|
||||
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
|
||||
use crate::core::pipeline::processors::start_inscription_indexing_processor;
|
||||
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
|
||||
use crate::download::download_ordinals_dataset_if_required;
|
||||
@@ -12,8 +12,9 @@ use crate::db::{
|
||||
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
|
||||
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
|
||||
find_latest_inscription_block_height, find_lazy_block_at_block_height,
|
||||
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
|
||||
open_readwrite_hord_db_conn_rocks_db, initialize_hord_db, get_default_hord_db_file_path,
|
||||
get_default_hord_db_file_path, initialize_hord_db, open_readonly_hord_db_conn,
|
||||
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
|
||||
open_readwrite_hord_db_conn_rocks_db,
|
||||
};
|
||||
use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};
|
||||
use chainhook_sdk::chainhooks::types::HttpHook;
|
||||
@@ -263,7 +264,7 @@ struct StartCommand {
|
||||
enum HordDbCommand {
|
||||
/// Initialize a new hord db
|
||||
#[clap(name = "new", bin_name = "new")]
|
||||
New(SyncHordDbCommand),
|
||||
New(SyncHordDbCommand),
|
||||
/// Catch-up hord db
|
||||
#[clap(name = "sync", bin_name = "sync")]
|
||||
Sync(SyncHordDbCommand),
|
||||
@@ -482,8 +483,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
let mut total_inscriptions = 0;
|
||||
let mut total_transfers = 0;
|
||||
|
||||
let inscriptions_db_conn =
|
||||
initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
let inscriptions_db_conn = initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
while let Some(block_height) = block_range.pop_front() {
|
||||
let inscriptions =
|
||||
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, &ctx);
|
||||
@@ -529,7 +529,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
if total_transfers == 0 && total_inscriptions == 0 {
|
||||
let db_file_path = get_default_hord_db_file_path(&config.expected_cache_path());
|
||||
warn!(ctx.expect_logger(), "No data available. Check the validity of the range being scanned and the validity of your local database {}", db_file_path.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Scan(ScanCommand::Inscription(cmd)) => {
|
||||
@@ -637,20 +637,20 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
Command::Db(HordDbCommand::New(cmd)) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
},
|
||||
}
|
||||
Command::Db(HordDbCommand::Sync(cmd)) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
initialize_hord_db(&config.expected_cache_path(), &ctx);
|
||||
let service = Service::new(config, ctx.clone());
|
||||
service.update_state(None).await?;
|
||||
},
|
||||
}
|
||||
Command::Db(HordDbCommand::Repair(subcmd)) => match subcmd {
|
||||
RepairCommand::Blocks(cmd) => {
|
||||
let config = Config::default(false, false, false, &cmd.config_path)?;
|
||||
let mut hord_config = config.get_hord_config();
|
||||
hord_config.network_thread_max = cmd.network_threads;
|
||||
|
||||
let block_ingestion_processor = start_block_ingestion_processor(&config, ctx, None);
|
||||
let block_ingestion_processor = start_block_archiving_processor(&config, ctx, None);
|
||||
|
||||
download_and_pipeline_blocks(
|
||||
&config,
|
||||
@@ -658,7 +658,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
cmd.end_block,
|
||||
hord_config.first_inscription_height,
|
||||
Some(&block_ingestion_processor),
|
||||
Some(&block_ingestion_processor),
|
||||
10_000,
|
||||
&ctx,
|
||||
)
|
||||
@@ -677,7 +676,6 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
|
||||
cmd.start_block,
|
||||
cmd.end_block,
|
||||
hord_config.first_inscription_height,
|
||||
None,
|
||||
Some(&inscription_indexing_processor),
|
||||
10_000,
|
||||
&ctx,
|
||||
|
||||
@@ -155,9 +155,9 @@ impl Config {
|
||||
let bootstrap = match config_file.bootstrap {
|
||||
Some(bootstrap) => match bootstrap.download_url {
|
||||
Some(ref url) => BootstrapConfig::Download(url.to_string()),
|
||||
None => BootstrapConfig::Build
|
||||
}
|
||||
None => BootstrapConfig::Build
|
||||
None => BootstrapConfig::Build,
|
||||
},
|
||||
None => BootstrapConfig::Build,
|
||||
};
|
||||
|
||||
let config = Config {
|
||||
@@ -243,7 +243,7 @@ impl Config {
|
||||
pub fn should_bootstrap_through_download(&self) -> bool {
|
||||
match &self.bootstrap {
|
||||
BootstrapConfig::Build => false,
|
||||
BootstrapConfig::Download(_) => true
|
||||
BootstrapConfig::Download(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,7 +267,7 @@ impl Config {
|
||||
fn expected_remote_ordinals_sqlite_base_url(&self) -> &str {
|
||||
match &self.bootstrap {
|
||||
BootstrapConfig::Build => unreachable!(),
|
||||
BootstrapConfig::Download(url) => &url
|
||||
BootstrapConfig::Download(url) => &url,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -367,7 +367,9 @@ impl Config {
|
||||
working_dir: default_cache_path(),
|
||||
},
|
||||
http_api: PredicatesApi::Off,
|
||||
bootstrap: BootstrapConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()),
|
||||
bootstrap: BootstrapConfig::Download(
|
||||
DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(),
|
||||
),
|
||||
limits: LimitsConfig {
|
||||
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
|
||||
max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE,
|
||||
|
||||
@@ -25,7 +25,9 @@ pub enum PostProcessorCommand {
|
||||
}
|
||||
|
||||
pub enum PostProcessorEvent {
|
||||
EmptyQueue,
|
||||
Started,
|
||||
Terminated,
|
||||
Expired,
|
||||
}
|
||||
|
||||
pub struct PostProcessorController {
|
||||
@@ -39,8 +41,7 @@ pub async fn download_and_pipeline_blocks(
|
||||
start_block: u64,
|
||||
end_block: u64,
|
||||
start_sequencing_blocks_at_height: u64,
|
||||
blocks_post_processor_pre_sequence: Option<&PostProcessorController>,
|
||||
blocks_post_processor_post_sequence: Option<&PostProcessorController>,
|
||||
blocks_post_processor: Option<&PostProcessorController>,
|
||||
speed: usize,
|
||||
ctx: &Context,
|
||||
) -> Result<(), String> {
|
||||
@@ -101,7 +102,7 @@ pub async fn download_and_pipeline_blocks(
|
||||
rx_thread_pool.push(rx);
|
||||
}
|
||||
|
||||
for rx in rx_thread_pool.into_iter() {
|
||||
for (thread_index, rx) in rx_thread_pool.into_iter().enumerate() {
|
||||
let block_compressed_tx_moved = block_compressed_tx.clone();
|
||||
let moved_ctx: Context = moved_ctx.clone();
|
||||
let moved_bitcoin_network = moved_bitcoin_network.clone();
|
||||
@@ -131,6 +132,8 @@ pub async fn download_and_pipeline_blocks(
|
||||
compressed_block,
|
||||
)));
|
||||
}
|
||||
moved_ctx
|
||||
.try_log(|logger| debug!(logger, "Exiting processing thread {thread_index}"));
|
||||
})
|
||||
.expect("unable to spawn thread");
|
||||
thread_pool_handles.push(handle);
|
||||
@@ -138,11 +141,7 @@ pub async fn download_and_pipeline_blocks(
|
||||
|
||||
let cloned_ctx = ctx.clone();
|
||||
|
||||
let blocks_post_processor_post_sequence_commands_tx = blocks_post_processor_post_sequence
|
||||
.as_ref()
|
||||
.and_then(|p| Some(p.commands_tx.clone()));
|
||||
|
||||
let blocks_post_processor_pre_sequence_commands_tx = blocks_post_processor_pre_sequence
|
||||
let blocks_post_processor_commands_tx = blocks_post_processor
|
||||
.as_ref()
|
||||
.and_then(|p| Some(p.commands_tx.clone()));
|
||||
|
||||
@@ -151,19 +150,52 @@ pub async fn download_and_pipeline_blocks(
|
||||
let mut inbox = HashMap::new();
|
||||
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
|
||||
let mut blocks_processed = 0;
|
||||
let mut pre_seq_processor_started = false;
|
||||
let mut post_seq_processor_started = false;
|
||||
let mut processor_started = false;
|
||||
let mut stop_runloop = false;
|
||||
|
||||
loop {
|
||||
if stop_runloop {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"#{blocks_processed} blocks successfully sent to processor"
|
||||
)
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
// Dequeue all the blocks available
|
||||
let mut new_blocks = vec![];
|
||||
while let Ok(Some((block_height, block, compacted_block))) =
|
||||
block_compressed_rx.try_recv()
|
||||
{
|
||||
blocks_processed += 1;
|
||||
new_blocks.push((block_height, block, compacted_block));
|
||||
if new_blocks.len() >= 10_000 {
|
||||
break;
|
||||
while let Ok(message) = block_compressed_rx.try_recv() {
|
||||
match message {
|
||||
Some((block_height, block, compacted_block)) => {
|
||||
blocks_processed += 1;
|
||||
new_blocks.push((block_height, block, compacted_block));
|
||||
// Max batch size: 10_000 blocks
|
||||
if new_blocks.len() >= 10_000 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
stop_runloop = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if blocks_processed == number_of_blocks_to_process {
|
||||
stop_runloop = true;
|
||||
}
|
||||
|
||||
// Early "continue"
|
||||
if new_blocks.is_empty() {
|
||||
sleep(Duration::from_millis(500));
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
|
||||
if !processor_started {
|
||||
processor_started = true;
|
||||
let _ = blocks_tx.send(PostProcessorCommand::Start);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,18 +208,15 @@ pub async fn download_and_pipeline_blocks(
|
||||
}
|
||||
}
|
||||
|
||||
if !ooo_compacted_blocks.is_empty() {
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
|
||||
if !pre_seq_processor_started {
|
||||
pre_seq_processor_started = true;
|
||||
let _ = blocks_tx.send(PostProcessorCommand::Start);
|
||||
}
|
||||
|
||||
// Early "continue"
|
||||
if inbox.is_empty() {
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
|
||||
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
|
||||
ooo_compacted_blocks,
|
||||
vec![],
|
||||
));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// In order processing: construct the longest sequence of known blocks
|
||||
@@ -199,24 +228,8 @@ pub async fn download_and_pipeline_blocks(
|
||||
inbox_cursor += 1;
|
||||
}
|
||||
|
||||
if blocks.is_empty() {
|
||||
if blocks_processed == number_of_blocks_to_process {
|
||||
cloned_ctx.try_log(|logger| {
|
||||
info!(
|
||||
logger,
|
||||
"#{blocks_processed} blocks successfully sent to processor"
|
||||
)
|
||||
});
|
||||
break;
|
||||
} else {
|
||||
sleep(Duration::from_secs(1));
|
||||
}
|
||||
} else {
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
|
||||
if !post_seq_processor_started {
|
||||
post_seq_processor_started = true;
|
||||
let _ = blocks_tx.send(PostProcessorCommand::Start);
|
||||
}
|
||||
if !blocks.is_empty() {
|
||||
if let Some(ref blocks_tx) = blocks_post_processor_commands_tx {
|
||||
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(
|
||||
compacted_blocks,
|
||||
blocks,
|
||||
@@ -248,7 +261,7 @@ pub async fn download_and_pipeline_blocks(
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
debug!(
|
||||
logger,
|
||||
"Pipeline successfully fed with sequence of blocks ({} to {})", start_block, end_block
|
||||
)
|
||||
@@ -258,25 +271,32 @@ pub async fn download_and_pipeline_blocks(
|
||||
let _ = tx.send(None);
|
||||
}
|
||||
|
||||
ctx.try_log(|logger| debug!(logger, "Enqueued pipeline termination commands"));
|
||||
|
||||
for handle in thread_pool_handles.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
if let Some(post_processor) = blocks_post_processor_pre_sequence {
|
||||
ctx.try_log(|logger| debug!(logger, "Pipeline successfully terminated"));
|
||||
|
||||
if let Some(post_processor) = blocks_post_processor {
|
||||
if let Ok(PostProcessorEvent::Started) = post_processor.events_rx.recv() {
|
||||
ctx.try_log(|logger| debug!(logger, "Block post processing started"));
|
||||
let _ = post_processor
|
||||
.commands_tx
|
||||
.send(PostProcessorCommand::Terminate);
|
||||
}
|
||||
loop {
|
||||
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
|
||||
break;
|
||||
if let Ok(signal) = post_processor.events_rx.recv() {
|
||||
match signal {
|
||||
PostProcessorEvent::Terminated | PostProcessorEvent::Expired => break,
|
||||
PostProcessorEvent::Started => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(post_processor) = blocks_post_processor_post_sequence {
|
||||
loop {
|
||||
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
let _ = block_compressed_tx.send(None);
|
||||
|
||||
let _ = storage_thread.join();
|
||||
let _ = set.shutdown();
|
||||
|
||||
@@ -14,7 +14,7 @@ use crate::{
|
||||
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock},
|
||||
};
|
||||
|
||||
pub fn start_block_ingestion_processor(
|
||||
pub fn start_block_archiving_processor(
|
||||
config: &Config,
|
||||
ctx: &Context,
|
||||
_post_processor: Option<Sender<BitcoinBlockData>>,
|
||||
@@ -31,22 +31,29 @@ pub fn start_block_ingestion_processor(
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start block indexing runloop");
|
||||
let _ = events_tx.send(PostProcessorEvent::Started);
|
||||
debug!(ctx.expect_logger(), "Start block indexing runloop");
|
||||
}
|
||||
|
||||
loop {
|
||||
debug!(ctx.expect_logger(), "Tick");
|
||||
let (compacted_blocks, _) = match commands_rx.try_recv() {
|
||||
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
|
||||
(compacted_blocks, blocks)
|
||||
}
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Ok(PostProcessorCommand::Start) => continue,
|
||||
Ok(PostProcessorCommand::Terminate) => {
|
||||
debug!(ctx.expect_logger(), "Terminating block processor");
|
||||
let _ = events_tx.send(PostProcessorEvent::Terminated);
|
||||
break;
|
||||
}
|
||||
Ok(PostProcessorCommand::Start) => unreachable!(),
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
|
||||
if empty_cycles == 30 {
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
warn!(ctx.expect_logger(), "Block processor reached expiration");
|
||||
let _ = events_tx.send(PostProcessorEvent::Expired);
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
if empty_cycles > 120 {
|
||||
@@ -19,7 +19,7 @@ use std::hash::BuildHasherDefault;
|
||||
|
||||
use crate::{
|
||||
core::{
|
||||
pipeline::processors::block_ingestion::store_compacted_blocks,
|
||||
pipeline::processors::block_archiving::store_compacted_blocks,
|
||||
protocol::{
|
||||
inscription_parsing::get_inscriptions_revealed_in_block,
|
||||
inscription_sequencing::{
|
||||
@@ -72,6 +72,7 @@ pub fn start_inscription_indexing_processor(
|
||||
let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn);
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
let _ = events_tx.send(PostProcessorEvent::Started);
|
||||
info!(ctx.expect_logger(), "Start inscription indexing runloop");
|
||||
}
|
||||
|
||||
@@ -81,14 +82,19 @@ pub fn start_inscription_indexing_processor(
|
||||
empty_cycles = 0;
|
||||
(compacted_blocks, blocks)
|
||||
}
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Ok(PostProcessorCommand::Start) => continue,
|
||||
Ok(PostProcessorCommand::Terminate) => {
|
||||
debug!(ctx.expect_logger(), "Terminating block processor");
|
||||
let _ = events_tx.send(PostProcessorEvent::Terminated);
|
||||
break;
|
||||
}
|
||||
Ok(PostProcessorCommand::Start) => unreachable!(),
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
if empty_cycles == 10 {
|
||||
empty_cycles = 0;
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
warn!(ctx.expect_logger(), "Block processor reached expiration");
|
||||
let _ = events_tx.send(PostProcessorEvent::Expired);
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
continue;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
pub mod block_ingestion;
|
||||
pub mod block_archiving;
|
||||
pub mod inscription_indexing;
|
||||
pub mod transfers_recomputing;
|
||||
|
||||
|
||||
@@ -38,7 +38,8 @@ pub fn start_transfers_recomputing_processor(
|
||||
let mut empty_cycles = 0;
|
||||
|
||||
if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
|
||||
info!(ctx.expect_logger(), "Start transfers recomputing runloop");
|
||||
let _ = events_tx.send(PostProcessorEvent::Started);
|
||||
info!(ctx.expect_logger(), "Start inscription indexing runloop");
|
||||
}
|
||||
|
||||
loop {
|
||||
@@ -47,14 +48,19 @@ pub fn start_transfers_recomputing_processor(
|
||||
empty_cycles = 0;
|
||||
blocks
|
||||
}
|
||||
Ok(PostProcessorCommand::Terminate) => break,
|
||||
Ok(PostProcessorCommand::Start) => continue,
|
||||
Ok(PostProcessorCommand::Terminate) => {
|
||||
debug!(ctx.expect_logger(), "Terminating block processor");
|
||||
let _ = events_tx.send(PostProcessorEvent::Terminated);
|
||||
break;
|
||||
}
|
||||
Ok(PostProcessorCommand::Start) => unreachable!(),
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => {
|
||||
empty_cycles += 1;
|
||||
if empty_cycles == 10 {
|
||||
empty_cycles = 0;
|
||||
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
|
||||
warn!(ctx.expect_logger(), "Block processor reached expiration");
|
||||
let _ = events_tx.send(PostProcessorEvent::Expired);
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
continue;
|
||||
|
||||
@@ -63,7 +63,7 @@ pub fn parallelize_inscription_data_computations(
|
||||
|
||||
// Nothing to do? early return
|
||||
if !has_transactions_to_process {
|
||||
return Ok(false)
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
|
||||
@@ -83,12 +83,8 @@ pub fn parallelize_inscription_data_computations(
|
||||
|
||||
let handle = hiro_system_kit::thread_named("Worker")
|
||||
.spawn(move || {
|
||||
while let Ok(Some((
|
||||
transaction_id,
|
||||
block_identifier,
|
||||
input_index,
|
||||
prioritary,
|
||||
))) = rx.recv()
|
||||
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
|
||||
rx.recv()
|
||||
{
|
||||
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
|
||||
&moved_hord_db_path,
|
||||
@@ -190,12 +186,8 @@ pub fn parallelize_inscription_data_computations(
|
||||
let _ = tx_thread_pool[thread_index].send(Some(w));
|
||||
} else {
|
||||
if let Some(next_block) = next_block_iter.next() {
|
||||
let (mut transactions_ids, _) = get_transactions_to_process(
|
||||
next_block,
|
||||
cache_l1,
|
||||
inscriptions_db_tx,
|
||||
ctx,
|
||||
);
|
||||
let (mut transactions_ids, _) =
|
||||
get_transactions_to_process(next_block, cache_l1, inscriptions_db_tx, ctx);
|
||||
|
||||
ctx.try_log(|logger| {
|
||||
info!(
|
||||
@@ -356,9 +348,7 @@ impl SequenceCursor {
|
||||
self.blessed = Some(inscription_number);
|
||||
inscription_number + 1
|
||||
}
|
||||
_ => {
|
||||
0
|
||||
}
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
Some(value) => value + 1,
|
||||
@@ -378,9 +368,7 @@ impl SequenceCursor {
|
||||
self.cursed = Some(inscription_number);
|
||||
inscription_number - 1
|
||||
}
|
||||
_ => {
|
||||
-1
|
||||
}
|
||||
_ => -1,
|
||||
}
|
||||
}
|
||||
Some(value) => value - 1,
|
||||
@@ -413,7 +401,7 @@ pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx(
|
||||
ctx,
|
||||
) {
|
||||
reinscriptions_data.insert(inscription_data.ordinal_number, inscription_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -667,11 +655,19 @@ pub fn consolidate_transaction_with_pre_computed_inscription_data(
|
||||
inscription.curse_type = Some(OrdinalInscriptionCurseType::Unknown);
|
||||
}
|
||||
|
||||
if traversal.transfer_data.transaction_identifier_location.eq(coinbase_txid) {
|
||||
if traversal
|
||||
.transfer_data
|
||||
.transaction_identifier_location
|
||||
.eq(coinbase_txid)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(output) = tx.metadata.outputs.get(traversal.transfer_data.output_index) {
|
||||
if let Some(output) = tx
|
||||
.metadata
|
||||
.outputs
|
||||
.get(traversal.transfer_data.output_index)
|
||||
{
|
||||
inscription.inscription_output_value = output.value;
|
||||
inscription.inscriber_address = {
|
||||
let script_pub_key = output.get_script_pubkey_hex();
|
||||
@@ -684,7 +680,6 @@ pub fn consolidate_transaction_with_pre_computed_inscription_data(
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -161,7 +161,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
|
||||
number_of_blocks_sent += 1;
|
||||
}
|
||||
actions_triggered += actions
|
||||
},
|
||||
}
|
||||
Err(_) => err_count += 1,
|
||||
}
|
||||
|
||||
|
||||
@@ -398,12 +398,16 @@ impl Service {
|
||||
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
|
||||
) -> Result<(), String> {
|
||||
// Start predicate processor
|
||||
let blocks_post_processor =
|
||||
start_inscription_indexing_processor(&self.config, &self.ctx, block_post_processor);
|
||||
|
||||
while let Some((start_block, end_block, speed)) =
|
||||
should_sync_hord_db(&self.config, &self.ctx)?
|
||||
{
|
||||
let blocks_post_processor = start_inscription_indexing_processor(
|
||||
&self.config,
|
||||
&self.ctx,
|
||||
block_post_processor.clone(),
|
||||
);
|
||||
|
||||
info!(
|
||||
self.ctx.expect_logger(),
|
||||
"Indexing inscriptions from block #{start_block} to block #{end_block}"
|
||||
@@ -416,26 +420,13 @@ impl Service {
|
||||
start_block,
|
||||
end_block,
|
||||
first_inscription_height,
|
||||
if end_block <= first_inscription_height {
|
||||
Some(&blocks_post_processor)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
if start_block >= first_inscription_height {
|
||||
Some(&blocks_post_processor)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
Some(&blocks_post_processor),
|
||||
speed,
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let _ = blocks_post_processor
|
||||
.commands_tx
|
||||
.send(PostProcessorCommand::Terminate);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -461,17 +452,12 @@ impl Service {
|
||||
start_block,
|
||||
end_block,
|
||||
first_inscription_height,
|
||||
None,
|
||||
Some(&blocks_post_processor),
|
||||
100,
|
||||
&self.ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let _ = blocks_post_processor
|
||||
.commands_tx
|
||||
.send(PostProcessorCommand::Terminate);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user