feat: clippy and fmt support (#452)

* feat(cli): add custom commands `bitcoin-indexer-fmt` and `bitcoin-indexer-clippy` 
* chore: format and clippy the codebase
* feat(ci): add doctest ci job
* feat(ci): support format on save for rust files
ASuciuX
2025-03-07 15:42:50 +02:00
committed by GitHub
parent 6b9531470a
commit 5fb8b02a9e
79 changed files with 1195 additions and 1077 deletions


@@ -1,5 +1,8 @@
[alias]
bitcoin-indexer-install = "install --path components/cli --locked --force"
bitcoin-indexer-install = "install --path components/ordhook-cli --locked --force"
bitcoin-indexer-fmt = "fmt -- --config group_imports=StdExternalCrate,imports_granularity=Crate"
bitcoin-indexer-clippy = "clippy --tests --all-features --all-targets -- -A clippy::too_many_arguments -A clippy::needless_return -A clippy::type_complexity -A clippy::ptr_arg"
bitcoin-indexer-clippy-cli = "clippy --tests --all-features --all-targets --message-format=short -- -A clippy::too_many_arguments -A clippy::needless_return -A clippy::type_complexity -A clippy::ptr_arg -D warnings"
[env]
RUST_TEST_THREADS = "1"
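For reference, these rustfmt options order imports as std, then external crates, then `crate`/`super`/`self` (`group_imports=StdExternalCrate`), and merge each crate's imports into a single `use` item (`imports_granularity=Crate`). A minimal std-only sketch (hypothetical module, not from this commit) of the resulting layout:

```rust
// `imports_granularity=Crate` collapses all std imports into one `use` item;
// `group_imports=StdExternalCrate` would place external-crate imports
// (e.g. `use serde::...`) in a second group and `use crate::...` items in a
// third, with a blank line between groups.
use std::{collections::HashMap, fmt::Write};

fn main() {
    let mut counts: HashMap<&str, u32> = HashMap::new();
    counts.insert("blocks", 3);
    let mut out = String::new();
    write!(out, "{} entries", counts.len()).unwrap();
    println!("{out}");
}
```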


@@ -119,6 +119,102 @@ jobs:
run: npm run testenv:stop
if: always()
rustfmt:
name: rust-fmt
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
suite:
- cli
- chainhook-sdk
- chainhook-postgres
- ordhook-core
- runes
defaults:
run:
working-directory: ./components/${{ matrix.suite }}
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- name: Rustfmt Job Summary
run: |
# Run cargo and store the original output
CARGO_STATUS=0
CARGO_OUTPUT=$(cargo fmt --all --manifest-path=Cargo.toml -- --config group_imports=StdExternalCrate,imports_granularity=Crate --color=always --check 2>/dev/null) || CARGO_STATUS=$?
if [ ${CARGO_STATUS} -eq 0 ]; then
cat <<MARKDOWN_INTRO >> $GITHUB_STEP_SUMMARY
# Rustfmt Results
The code is formatted perfectly!
MARKDOWN_INTRO
else
cat <<MARKDOWN_INTRO >> $GITHUB_STEP_SUMMARY
# Rustfmt Results
\`cargo fmt\` reported formatting errors in the following locations.
You can fix them by executing the following command and committing the changes.
\`\`\`bash
cargo bitcoin-indexer-fmt
\`\`\`
MARKDOWN_INTRO
echo "${CARGO_OUTPUT}" |
# Strip color codes
sed 's/\x1B\[[0-9;]*[A-Za-z]\x0f\?//g' |
# Strip (some) cursor movements
sed 's/\x1B.[A-G]//g' |
tr "\n" "\r" |
# Wrap each location into a HTML details
sed -E 's#Diff in ([^\r]*?)( at line |:)([[:digit:]]+):\r((:?[ +-][^\r]*\r)+)#<details>\n<summary>\1:\3</summary>\n\n```diff\n\4```\n\n</details>\n\n#g' |
tr "\r" "\n" >> $GITHUB_STEP_SUMMARY
fi
# Print the original cargo message
echo "${CARGO_OUTPUT}"
# Exit with the same status as cargo
exit "${CARGO_STATUS}"
clippy:
name: rust-clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- name: Run clippy
id: clippy
run: |
# Disable immediate exit on error
set +e
# Run clippy and capture output
cargo bitcoin-indexer-clippy-cli 2>&1 | tee /tmp/clippy_output.log
CLIPPY_EXIT_CODE=${PIPESTATUS[0]}
# Print output if clippy failed
if [ $CLIPPY_EXIT_CODE -ne 0 ]; then
echo "## ❌ Clippy Check Failed
To see and fix these issues, run:
\`\`\`bash
cargo bitcoin-indexer-clippy
\`\`\`
### Clippy Errors
\`\`\`
$(cat /tmp/clippy_output.log | grep -E '(error\:)|(warning\:)')
\`\`\`" >> $GITHUB_STEP_SUMMARY
fi
# Enable immediate exit on error again
set -e
exit $CLIPPY_EXIT_CODE
test:
strategy:
fail-fast: false
@@ -158,6 +254,10 @@ jobs:
- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Run doc tests
run: |
cargo test --doc
- name: Run tests
run: |
cargo install --force cargo-tarpaulin
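The new step runs `cargo test --doc`, which compiles and executes the fenced examples embedded in `///` doc comments. A minimal sketch (hypothetical function and crate name, not from this repo) of the kind of example that job exercises:

````rust
/// Converts a satoshi amount to whole BTC, discarding the remainder.
///
/// The fenced example below is compiled and run by `cargo test --doc`:
///
/// ```
/// assert_eq!(my_crate::sats_to_btc(250_000_000), 2);
/// ```
pub fn sats_to_btc(sats: u64) -> u64 {
    sats / 100_000_000
}
````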
@@ -178,7 +278,7 @@ jobs:
semantic-release:
runs-on: ubuntu-latest
needs: [api-lint, api-test, test]
needs: [api-lint, api-test, test, rustfmt, clippy]
outputs:
new_release_version: ${{ steps.semantic.outputs.new_release_version }}
new_release_published: ${{ steps.semantic.outputs.new_release_published }}

.vscode/settings.json (new file)

@@ -0,0 +1,9 @@
{
"editor.defaultFormatter": "rust-lang.rust-analyzer",
"editor.formatOnSave": true,
"editor.formatOnSaveMode": "file",
"rust-analyzer.rustfmt.extraArgs": [
"--config",
"group_imports=StdExternalCrate,imports_granularity=Crate"
]
}


@@ -2,15 +2,18 @@
Index Bitcoin meta-protocols like Ordinals, BRC-20, and Runes.
* [Features](#features)
* [Quick Start](#quick-start)
* [Installing](#installing)
* [Running the Indexer](#running-the-indexer)
* [Configuration](#configuration)
* [System Requirements](#system-requirements)
* [Postgres](#postgres)
* [Contribute](#contribute)
* [Community](#community)
- [Features](#features)
- [Quick Start](#quick-start)
- [Installing](#installing)
- [Running the Indexer](#running-the-indexer)
- [Running an API](#running-an-api)
- [Configuration](#configuration)
- [System Requirements](#system-requirements)
- [Postgres](#postgres)
- [Contribute](#contribute)
- [Code of Conduct](#code-of-conduct)
- [Contributing Guide](#contributing-guide)
- [Community](#community)
***


@@ -37,9 +37,9 @@ pub fn pg_pool(config: &PgDatabaseConfig) -> Result<Pool, String> {
if let Some(size) = config.pool_max_size {
pool_builder = pool_builder.max_size(size);
}
Ok(pool_builder
pool_builder
.build()
.map_err(|e| format!("unable to build pg connection pool: {e}"))?)
.map_err(|e| format!("unable to build pg connection pool: {e}"))
}
/// Returns a new pg connection client taken from a pool.
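The `pg_pool` change removes the pattern clippy flags as `needless_question_mark`: `Ok(expr?)` unwraps a `Result` with `?` only to re-wrap it immediately, so returning the mapped `Result` directly is equivalent. A standalone sketch of the same rewrite with stand-in functions:

```rust
fn fetch() -> Result<u32, String> {
    Ok(42)
}

// Before: `Ok(fetch().map_err(|e| format!("wrapped: {e}"))?)`
// After: return the mapped Result as-is; same behavior, no re-wrapping.
fn pool() -> Result<u32, String> {
    fetch().map_err(|e| format!("wrapped: {e}"))
}

fn main() {
    assert_eq!(pool(), Ok(42));
}
```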


@@ -1,9 +1,9 @@
mod pg_bigint_u32;
mod pg_numeric_u64;
mod pg_numeric_u128;
mod pg_numeric_u64;
mod pg_smallint_u8;
pub use pg_bigint_u32::PgBigIntU32;
pub use pg_numeric_u64::PgNumericU64;
pub use pg_numeric_u128::PgNumericU128;
pub use pg_numeric_u64::PgNumericU64;
pub use pg_smallint_u8::PgSmallIntU8;


@@ -57,9 +57,8 @@ impl Ord for PgBigIntU32 {
mod test {
use test_case::test_case;
use crate::pg_test_client;
use super::PgBigIntU32;
use crate::pg_test_client;
#[test_case(4294967295; "u32 max")]
#[test_case(0; "zero")]


@@ -11,7 +11,7 @@ use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, ToSql, Type};
/// Transforms a u128 value into postgres' `numeric` wire format.
pub fn u128_into_pg_numeric_bytes(number: u128, out: &mut BytesMut) {
let mut num = number.clone();
let mut num = number;
let mut digits = vec![];
while !num.is_zero() {
let remainder = (num % 10000).to_i16().unwrap();
@@ -136,9 +136,8 @@ impl Ord for PgNumericU128 {
mod test {
use test_case::test_case;
use crate::pg_test_client;
use super::PgNumericU128;
use crate::pg_test_client;
#[test_case(340282366920938463463374607431768211455; "u128 max")]
#[test_case(80000000000000000; "with trailing zeros")]
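The `let mut num = number;` change above fixes clippy's `clone_on_copy`: `u128` is `Copy`, so `.clone()` compiles to the same bitwise copy and is just noise. Standalone sketch of the digit loop:

```rust
fn main() {
    let number: u128 = 12_345_678;
    // `u128` is Copy, so a plain binding copies the value; the original
    // `number.clone()` did the same thing but tripped the lint.
    let mut num = number;
    let mut digits: Vec<i16> = vec![];
    while num != 0 {
        digits.push((num % 10_000) as i16);
        num /= 10_000;
    }
    assert_eq!(digits, vec![5_678, 1_234]);
    assert_eq!(number, 12_345_678); // the original value is still usable
}
```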


@@ -53,9 +53,8 @@ impl Ord for PgNumericU64 {
mod test {
use test_case::test_case;
use crate::pg_test_client;
use super::PgNumericU64;
use crate::pg_test_client;
#[test_case(18446744073709551615; "u64 max")]
#[test_case(800000000000; "with trailing zeros")]


@@ -39,9 +39,8 @@ impl<'a> FromSql<'a> for PgSmallIntU8 {
mod test {
use test_case::test_case;
use crate::pg_test_client;
use super::PgSmallIntU8;
use crate::pg_test_client;
#[test_case(255; "u8 max")]
#[test_case(0; "zero")]


@@ -4,7 +4,7 @@ pub fn multi_row_query_param_str(rows: usize, columns: usize) -> String {
let mut arg_num = 1;
let mut arg_str = String::new();
for _ in 0..rows {
arg_str.push_str("(");
arg_str.push('(');
for i in 0..columns {
arg_str.push_str(format!("${},", arg_num + i).as_str());
}
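`push('(')` addresses clippy's `single_char_add_str`: a one-character append goes through `push` rather than the `&str` path of `push_str`. A self-contained sketch of the placeholder builder in that style:

```rust
fn main() {
    let mut arg_str = String::new();
    arg_str.push('('); // single character: `push`, not `push_str("(")`
    for i in 1..=3 {
        arg_str.push_str(&format!("${i},"));
    }
    arg_str.pop(); // drop the trailing comma
    arg_str.push(')');
    assert_eq!(arg_str, "($1,$2,$3)");
}
```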


@@ -1,22 +1,22 @@
use std::time::Duration;
use crate::try_debug;
use crate::utils::Context;
use bitcoincore_rpc::bitcoin::hashes::Hash;
use bitcoincore_rpc::bitcoin::{self, Amount, BlockHash};
use bitcoincore_rpc::jsonrpc::error::RpcError;
use bitcoincore_rpc::{
bitcoin::{self, hashes::Hash, Amount, BlockHash},
jsonrpc::error::RpcError,
};
use bitcoincore_rpc_json::GetRawTransactionResultVoutScriptPubKey;
use chainhook_types::bitcoin::{OutPoint, TxIn, TxOut};
use chainhook_types::{
BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork,
BitcoinTransactionData,BitcoinTransactionMetadata, BlockHeader, BlockIdentifier,
TransactionIdentifier,
bitcoin::{OutPoint, TxIn, TxOut},
BitcoinBlockData, BitcoinBlockMetadata, BitcoinNetwork, BitcoinTransactionData,
BitcoinTransactionMetadata, BlockHeader, BlockIdentifier, TransactionIdentifier,
};
use config::BitcoindConfig;
use hiro_system_kit::slog;
use reqwest::Client as HttpClient;
use serde::Deserialize;
use crate::{try_debug, utils::Context};
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BitcoinBlockFullBreakdown {
@@ -213,7 +213,10 @@ pub async fn retrieve_block_hash(
});
let block_hash = http_client
.post(&bitcoin_config.rpc_url)
.basic_auth(&bitcoin_config.rpc_username, Some(&bitcoin_config.rpc_password))
.basic_auth(
&bitcoin_config.rpc_username,
Some(&bitcoin_config.rpc_password),
)
.header("Content-Type", "application/json")
.header("Host", &bitcoin_config.rpc_url[7..])
.json(&body)
@@ -283,7 +286,10 @@ pub async fn download_block(
});
let res = http_client
.post(&bitcoin_config.rpc_url)
.basic_auth(&bitcoin_config.rpc_username, Some(&bitcoin_config.rpc_password))
.basic_auth(
&bitcoin_config.rpc_username,
Some(&bitcoin_config.rpc_password),
)
.header("Content-Type", "application/json")
.header("Host", &bitcoin_config.rpc_url[7..])
.json(&body)
@@ -344,7 +350,12 @@ pub fn standardize_bitcoin_block(
let mut transactions = vec![];
let block_height = block.height as u64;
try_debug!(ctx, "Standardizing Bitcoin block #{} {}", block.height, block.hash);
try_debug!(
ctx,
"Standardizing Bitcoin block #{} {}",
block.height,
block.hash
);
for (tx_index, mut tx) in block.tx.into_iter().enumerate() {
let txid = tx.txid.to_string();


@@ -1,14 +1,16 @@
use crate::{
indexer::{ChainSegment, ChainSegmentIncompatibility},
try_error, try_info, try_warn,
utils::Context,
};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use chainhook_types::{
BlockHeader, BlockIdentifier, BlockchainEvent, BlockchainUpdatedWithHeaders,
BlockchainUpdatedWithReorg,
};
use hiro_system_kit::slog;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use crate::{
indexer::{ChainSegment, ChainSegmentIncompatibility},
try_error, try_info, try_warn,
utils::Context,
};
pub struct ForkScratchPad {
canonical_fork_id: usize,


@@ -1,15 +1,14 @@
pub mod bitcoin;
pub mod fork_scratch_pad;
use crate::utils::{AbstractBlock, Context};
use std::collections::VecDeque;
use chainhook_types::{BlockHeader, BlockIdentifier, BlockchainEvent};
use config::BitcoindConfig;
use hiro_system_kit::slog;
use std::collections::VecDeque;
use self::fork_scratch_pad::ForkScratchPad;
use crate::utils::{AbstractBlock, Context};
#[derive(Deserialize, Debug, Clone, Default)]
pub struct AssetClassCache {


@@ -1,10 +1,9 @@
use crate::utils::Context;
use super::super::BlockchainEventExpectation;
use super::bitcoin_blocks;
use chainhook_types::{BitcoinBlockData, BlockchainEvent};
use hiro_system_kit::slog;
use super::{super::BlockchainEventExpectation, bitcoin_blocks};
use crate::utils::Context;
pub fn expect_no_chain_update() -> BlockchainEventExpectation {
Box::new(move |chain_event_to_check: Option<BlockchainEvent>| {
assert!(


@@ -1,7 +1,8 @@
use base58::FromBase58;
use bitcoincore_rpc::bitcoin::blockdata::opcodes;
use bitcoincore_rpc::bitcoin::blockdata::script::Builder as BitcoinScriptBuilder;
use chainhook_types::{bitcoin::TxOut, BitcoinTransactionData, BitcoinTransactionMetadata, TransactionIdentifier};
use bitcoincore_rpc::bitcoin::blockdata::{opcodes, script::Builder as BitcoinScriptBuilder};
use chainhook_types::{
bitcoin::TxOut, BitcoinTransactionData, BitcoinTransactionMetadata, TransactionIdentifier,
};
pub fn generate_test_tx_bitcoin_p2pkh_transfer(
txid: u64,


@@ -1,8 +1,8 @@
pub mod helpers;
use crate::utils::{AbstractBlock, Context};
use chainhook_types::{BitcoinBlockData, BlockchainEvent};
use super::fork_scratch_pad::ForkScratchPad;
use chainhook_types::{BitcoinBlockData, BlockchainEvent};
use crate::utils::{AbstractBlock, Context};
pub type BlockchainEventExpectation = Box<dyn Fn(Option<BlockchainEvent>)>;


@@ -1,24 +1,27 @@
mod zmq;
use crate::indexer::bitcoin::{
build_http_client, download_and_parse_block_with_retry, standardize_bitcoin_block,
BitcoinBlockFullBreakdown,
use std::{
collections::HashMap,
error::Error,
str,
sync::mpsc::{Receiver, Sender},
};
use crate::utils::Context;
use chainhook_types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
BitcoinChainUpdatedWithReorgData, BitcoinNetwork, BlockIdentifier, BlockchainEvent,
};
use config::BitcoindConfig;
use hiro_system_kit;
use hiro_system_kit::slog;
use rocket::serde::Deserialize;
use rocket::Shutdown;
use std::collections::HashMap;
use std::error::Error;
use std::str;
use std::sync::mpsc::{Receiver, Sender};
use hiro_system_kit::{self, slog};
use rocket::{serde::Deserialize, Shutdown};
use crate::{
indexer::bitcoin::{
build_http_client, download_and_parse_block_with_retry, standardize_bitcoin_block,
BitcoinBlockFullBreakdown,
},
utils::Context,
};
#[derive(Deserialize)]
pub struct NewTransaction {
@@ -80,12 +83,14 @@ pub struct BitcoinBlockDataCached {
pub processed_by_sidecar: bool,
}
type BlockMutationSender =
crossbeam_channel::Sender<(Vec<BitcoinBlockDataCached>, Vec<BlockIdentifier>)>;
type BlockMutationReceiver = crossbeam_channel::Receiver<Vec<BitcoinBlockDataCached>>;
type BlockEventHandlerSender = crossbeam_channel::Sender<HandleBlock>;
pub struct ObserverSidecar {
pub bitcoin_blocks_mutator: Option<(
crossbeam_channel::Sender<(Vec<BitcoinBlockDataCached>, Vec<BlockIdentifier>)>,
crossbeam_channel::Receiver<Vec<BitcoinBlockDataCached>>,
)>,
pub bitcoin_chain_event_notifier: Option<crossbeam_channel::Sender<HandleBlock>>,
pub bitcoin_blocks_mutator: Option<(BlockMutationSender, BlockMutationReceiver)>,
pub bitcoin_chain_event_notifier: Option<BlockEventHandlerSender>,
}
impl ObserverSidecar {


@@ -1,8 +1,10 @@
use std::{collections::VecDeque, sync::mpsc::Sender};
use config::BitcoindConfig;
use hiro_system_kit::slog;
use std::sync::mpsc::Sender;
use zmq::Socket;
use super::ObserverCommand;
use crate::{
indexer::{
bitcoin::{build_http_client, download_and_parse_block_with_retry},
@@ -11,9 +13,6 @@ use crate::{
try_info, try_warn,
utils::Context,
};
use std::collections::VecDeque;
use super::ObserverCommand;
fn new_zmq_socket() -> Socket {
let context = zmq::Context::new();
@@ -82,20 +81,16 @@ pub async fn start_zeromq_runloop(
block_hashes.push_front(block_hash);
while let Some(block_hash) = block_hashes.pop_front() {
let block = match download_and_parse_block_with_retry(
&http_client,
&block_hash,
&config,
ctx,
)
.await
{
Ok(block) => block,
Err(e) => {
try_warn!(ctx, "zmq: Unable to download block: {e}");
continue;
}
};
let block =
match download_and_parse_block_with_retry(&http_client, &block_hash, config, ctx)
.await
{
Ok(block) => block,
Err(e) => {
try_warn!(ctx, "zmq: Unable to download block: {e}");
continue;
}
};
let header = block.get_block_header();
try_info!(ctx, "zmq: Standardizing bitcoin block #{}", block.height);


@@ -1,11 +1,10 @@
use std::{thread::sleep, time::Duration};
use crate::utils::Context;
use bitcoincore_rpc::{Auth, Client, RpcApi};
use config::BitcoindConfig;
use hiro_system_kit::slog;
use crate::{try_error, try_info};
use crate::{try_error, try_info, utils::Context};
fn bitcoind_get_client(config: &BitcoindConfig, ctx: &Context) -> Client {
loop {
@@ -49,7 +48,7 @@ pub fn bitcoind_wait_for_chain_tip(config: &BitcoindConfig, ctx: &Context) {
loop {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
if result.initial_block_download == false && result.blocks == result.headers {
if !result.initial_block_download && result.blocks == result.headers {
confirmations += 1;
// Wait for 10 confirmations before declaring node is at chain tip, just in case it's still connecting to
// peers.
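The `!result.initial_block_download` rewrite is clippy's `bool_comparison` lint: `x == false` becomes `!x`. Minimal sketch:

```rust
fn main() {
    let initial_block_download = false;
    let (blocks, headers) = (100_u64, 100_u64);
    // `!flag` replaces `flag == false`; same truth table, no comparison.
    if !initial_block_download && blocks == headers {
        println!("node is at chain tip");
    }
}
```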


@@ -4,7 +4,7 @@ use std::{
collections::{BTreeSet, VecDeque},
fs::{self, OpenOptions},
io::{Read, Write},
path::PathBuf,
path::Path,
};
use chainhook_types::{BitcoinBlockData, BlockHeader, BlockIdentifier};
@@ -293,11 +293,10 @@ fn test_block_heights_blocks_limits_entries() {
};
}
pub fn read_file_content_at_path(file_path: &PathBuf) -> Result<Vec<u8>, String> {
use std::fs::File;
use std::io::BufReader;
pub fn read_file_content_at_path(file_path: &Path) -> Result<Vec<u8>, String> {
use std::{fs::File, io::BufReader};
let file = File::open(file_path.clone())
let file = File::open(file_path)
.map_err(|e| format!("unable to read file {}\n{:?}", file_path.display(), e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
@@ -307,9 +306,9 @@ pub fn read_file_content_at_path(file_path: &PathBuf) -> Result<Vec<u8>, String>
Ok(file_buffer)
}
pub fn write_file_content_at_path(file_path: &PathBuf, content: &[u8]) -> Result<(), String> {
pub fn write_file_content_at_path(file_path: &Path, content: &[u8]) -> Result<(), String> {
use std::fs::File;
let mut parent_directory = file_path.clone();
let mut parent_directory = file_path.to_path_buf();
parent_directory.pop();
fs::create_dir_all(&parent_directory).map_err(|e| {
format!(
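Switching these signatures from `&PathBuf` to `&Path` matches clippy's `ptr_arg` guidance: `&Path` accepts owned and borrowed paths alike through deref coercion, which is also what lets the `file_path.clone()` call disappear. Standalone sketch:

```rust
use std::path::{Path, PathBuf};

// `&Path` lets callers pass `&PathBuf`, `Path::new(...)`, or any borrowed
// path without cloning; `&PathBuf` would force an owned value.
fn describe(file_path: &Path) -> String {
    format!("path: {}", file_path.display())
}

fn main() {
    let owned = PathBuf::from("/tmp/blocks.db");
    println!("{}", describe(&owned)); // &PathBuf coerces to &Path
    println!("{}", describe(Path::new("relative/file")));
}
```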


@@ -45,7 +45,7 @@ pub struct OutPoint {
impl TxOut {
pub fn get_script_pubkey_bytes(&self) -> Vec<u8> {
hex::decode(&self.get_script_pubkey_hex()).expect("not provided for coinbase txs")
hex::decode(self.get_script_pubkey_hex()).expect("not provided for coinbase txs")
}
pub fn get_script_pubkey_hex(&self) -> &str {


@@ -1,8 +1,9 @@
use std::collections::BTreeMap;
use super::{BitcoinBlockData, BitcoinTransactionData};
use serde_json::Value as JsonValue;
use super::{BitcoinBlockData, BitcoinTransactionData};
pub struct ProcessedBitcoinTransaction {
pub tx: BitcoinTransactionData,
pub metadata: BTreeMap<String, JsonValue>,


@@ -1,11 +1,17 @@
use crate::bitcoin::{TxIn, TxOut};
use crate::ordinals::OrdinalOperation;
use crate::Brc20Operation;
use std::{
cmp::Ordering,
fmt::Display,
hash::{Hash, Hasher},
};
use bitcoin::Network;
use schemars::JsonSchema;
use std::cmp::Ordering;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use crate::{
bitcoin::{TxIn, TxOut},
ordinals::OrdinalOperation,
Brc20Operation,
};
/// BlockIdentifier uniquely identifies a block in a particular network.
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
@@ -21,7 +27,7 @@ impl BlockIdentifier {
}
pub fn get_hash_bytes(&self) -> Vec<u8> {
hex::decode(&self.get_hash_bytes_str()).unwrap()
hex::decode(self.get_hash_bytes_str()).unwrap()
}
}
@@ -140,7 +146,7 @@ impl TransactionIdentifier {
}
pub fn get_hash_bytes(&self) -> Vec<u8> {
hex::decode(&self.get_hash_bytes_str()).unwrap()
hex::decode(self.get_hash_bytes_str()).unwrap()
}
pub fn get_8_hash_bytes(&self) -> [u8; 8] {
@@ -423,22 +429,6 @@ impl std::fmt::Display for BitcoinNetwork {
}
}
impl BitcoinNetwork {
pub fn from_str(network: &str) -> Result<BitcoinNetwork, String> {
let value = match network {
"regtest" => BitcoinNetwork::Regtest,
"testnet" => BitcoinNetwork::Testnet,
"mainnet" => BitcoinNetwork::Mainnet,
"signet" => BitcoinNetwork::Signet,
_ => {
return Err(format!(
"network '{}' unsupported (mainnet, testnet, regtest, signet)",
network
))
}
};
Ok(value)
}
pub fn as_str(&self) -> &str {
match self {
BitcoinNetwork::Regtest => "regtest",
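The inherent `BitcoinNetwork::from_str` is removed above; clippy's `should_implement_trait` flags inherent methods that shadow standard trait methods. One conventional replacement, sketched below with a hypothetical stand-in enum (the diff only shows the removal, not what callers use instead), is implementing `std::str::FromStr`, which also enables `"mainnet".parse()`:

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq)]
enum Network {
    Mainnet,
    Testnet,
    Regtest,
    Signet,
}

impl FromStr for Network {
    type Err = String;

    fn from_str(network: &str) -> Result<Self, Self::Err> {
        match network {
            "mainnet" => Ok(Network::Mainnet),
            "testnet" => Ok(Network::Testnet),
            "regtest" => Ok(Network::Regtest),
            "signet" => Ok(Network::Signet),
            other => Err(format!(
                "network '{other}' unsupported (mainnet, testnet, regtest, signet)"
            )),
        }
    }
}

fn main() {
    let network: Network = "signet".parse().unwrap(); // parse() uses FromStr
    assert_eq!(network, Network::Signet);
}
```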


@@ -8,6 +8,9 @@ edition = "2021"
name = "bitcoin-indexer"
path = "src/main.rs"
[lib]
path = "src/lib.rs"
[dependencies]
config = { path = "../config" }
ordhook = { path = "../ordhook-core" }


@@ -1,7 +1,7 @@
use std::process::Command;
fn current_git_hash() -> Option<String> {
if option_env!("GIT_COMMIT") == None {
if option_env!("GIT_COMMIT").is_none() {
let commit = Command::new("git")
.arg("log")
.arg("-1")


@@ -1,5 +1,6 @@
use clap::{Parser, Subcommand};
/// Protocol command enum
#[derive(Parser, Debug)]
#[clap(name = "bitcoin-indexer", author, version, about, long_about = None)]
pub enum Protocol {


@@ -1,16 +1,11 @@
use std::{path::PathBuf, process, thread::sleep, time::Duration};
use chainhook_sdk::utils::Context;
use clap::Parser;
use commands::{Command, ConfigCommand, DatabaseCommand, IndexCommand, Protocol, ServiceCommand};
use config::generator::generate_toml_config;
use config::Config;
use hiro_system_kit;
use ordhook::db::migrate_dbs;
use ordhook::service::Service;
use ordhook::try_info;
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;
use std::{process, u64};
use config::{generator::generate_toml_config, Config};
use hiro_system_kit::{self, error, info};
use ordhook::{db::migrate_dbs, service::Service, try_info};
mod commands;
@@ -134,12 +129,12 @@ async fn handle_command(opts: Protocol, ctx: &Context) -> Result<(), String> {
let chain_tip = runes::service::get_index_chain_tip(&config, ctx).await;
confirm_rollback(chain_tip, cmd.blocks)?;
let mut pg_client = runes::db::pg_connect(&config, false, &ctx).await;
let mut pg_client = runes::db::pg_connect(&config, false, ctx).await;
runes::scan::bitcoin::drop_blocks(
chain_tip - cmd.blocks as u64,
chain_tip,
&mut pg_client,
&ctx,
ctx,
)
.await;
}
@@ -154,8 +149,7 @@ async fn handle_command(opts: Protocol, ctx: &Context) -> Result<(), String> {
},
Protocol::Config(subcmd) => match subcmd {
ConfigCommand::New(cmd) => {
use std::fs::File;
use std::io::Write;
use std::{fs::File, io::Write};
let network = match (cmd.mainnet, cmd.testnet, cmd.regtest) {
(true, false, false) => "mainnet",
(false, true, false) => "testnet",


@@ -0,0 +1,2 @@
// Re-export modules so they can be tested
pub mod cli;


@@ -1,6 +1,5 @@
pub mod cli;
#[macro_use]
extern crate hiro_system_kit;
#[cfg(feature = "tcmalloc")]


@@ -196,14 +196,14 @@ impl Config {
pub fn assert_ordinals_config(&self) -> Result<(), String> {
if self.ordinals.is_none() {
return Err(format!("Config entry for `ordinals` not found in config file."));
return Err("Config entry for `ordinals` not found in config file.".to_string());
}
Ok(())
}
pub fn assert_runes_config(&self) -> Result<(), String> {
if self.runes.is_none() {
return Err(format!("Config entry for `runes` not found in config file."));
return Err("Config entry for `runes` not found in config file.".to_string());
}
Ok(())
}


@@ -1,8 +1,8 @@
#[macro_use]
extern crate serde_derive;
pub mod toml;
pub mod generator;
pub mod toml;
mod config;
pub use config::*;


@@ -1,5 +1,7 @@
use std::fs::File;
use std::io::{BufReader, Read};
use std::{
fs::File,
io::{BufReader, Read},
};
use bitcoin::Network;
@@ -22,14 +24,14 @@ pub struct PgDatabaseConfigToml {
}
impl PgDatabaseConfigToml {
fn to_config(self) -> PgDatabaseConfig {
fn to_config(&self) -> PgDatabaseConfig {
PgDatabaseConfig {
dbname: self.database,
host: self.host,
dbname: self.database.clone(),
host: self.host.clone(),
port: self.port,
user: self.username,
password: self.password,
search_path: self.search_path,
user: self.username.clone(),
password: self.password.clone(),
search_path: self.search_path.clone(),
pool_max_size: self.pool_max_size,
}
}
@@ -153,13 +155,11 @@ impl ConfigToml {
}),
None => None,
};
let metrics = match toml.metrics {
Some(metrics) => Some(MetricsConfig {
enabled: metrics.enabled,
prometheus_port: metrics.prometheus_port,
}),
None => None,
};
let metrics = toml.metrics.map(|metrics| MetricsConfig {
enabled: metrics.enabled,
prometheus_port: metrics.prometheus_port,
});
let config = Config {
storage: StorageConfig {
working_dir: toml
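The `metrics` rewrite above is clippy's `manual_map`: a `match` whose arms are `Some(x) => Some(f(x))` and `None => None` collapses to `Option::map`. Standalone sketch with stand-in config types:

```rust
struct MetricsToml {
    enabled: bool,
    prometheus_port: u16,
}

struct MetricsConfig {
    enabled: bool,
    prometheus_port: u16,
}

// `map` expresses "transform the value if present" in one step.
fn to_config(toml: Option<MetricsToml>) -> Option<MetricsConfig> {
    toml.map(|metrics| MetricsConfig {
        enabled: metrics.enabled,
        prometheus_port: metrics.prometheus_port,
    })
}

fn main() {
    let cfg = to_config(Some(MetricsToml { enabled: true, prometheus_port: 9153 })).unwrap();
    println!("enabled={} port={}", cfg.enabled, cfg.prometheus_port);
    assert!(to_config(None).is_none());
}
```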


@@ -2,17 +2,17 @@ use super::{height::Height, sat::Sat};
#[derive(PartialEq, Debug)]
pub struct DecimalSat {
pub height: Height,
pub offset: u64,
pub height: Height,
pub offset: u64,
}
impl From<Sat> for DecimalSat {
fn from(sat: Sat) -> Self {
Self {
height: sat.height(),
offset: sat.third(),
fn from(sat: Sat) -> Self {
Self {
height: sat.height(),
offset: sat.third(),
}
}
}
}
// impl Display for DecimalSat {


@@ -1,18 +1,18 @@
use {
super::{inscription::Inscription, tag::Tag},
bitcoin::{
blockdata::{
opcodes,
script::{
Instruction::{self, Op, PushBytes},
Instructions,
},
use std::{collections::BTreeMap, iter::Peekable};
use bitcoin::{
blockdata::{
opcodes,
script::{
Instruction::{self, Op, PushBytes},
Instructions,
},
script, Script, Transaction,
},
std::{collections::BTreeMap, iter::Peekable},
script, Script, Transaction,
};
use super::{inscription::Inscription, tag::Tag};
pub(crate) const PROTOCOL_ID: [u8; 3] = *b"ord";
pub(crate) const BODY_TAG: [u8; 0] = [];


@@ -4,147 +4,147 @@ use super::{height::Height, sat::Sat, *};
pub struct Epoch(pub u32);
impl Epoch {
pub const STARTING_SATS: [Sat; 34] = [
Sat(0),
Sat(1050000000000000),
Sat(1575000000000000),
Sat(1837500000000000),
Sat(1968750000000000),
Sat(2034375000000000),
Sat(2067187500000000),
Sat(2083593750000000),
Sat(2091796875000000),
Sat(2095898437500000),
Sat(2097949218750000),
Sat(2098974609270000),
Sat(2099487304530000),
Sat(2099743652160000),
Sat(2099871825870000),
Sat(2099935912620000),
Sat(2099967955890000),
Sat(2099983977420000),
Sat(2099991988080000),
Sat(2099995993410000),
Sat(2099997995970000),
Sat(2099998997250000),
Sat(2099999497890000),
Sat(2099999748210000),
Sat(2099999873370000),
Sat(2099999935950000),
Sat(2099999967240000),
Sat(2099999982780000),
Sat(2099999990550000),
Sat(2099999994330000),
Sat(2099999996220000),
Sat(2099999997060000),
Sat(2099999997480000),
Sat(Sat::SUPPLY),
];
pub const FIRST_POST_SUBSIDY: Epoch = Self(33);
pub const STARTING_SATS: [Sat; 34] = [
Sat(0),
Sat(1050000000000000),
Sat(1575000000000000),
Sat(1837500000000000),
Sat(1968750000000000),
Sat(2034375000000000),
Sat(2067187500000000),
Sat(2083593750000000),
Sat(2091796875000000),
Sat(2095898437500000),
Sat(2097949218750000),
Sat(2098974609270000),
Sat(2099487304530000),
Sat(2099743652160000),
Sat(2099871825870000),
Sat(2099935912620000),
Sat(2099967955890000),
Sat(2099983977420000),
Sat(2099991988080000),
Sat(2099995993410000),
Sat(2099997995970000),
Sat(2099998997250000),
Sat(2099999497890000),
Sat(2099999748210000),
Sat(2099999873370000),
Sat(2099999935950000),
Sat(2099999967240000),
Sat(2099999982780000),
Sat(2099999990550000),
Sat(2099999994330000),
Sat(2099999996220000),
Sat(2099999997060000),
Sat(2099999997480000),
Sat(Sat::SUPPLY),
];
pub const FIRST_POST_SUBSIDY: Epoch = Self(33);
pub fn subsidy(self) -> u64 {
if self < Self::FIRST_POST_SUBSIDY {
(50 * COIN_VALUE) >> self.0
} else {
0
pub fn subsidy(self) -> u64 {
if self < Self::FIRST_POST_SUBSIDY {
(50 * COIN_VALUE) >> self.0
} else {
0
}
}
}
pub fn starting_sat(self) -> Sat {
*Self::STARTING_SATS
.get(usize::try_from(self.0).unwrap())
.unwrap_or_else(|| Self::STARTING_SATS.last().unwrap())
}
pub fn starting_sat(self) -> Sat {
*Self::STARTING_SATS
.get(usize::try_from(self.0).unwrap())
.unwrap_or_else(|| Self::STARTING_SATS.last().unwrap())
}
pub fn starting_height(self) -> Height {
Height(self.0 * SUBSIDY_HALVING_INTERVAL)
}
pub fn starting_height(self) -> Height {
Height(self.0 * SUBSIDY_HALVING_INTERVAL)
}
}
impl PartialEq<u32> for Epoch {
fn eq(&self, other: &u32) -> bool {
self.0 == *other
}
fn eq(&self, other: &u32) -> bool {
self.0 == *other
}
}
impl From<Sat> for Epoch {
fn from(sat: Sat) -> Self {
if sat < Self::STARTING_SATS[1] {
Epoch(0)
} else if sat < Self::STARTING_SATS[2] {
Epoch(1)
} else if sat < Self::STARTING_SATS[3] {
Epoch(2)
} else if sat < Self::STARTING_SATS[4] {
Epoch(3)
} else if sat < Self::STARTING_SATS[5] {
Epoch(4)
} else if sat < Self::STARTING_SATS[6] {
Epoch(5)
} else if sat < Self::STARTING_SATS[7] {
Epoch(6)
} else if sat < Self::STARTING_SATS[8] {
Epoch(7)
} else if sat < Self::STARTING_SATS[9] {
Epoch(8)
} else if sat < Self::STARTING_SATS[10] {
Epoch(9)
} else if sat < Self::STARTING_SATS[11] {
Epoch(10)
} else if sat < Self::STARTING_SATS[12] {
Epoch(11)
} else if sat < Self::STARTING_SATS[13] {
Epoch(12)
} else if sat < Self::STARTING_SATS[14] {
Epoch(13)
} else if sat < Self::STARTING_SATS[15] {
Epoch(14)
} else if sat < Self::STARTING_SATS[16] {
Epoch(15)
} else if sat < Self::STARTING_SATS[17] {
Epoch(16)
} else if sat < Self::STARTING_SATS[18] {
Epoch(17)
} else if sat < Self::STARTING_SATS[19] {
Epoch(18)
} else if sat < Self::STARTING_SATS[20] {
Epoch(19)
} else if sat < Self::STARTING_SATS[21] {
Epoch(20)
} else if sat < Self::STARTING_SATS[22] {
Epoch(21)
} else if sat < Self::STARTING_SATS[23] {
Epoch(22)
} else if sat < Self::STARTING_SATS[24] {
Epoch(23)
} else if sat < Self::STARTING_SATS[25] {
Epoch(24)
} else if sat < Self::STARTING_SATS[26] {
Epoch(25)
} else if sat < Self::STARTING_SATS[27] {
Epoch(26)
} else if sat < Self::STARTING_SATS[28] {
Epoch(27)
} else if sat < Self::STARTING_SATS[29] {
Epoch(28)
} else if sat < Self::STARTING_SATS[30] {
Epoch(29)
} else if sat < Self::STARTING_SATS[31] {
Epoch(30)
} else if sat < Self::STARTING_SATS[32] {
Epoch(31)
} else if sat < Self::STARTING_SATS[33] {
Epoch(32)
} else {
Epoch(33)
fn from(sat: Sat) -> Self {
if sat < Self::STARTING_SATS[1] {
Epoch(0)
} else if sat < Self::STARTING_SATS[2] {
Epoch(1)
} else if sat < Self::STARTING_SATS[3] {
Epoch(2)
} else if sat < Self::STARTING_SATS[4] {
Epoch(3)
} else if sat < Self::STARTING_SATS[5] {
Epoch(4)
} else if sat < Self::STARTING_SATS[6] {
Epoch(5)
} else if sat < Self::STARTING_SATS[7] {
Epoch(6)
} else if sat < Self::STARTING_SATS[8] {
Epoch(7)
} else if sat < Self::STARTING_SATS[9] {
Epoch(8)
} else if sat < Self::STARTING_SATS[10] {
Epoch(9)
} else if sat < Self::STARTING_SATS[11] {
Epoch(10)
} else if sat < Self::STARTING_SATS[12] {
Epoch(11)
} else if sat < Self::STARTING_SATS[13] {
Epoch(12)
} else if sat < Self::STARTING_SATS[14] {
Epoch(13)
} else if sat < Self::STARTING_SATS[15] {
Epoch(14)
} else if sat < Self::STARTING_SATS[16] {
Epoch(15)
} else if sat < Self::STARTING_SATS[17] {
Epoch(16)
} else if sat < Self::STARTING_SATS[18] {
Epoch(17)
} else if sat < Self::STARTING_SATS[19] {
Epoch(18)
} else if sat < Self::STARTING_SATS[20] {
Epoch(19)
} else if sat < Self::STARTING_SATS[21] {
Epoch(20)
} else if sat < Self::STARTING_SATS[22] {
Epoch(21)
} else if sat < Self::STARTING_SATS[23] {
Epoch(22)
} else if sat < Self::STARTING_SATS[24] {
Epoch(23)
} else if sat < Self::STARTING_SATS[25] {
Epoch(24)
} else if sat < Self::STARTING_SATS[26] {
Epoch(25)
} else if sat < Self::STARTING_SATS[27] {
Epoch(26)
} else if sat < Self::STARTING_SATS[28] {
Epoch(27)
} else if sat < Self::STARTING_SATS[29] {
Epoch(28)
} else if sat < Self::STARTING_SATS[30] {
Epoch(29)
} else if sat < Self::STARTING_SATS[31] {
Epoch(30)
} else if sat < Self::STARTING_SATS[32] {
Epoch(31)
} else if sat < Self::STARTING_SATS[33] {
Epoch(32)
} else {
Epoch(33)
}
}
}
}
impl From<Height> for Epoch {
fn from(height: Height) -> Self {
Self(height.0 / SUBSIDY_HALVING_INTERVAL)
}
fn from(height: Height) -> Self {
Self(height.0 / SUBSIDY_HALVING_INTERVAL)
}
}
// #[cfg(test)]


@@ -1,9 +1,9 @@
use {
super::{inscription_id::InscriptionId, media::Media, tag::Tag, *},
bitcoin::{constants::MAX_SCRIPT_ELEMENT_SIZE, hashes::Hash, opcodes, script, ScriptBuf, Txid},
ciborium::Value,
std::{io::Cursor, str},
};
use std::{io::Cursor, str};
use bitcoin::{constants::MAX_SCRIPT_ELEMENT_SIZE, hashes::Hash, opcodes, script, ScriptBuf, Txid};
use ciborium::Value;
use super::{inscription_id::InscriptionId, media::Media, tag::Tag, *};
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, Eq, Default)]
pub struct Inscription {


@@ -1,6 +1,7 @@
use std::str::FromStr;
use anyhow::{anyhow, Error};
use self::{ImageRendering::*, Language::*, Media::*};
#[derive(Debug, PartialEq, Copy, Clone)]
@@ -62,7 +63,7 @@ pub enum ImageRendering {
// }
// }
impl Media {
impl Media {
#[rustfmt::skip]
const TABLE: &'static [(&'static str, Media, &'static [&'static str])] = &[
("application/cbor", Unknown, &["cbor"]),
@@ -106,61 +107,61 @@ pub enum ImageRendering {
("video/webm", Video, &["webm"]),
];
// pub(crate) fn content_type_for_path(
// path: &Path,
// ) -> Result<(&'static str, BrotliEncoderMode), Error> {
// let extension = path
// .extension()
// .ok_or_else(|| anyhow!("file must have extension"))?
// .to_str()
// .ok_or_else(|| anyhow!("unrecognized extension"))?;
// pub(crate) fn content_type_for_path(
// path: &Path,
// ) -> Result<(&'static str, BrotliEncoderMode), Error> {
// let extension = path
// .extension()
// .ok_or_else(|| anyhow!("file must have extension"))?
// .to_str()
// .ok_or_else(|| anyhow!("unrecognized extension"))?;
// let extension = extension.to_lowercase();
// let extension = extension.to_lowercase();
// if extension == "mp4" {
// Media::check_mp4_codec(path)?;
// }
// if extension == "mp4" {
// Media::check_mp4_codec(path)?;
// }
// for (content_type, mode, _, extensions) in Self::TABLE {
// if extensions.contains(&extension.as_str()) {
// return Ok((*content_type, *mode));
// }
// }
// for (content_type, mode, _, extensions) in Self::TABLE {
// if extensions.contains(&extension.as_str()) {
// return Ok((*content_type, *mode));
// }
// }
// let mut extensions = Self::TABLE
// .iter()
// .flat_map(|(_, _, _, extensions)| extensions.first().cloned())
// .collect::<Vec<&str>>();
// let mut extensions = Self::TABLE
// .iter()
// .flat_map(|(_, _, _, extensions)| extensions.first().cloned())
// .collect::<Vec<&str>>();
// extensions.sort();
// extensions.sort();
// Err(anyhow!(
// "unsupported file extension `.{extension}`, supported extensions: {}",
// extensions.join(" "),
// ))
// }
// Err(anyhow!(
// "unsupported file extension `.{extension}`, supported extensions: {}",
// extensions.join(" "),
// ))
// }
// pub(crate) fn check_mp4_codec(path: &Path) -> Result<(), Error> {
// let f = File::open(path)?;
// let size = f.metadata()?.len();
// let reader = BufReader::new(f);
// pub(crate) fn check_mp4_codec(path: &Path) -> Result<(), Error> {
// let f = File::open(path)?;
// let size = f.metadata()?.len();
// let reader = BufReader::new(f);
// let mp4 = Mp4Reader::read_header(reader, size)?;
// let mp4 = Mp4Reader::read_header(reader, size)?;
// for track in mp4.tracks().values() {
// if let TrackType::Video = track.track_type()? {
// let media_type = track.media_type()?;
// if media_type != MediaType::H264 {
// return Err(anyhow!(
// "Unsupported video codec, only H.264 is supported in MP4: {media_type}"
// ));
// }
// }
// }
// for track in mp4.tracks().values() {
// if let TrackType::Video = track.track_type()? {
// let media_type = track.media_type()?;
// if media_type != MediaType::H264 {
// return Err(anyhow!(
// "Unsupported video codec, only H.264 is supported in MP4: {media_type}"
// ));
// }
// }
// }
// Ok(())
// }
}
// Ok(())
// }
}
impl FromStr for Media {
type Err = Error;


@@ -1,119 +1,120 @@
use std::{fmt::{self, Display, Formatter}, str::FromStr};
use std::{
fmt::{self, Display, Formatter},
str::FromStr,
};
use super::{degree::Degree, sat::Sat, *};
#[derive(
Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd
)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd)]
pub enum Rarity {
Common,
Uncommon,
Rare,
Epic,
Legendary,
Mythic,
Common,
Uncommon,
Rare,
Epic,
Legendary,
Mythic,
}
impl Rarity {
pub const ALL: [Rarity; 6] = [
Rarity::Common,
Rarity::Uncommon,
Rarity::Rare,
Rarity::Epic,
Rarity::Legendary,
Rarity::Mythic,
];
pub const ALL: [Rarity; 6] = [
Rarity::Common,
Rarity::Uncommon,
Rarity::Rare,
Rarity::Epic,
Rarity::Legendary,
Rarity::Mythic,
];
pub fn supply(self) -> u64 {
match self {
Self::Common => 2_099_999_990_760_000,
Self::Uncommon => 6_926_535,
Self::Rare => 3_432,
Self::Epic => 27,
Self::Legendary => 5,
Self::Mythic => 1,
pub fn supply(self) -> u64 {
match self {
Self::Common => 2_099_999_990_760_000,
Self::Uncommon => 6_926_535,
Self::Rare => 3_432,
Self::Epic => 27,
Self::Legendary => 5,
Self::Mythic => 1,
}
}
}
}
impl From<Rarity> for u8 {
fn from(rarity: Rarity) -> Self {
rarity as u8
}
fn from(rarity: Rarity) -> Self {
rarity as u8
}
}
impl TryFrom<u8> for Rarity {
type Error = u8;
type Error = u8;
fn try_from(rarity: u8) -> Result<Self, u8> {
match rarity {
0 => Ok(Self::Common),
1 => Ok(Self::Uncommon),
2 => Ok(Self::Rare),
3 => Ok(Self::Epic),
4 => Ok(Self::Legendary),
5 => Ok(Self::Mythic),
n => Err(n),
fn try_from(rarity: u8) -> Result<Self, u8> {
match rarity {
0 => Ok(Self::Common),
1 => Ok(Self::Uncommon),
2 => Ok(Self::Rare),
3 => Ok(Self::Epic),
4 => Ok(Self::Legendary),
5 => Ok(Self::Mythic),
n => Err(n),
}
}
}
}
impl Display for Rarity {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"{}",
match self {
Self::Common => "common",
Self::Uncommon => "uncommon",
Self::Rare => "rare",
Self::Epic => "epic",
Self::Legendary => "legendary",
Self::Mythic => "mythic",
}
)
}
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"{}",
match self {
Self::Common => "common",
Self::Uncommon => "uncommon",
Self::Rare => "rare",
Self::Epic => "epic",
Self::Legendary => "legendary",
Self::Mythic => "mythic",
}
)
}
}
impl From<Sat> for Rarity {
fn from(sat: Sat) -> Self {
let Degree {
hour,
minute,
second,
third,
} = sat.degree();
fn from(sat: Sat) -> Self {
let Degree {
hour,
minute,
second,
third,
} = sat.degree();
if hour == 0 && minute == 0 && second == 0 && third == 0 {
Self::Mythic
} else if minute == 0 && second == 0 && third == 0 {
Self::Legendary
} else if minute == 0 && third == 0 {
Self::Epic
} else if second == 0 && third == 0 {
Self::Rare
} else if third == 0 {
Self::Uncommon
} else {
Self::Common
if hour == 0 && minute == 0 && second == 0 && third == 0 {
Self::Mythic
} else if minute == 0 && second == 0 && third == 0 {
Self::Legendary
} else if minute == 0 && third == 0 {
Self::Epic
} else if second == 0 && third == 0 {
Self::Rare
} else if third == 0 {
Self::Uncommon
} else {
Self::Common
}
}
}
}
impl FromStr for Rarity {
type Err = String;
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"common" => Ok(Self::Common),
"uncommon" => Ok(Self::Uncommon),
"rare" => Ok(Self::Rare),
"epic" => Ok(Self::Epic),
"legendary" => Ok(Self::Legendary),
"mythic" => Ok(Self::Mythic),
_ => Err(format!("invalid rarity `{s}`")),
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"common" => Ok(Self::Common),
"uncommon" => Ok(Self::Uncommon),
"rare" => Ok(Self::Rare),
"epic" => Ok(Self::Epic),
"legendary" => Ok(Self::Legendary),
"mythic" => Ok(Self::Mythic),
_ => Err(format!("invalid rarity `{s}`")),
}
}
}
}
// #[cfg(test)]


@@ -5,98 +5,100 @@ use bitcoin::{constants::MAX_SCRIPT_ELEMENT_SIZE, script};
#[derive(Copy, Clone)]
#[repr(u8)]
pub(crate) enum Tag {
Pointer = 2,
#[allow(unused)]
Unbound = 66,
Pointer = 2,
#[allow(unused)]
Unbound = 66,
ContentType = 1,
Parent = 3,
Metadata = 5,
Metaprotocol = 7,
ContentEncoding = 9,
Delegate = 11,
Rune = 13,
#[allow(unused)]
Note = 15,
#[allow(unused)]
Nop = 255,
ContentType = 1,
Parent = 3,
Metadata = 5,
Metaprotocol = 7,
ContentEncoding = 9,
Delegate = 11,
Rune = 13,
#[allow(unused)]
Note = 15,
#[allow(unused)]
Nop = 255,
}
impl Tag {
fn chunked(self) -> bool {
matches!(self, Self::Metadata)
}
fn chunked(self) -> bool {
matches!(self, Self::Metadata)
}
pub(crate) fn bytes(self) -> [u8; 1] {
[self as u8]
}
pub(crate) fn bytes(self) -> [u8; 1] {
[self as u8]
}
pub(crate) fn append(self, builder: &mut script::Builder, value: &Option<Vec<u8>>) {
if let Some(value) = value {
let mut tmp = script::Builder::new();
mem::swap(&mut tmp, builder);
pub(crate) fn append(self, builder: &mut script::Builder, value: &Option<Vec<u8>>) {
if let Some(value) = value {
let mut tmp = script::Builder::new();
mem::swap(&mut tmp, builder);
if self.chunked() {
for chunk in value.chunks(MAX_SCRIPT_ELEMENT_SIZE) {
tmp = tmp
.push_slice::<&script::PushBytes>(self.bytes().as_slice().try_into().unwrap())
.push_slice::<&script::PushBytes>(chunk.try_into().unwrap());
if self.chunked() {
for chunk in value.chunks(MAX_SCRIPT_ELEMENT_SIZE) {
tmp = tmp
.push_slice::<&script::PushBytes>(
self.bytes().as_slice().try_into().unwrap(),
)
.push_slice::<&script::PushBytes>(chunk.try_into().unwrap());
}
} else {
tmp = tmp
.push_slice::<&script::PushBytes>(self.bytes().as_slice().try_into().unwrap())
.push_slice::<&script::PushBytes>(value.as_slice().try_into().unwrap());
}
mem::swap(&mut tmp, builder);
}
} else {
tmp = tmp
.push_slice::<&script::PushBytes>(self.bytes().as_slice().try_into().unwrap())
.push_slice::<&script::PushBytes>(value.as_slice().try_into().unwrap());
}
mem::swap(&mut tmp, builder);
}
}
pub(crate) fn append_array(self, builder: &mut script::Builder, values: &Vec<Vec<u8>>) {
let mut tmp = script::Builder::new();
mem::swap(&mut tmp, builder);
for value in values {
tmp = tmp
.push_slice::<&script::PushBytes>(self.bytes().as_slice().try_into().unwrap())
.push_slice::<&script::PushBytes>(value.as_slice().try_into().unwrap());
}
mem::swap(&mut tmp, builder);
}
pub(crate) fn append_array(self, builder: &mut script::Builder, values: &Vec<Vec<u8>>) {
let mut tmp = script::Builder::new();
mem::swap(&mut tmp, builder);
pub(crate) fn take(self, fields: &mut BTreeMap<&[u8], Vec<&[u8]>>) -> Option<Vec<u8>> {
if self.chunked() {
let value = fields.remove(self.bytes().as_slice())?;
if value.is_empty() {
None
} else {
Some(value.into_iter().flatten().cloned().collect())
}
} else {
let values = fields.get_mut(self.bytes().as_slice())?;
if values.is_empty() {
None
} else {
let value = values.remove(0).to_vec();
if values.is_empty() {
fields.remove(self.bytes().as_slice());
for value in values {
tmp = tmp
.push_slice::<&script::PushBytes>(self.bytes().as_slice().try_into().unwrap())
.push_slice::<&script::PushBytes>(value.as_slice().try_into().unwrap());
}
Some(value)
}
mem::swap(&mut tmp, builder);
}
}
pub(crate) fn take_array(self, fields: &mut BTreeMap<&[u8], Vec<&[u8]>>) -> Vec<Vec<u8>> {
fields
.remove(self.bytes().as_slice())
.unwrap_or_default()
.into_iter()
.map(|v| v.to_vec())
.collect()
}
pub(crate) fn take(self, fields: &mut BTreeMap<&[u8], Vec<&[u8]>>) -> Option<Vec<u8>> {
if self.chunked() {
let value = fields.remove(self.bytes().as_slice())?;
if value.is_empty() {
None
} else {
Some(value.into_iter().flatten().cloned().collect())
}
} else {
let values = fields.get_mut(self.bytes().as_slice())?;
if values.is_empty() {
None
} else {
let value = values.remove(0).to_vec();
if values.is_empty() {
fields.remove(self.bytes().as_slice());
}
Some(value)
}
}
}
pub(crate) fn take_array(self, fields: &mut BTreeMap<&[u8], Vec<&[u8]>>) -> Vec<Vec<u8>> {
fields
.remove(self.bytes().as_slice())
.unwrap_or_default()
.into_iter()
.map(|v| v.to_vec())
.collect()
}
}


@@ -107,7 +107,7 @@ pub async fn get_unsent_token_transfers<T: GenericClient>(
)
.await
.map_err(|e| format!("get_unsent_token_transfers: {e}"))?;
results.extend(rows.iter().map(|row| DbOperation::from_pg_row(row)));
results.extend(rows.iter().map(DbOperation::from_pg_row));
}
Ok(results)
}
@@ -116,7 +116,7 @@ pub async fn insert_tokens<T: GenericClient>(
tokens: &Vec<DbToken>,
client: &T,
) -> Result<(), String> {
if tokens.len() == 0 {
if tokens.is_empty() {
return Ok(());
}
for chunk in tokens.chunks(BATCH_QUERY_CHUNK_SIZE) {
@@ -158,7 +158,7 @@ pub async fn insert_operations<T: GenericClient>(
operations: &Vec<DbOperation>,
client: &T,
) -> Result<(), String> {
if operations.len() == 0 {
if operations.is_empty() {
return Ok(());
}
for chunk in operations.chunks(BATCH_QUERY_CHUNK_SIZE) {
@@ -248,7 +248,7 @@ pub async fn update_operation_counts<T: GenericClient>(
counts: &HashMap<String, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
@@ -274,7 +274,7 @@ pub async fn update_address_operation_counts<T: GenericClient>(
counts: &HashMap<String, HashMap<String, i32>>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
for chunk in counts
@@ -312,7 +312,7 @@ pub async fn update_token_operation_counts<T: GenericClient>(
counts: &HashMap<String, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
for chunk in counts
@@ -353,7 +353,7 @@ pub async fn update_token_minted_supplies<T: GenericClient>(
supplies: &HashMap<String, PgNumericU128>,
client: &T,
) -> Result<(), String> {
if supplies.len() == 0 {
if supplies.is_empty() {
return Ok(());
}
for chunk in supplies
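Two lints recur throughout this file: `len() == 0` becomes `is_empty()` (`clippy::len_zero`), and `map(|row| DbOperation::from_pg_row(row))` becomes `map(DbOperation::from_pg_row)` (`clippy::redundant_closure`). A standalone sketch of both, with a stand-in row type:

```rust
#[derive(Debug)]
struct DbOperation {
    ticker: String,
}

impl DbOperation {
    fn from_row(row: &str) -> DbOperation {
        DbOperation { ticker: row.to_string() }
    }
}

fn main() {
    let rows = vec!["pepe", "ordi"];
    // clippy::len_zero: `is_empty()` replaces `len() == 0`.
    if rows.is_empty() {
        return;
    }
    // clippy::redundant_closure: the function item replaces the wrapper
    // closure `|row| DbOperation::from_row(row)`.
    let ops: Vec<DbOperation> = rows.into_iter().map(DbOperation::from_row).collect();
    println!("{ops:?}");
}
```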


@@ -13,19 +13,16 @@ use deadpool_postgres::GenericClient;
use lru::LruCache;
use maplit::hashmap;
use crate::core::protocol::satoshi_tracking::parse_output_and_offset_from_satpoint;
use super::{
brc20_pg,
models::{DbOperation, DbToken},
verifier::{VerifiedBrc20BalanceData, VerifiedBrc20TokenDeployData, VerifiedBrc20TransferData},
};
use crate::core::protocol::satoshi_tracking::parse_output_and_offset_from_satpoint;
/// If the given `config` has BRC-20 enabled, returns a BRC-20 memory cache.
pub fn brc20_new_cache(config: &Config) -> Option<Brc20MemoryCache> {
let Some(brc20) = config.ordinals_brc20_config() else {
return None;
};
let brc20 = config.ordinals_brc20_config()?;
if !brc20.enabled {
return None;
}
@@ -117,7 +114,7 @@ impl Brc20MemoryCache {
client: &T,
) -> Result<Option<u128>, String> {
if let Some(minted) = self.token_minted_supplies.get(tick) {
return Ok(Some(minted.clone()));
return Ok(Some(*minted));
}
self.handle_cache_miss(client).await?;
if let Some(minted_supply) = brc20_pg::get_token_minted_supply(tick, client).await? {
@@ -136,7 +133,7 @@ impl Brc20MemoryCache {
) -> Result<Option<u128>, String> {
let key = format!("{}:{}", tick, address);
if let Some(balance) = self.token_addr_avail_balances.get(&key) {
return Ok(Some(balance.clone()));
return Ok(Some(*balance));
}
self.handle_cache_miss(client).await?;
if let Some(balance) =
@@ -157,7 +154,7 @@ impl Brc20MemoryCache {
let mut cache_missed_ordinal_numbers = HashSet::new();
for ordinal_number in ordinal_numbers.iter() {
// Use `get` instead of `contains` so we promote this value in the LRU.
if let Some(_) = self.ignored_inscriptions.get(*ordinal_number) {
if self.ignored_inscriptions.get(*ordinal_number).is_some() {
continue;
}
if let Some(row) = self.unsent_transfers.get(*ordinal_number) {
@@ -493,6 +490,7 @@ mod test {
use chainhook_types::{BitcoinNetwork, BlockIdentifier, TransactionIdentifier};
use test_case::test_case;
use super::Brc20MemoryCache;
use crate::{
core::meta_protocols::brc20::{
brc20_pg,
@@ -506,8 +504,6 @@ mod test {
db::{pg_reset_db, pg_test_connection, pg_test_connection_pool},
};
use super::Brc20MemoryCache;
#[tokio::test]
async fn test_brc20_memory_cache_transfer_miss() -> Result<(), String> {
let ctx = get_test_ctx();
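The cache changes above bundle three lints: a `let Some(x) = expr else { return None };` collapses to `expr?` (`clippy::question_mark`), `if let Some(_) = ...` becomes `.is_some()` (`clippy::redundant_pattern_matching`), and `.clone()` on `Copy` cache values becomes a dereference. A simplified, hypothetical sketch of the first two (the name mirrors the diff, but the types are stand-ins):

```rust
#[derive(Clone, Copy)]
struct Brc20Config {
    enabled: bool,
}

// clippy::question_mark: `?` replaces
// `let Some(brc20) = config else { return None };`.
fn brc20_new_cache(config: Option<Brc20Config>) -> Option<&'static str> {
    let brc20 = config?;
    if !brc20.enabled {
        return None;
    }
    Some("cache")
}

fn main() {
    let cfg = Some(Brc20Config { enabled: true });
    // clippy::redundant_pattern_matching: `.is_some()` over
    // `if let Some(_) = ...` when the binding is unused.
    assert!(brc20_new_cache(cfg).is_some());
    assert!(brc20_new_cache(None).is_none());
}
```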


@@ -7,14 +7,13 @@ use chainhook_types::{
};
use deadpool_postgres::Transaction;
use crate::{core::meta_protocols::brc20::u128_amount_to_decimals_str, try_info};
use super::{
brc20_activation_height,
cache::Brc20MemoryCache,
parser::ParsedBrc20Operation,
verifier::{verify_brc20_operation, verify_brc20_transfers, VerifiedBrc20Operation},
};
use crate::{core::meta_protocols::brc20::u128_amount_to_decimals_str, try_info};
/// Index ordinal transfers in a single Bitcoin block looking for BRC-20 transfers.
async fn index_unverified_brc20_transfers(
@@ -30,7 +29,7 @@ async fn index_unverified_brc20_transfers(
}
let mut results = vec![];
let mut verified_brc20_transfers =
verify_brc20_transfers(transfers, brc20_cache, &brc20_db_tx, &ctx).await?;
verify_brc20_transfers(transfers, brc20_cache, brc20_db_tx, ctx).await?;
// Sort verified transfers by tx_index to make sure they are applied in the order they came through.
verified_brc20_transfers.sort_by(|a, b| a.2.tx_index.cmp(&b.2.tx_index));
@@ -119,8 +118,8 @@ pub async fn index_block_and_insert_brc20_operations(
&block.block_identifier,
&block.metadata.network,
brc20_cache,
&brc20_db_tx,
&ctx,
brc20_db_tx,
ctx,
)
.await?
else {


@@ -31,7 +31,7 @@ pub fn brc20_self_mint_activation_height(network: &BitcoinNetwork) -> u64 {
pub fn decimals_str_amount_to_u128(amt: &String, decimals: u8) -> Result<u128, String> {
let parts: Vec<&str> = amt.split('.').collect();
let first = parts
.get(0)
.first()
.ok_or("decimals_str_amount_to_u128: first part not found")?;
let integer = (*first)
.parse::<u128>()
@@ -40,7 +40,7 @@ pub fn decimals_str_amount_to_u128(amt: &String, decimals: u8) -> Result<u128, S
let mut fractional = 0u128;
if let Some(second) = parts.get(1) {
let mut padded = String::with_capacity(decimals as usize);
padded.push_str(*second);
padded.push_str(second);
padded.push_str(&"0".repeat(decimals as usize - (*second).len()));
fractional = padded
.parse::<u128>()
@@ -58,7 +58,7 @@ pub fn u128_amount_to_decimals_str(amount: u128, decimals: u8) -> String {
}
let decimal_point = num_str.len() as i32 - decimals as i32;
if decimal_point < 0 {
let padding = "0".repeat(decimal_point.abs() as usize);
let padding = "0".repeat(decimal_point.unsigned_abs() as usize);
format!("0.{padding}{num_str}")
} else {
let (integer, fractional) = num_str.split_at(decimal_point as usize);
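`unsigned_abs()` fixes clippy's `cast_abs_to_unsigned`: it returns the magnitude as an unsigned integer directly and, unlike `abs()`, cannot overflow on `i32::MIN`. Sketch of the decimal-padding arithmetic:

```rust
fn main() {
    // e.g. amount "5" rendered with 3 decimals: decimal_point = 1 - 3 = -2
    let num_str = "5";
    let decimal_point: i32 = num_str.len() as i32 - 3;
    if decimal_point < 0 {
        // `unsigned_abs()` yields a u32 with no overflow hazard, replacing
        // `decimal_point.abs() as usize`.
        let padding = "0".repeat(decimal_point.unsigned_abs() as usize);
        assert_eq!(format!("0.{padding}{num_str}"), "0.005");
    }
}
```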


@@ -1,5 +1,7 @@
use ord::inscription::Inscription;
use ord::media::{Language, Media};
use ord::{
inscription::Inscription,
media::{Language, Media},
};
#[derive(PartialEq, Debug, Clone)]
pub struct ParsedBrc20TokenDeployData {
@@ -203,12 +205,13 @@ pub fn parse_brc20_operation(
#[cfg(test)]
mod test {
use ord::inscription::Inscription;
use test_case::test_case;
use super::{parse_brc20_operation, ParsedBrc20Operation};
use crate::core::meta_protocols::brc20::parser::{
ParsedBrc20BalanceData, ParsedBrc20TokenDeployData,
};
use ord::inscription::Inscription;
use test_case::test_case;
struct InscriptionBuilder {
body: Option<Vec<u8>>,


@@ -21,6 +21,12 @@ pub struct Brc20RevealBuilder {
pub parents: Vec<String>,
}
impl Default for Brc20RevealBuilder {
fn default() -> Self {
Self::new()
}
}
impl Brc20RevealBuilder {
pub fn new() -> Self {
Brc20RevealBuilder {
@@ -101,6 +107,12 @@ pub struct Brc20TransferBuilder {
pub tx_index: usize,
}
impl Default for Brc20TransferBuilder {
fn default() -> Self {
Self::new()
}
}
impl Brc20TransferBuilder {
pub fn new() -> Self {
Brc20TransferBuilder {


@@ -1,18 +1,20 @@
use std::collections::HashMap;
use chainhook_sdk::utils::Context;
use chainhook_types::{
BitcoinNetwork, BlockIdentifier, OrdinalInscriptionRevealData, OrdinalInscriptionTransferData,
OrdinalInscriptionTransferDestination, TransactionIdentifier,
};
use chainhook_sdk::utils::Context;
use deadpool_postgres::Transaction;
use super::{
brc20_self_mint_activation_height,
cache::Brc20MemoryCache,
decimals_str_amount_to_u128,
parser::{amt_has_valid_decimals, ParsedBrc20Operation},
};
use crate::try_debug;
use super::cache::Brc20MemoryCache;
use super::parser::{amt_has_valid_decimals, ParsedBrc20Operation};
use super::{brc20_self_mint_activation_height, decimals_str_amount_to_u128};
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct VerifiedBrc20TokenDeployData {
pub tick: String,
@@ -106,7 +108,7 @@ pub async fn verify_brc20_operation(
return Ok(None);
};
if data.tick.len() == 5 {
if reveal.parents.len() == 0 {
if reveal.parents.is_empty() {
try_debug!(
ctx,
"BRC-20: Attempting to mint self-minted token {} without a parent ref",
@@ -185,7 +187,7 @@ pub async fn verify_brc20_operation(
return Ok(None);
}
let Some(avail_balance) = cache
.get_token_address_avail_balance(&token.ticker, &inscriber_address, db_tx)
.get_token_address_avail_balance(&token.ticker, inscriber_address, db_tx)
.await?
else {
try_debug!(
@@ -291,7 +293,7 @@ pub async fn verify_brc20_transfers(
(*tx_identifier).clone(),
));
}
return Ok(results);
Ok(results)
}
#[cfg(test)]
@@ -304,6 +306,7 @@ mod test {
};
use test_case::test_case;
use super::{verify_brc20_operation, verify_brc20_transfers, VerifiedBrc20TransferData};
use crate::{
core::meta_protocols::brc20::{
brc20_pg,
@@ -317,8 +320,6 @@ mod test {
db::{pg_reset_db, pg_test_connection, pg_test_connection_pool},
};
use super::{verify_brc20_operation, verify_brc20_transfers, VerifiedBrc20TransferData};
#[test_case(
ParsedBrc20Operation::Deploy(ParsedBrc20TokenDeployData {
tick: "pepe".to_string(),


@@ -4,15 +4,14 @@ pub mod protocol;
#[cfg(test)]
pub mod test_builders;
use std::{hash::BuildHasherDefault, ops::Div};
use bitcoin::Network;
use chainhook_postgres::pg_pool_client;
use chainhook_sdk::utils::{bitcoind::bitcoind_get_block_height, Context};
use config::Config;
use dashmap::DashMap;
use fxhash::{FxBuildHasher, FxHasher};
use std::hash::BuildHasherDefault;
use std::ops::Div;
use chainhook_sdk::utils::Context;
use crate::{
db::{
@@ -25,7 +24,6 @@ use crate::{
},
service::PgConnectionPools,
};
use chainhook_sdk::utils::bitcoind::bitcoind_get_block_height;
pub fn first_inscription_height(config: &Config) -> u64 {
match config.bitcoind.network {
@@ -62,7 +60,7 @@ pub enum SatPosition {
Fee(u64),
}
pub fn resolve_absolute_pointer(inputs: &Vec<u64>, absolute_pointer_value: u64) -> (usize, u64) {
pub fn resolve_absolute_pointer(inputs: &[u64], absolute_pointer_value: u64) -> (usize, u64) {
let mut selected_index = 0;
let mut cumulated_input_value = 0;
// Check for overflow
@@ -84,8 +82,8 @@ pub fn resolve_absolute_pointer(inputs: &Vec<u64>, absolute_pointer_value: u64)
pub fn compute_next_satpoint_data(
input_index: usize,
inputs: &Vec<u64>,
outputs: &Vec<u64>,
inputs: &[u64],
outputs: &[u64],
relative_pointer_value: u64,
_ctx: Option<&Context>,
) -> SatPosition {
@@ -125,13 +123,11 @@ pub async fn should_sync_rocks_db(
pg_pools: &PgConnectionPools,
ctx: &Context,
) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_blocks_db_with_retry(true, &config, &ctx);
let blocks_db = open_blocks_db_with_retry(true, config, ctx);
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let ord_client = pg_pool_client(&pg_pools.ordinals).await?;
let last_indexed_block = match ordinals_pg::get_chain_tip_block_height(&ord_client).await? {
Some(last_indexed_block) => last_indexed_block,
None => 0,
};
let last_indexed_block =
(ordinals_pg::get_chain_tip_block_height(&ord_client).await?).unwrap_or_default();
let res = if last_compressed_block < last_indexed_block {
Some((last_compressed_block, last_indexed_block))
@@ -146,13 +142,13 @@ pub async fn should_sync_ordinals_db(
pg_pools: &PgConnectionPools,
ctx: &Context,
) -> Result<Option<(u64, u64, usize)>, String> {
let blocks_db = open_blocks_db_with_retry(true, &config, &ctx);
let blocks_db = open_blocks_db_with_retry(true, config, ctx);
let mut start_block = find_last_block_inserted(&blocks_db) as u64;
let ord_client = pg_pool_client(&pg_pools.ordinals).await?;
match ordinals_pg::get_chain_tip_block_height(&ord_client).await? {
Some(height) => {
if find_pinned_block_bytes_at_block_height(height as u32, 3, &blocks_db, &ctx).is_none()
if find_pinned_block_bytes_at_block_height(height as u32, 3, &blocks_db, ctx).is_none()
{
start_block = start_block.min(height);
} else {


@@ -1,21 +1,24 @@
pub mod processors;
use chainhook_sdk::utils::Context;
use std::{
collections::{HashMap, VecDeque},
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::{
indexer::bitcoin::{
build_http_client, parse_downloaded_block, standardize_bitcoin_block,
try_download_block_bytes_with_retry,
},
utils::Context,
};
use chainhook_types::{BitcoinBlockData, BitcoinNetwork};
use config::Config;
use crossbeam_channel::bounded;
use std::collections::{HashMap, VecDeque};
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
use tokio::task::JoinSet;
use crate::db::cursor::BlockBytesCursor;
use crate::{try_debug, try_info};
use chainhook_sdk::indexer::bitcoin::{
build_http_client, parse_downloaded_block, standardize_bitcoin_block,
try_download_block_bytes_with_retry,
};
use crate::{db::cursor::BlockBytesCursor, try_debug, try_info};
pub enum PostProcessorCommand {
ProcessBlocks(Vec<(u64, Vec<u8>)>, Vec<BitcoinBlockData>),
@@ -90,7 +93,7 @@ pub async fn bitcoind_download_blocks(
}
let moved_ctx: Context = ctx.clone();
let moved_bitcoin_network = config.bitcoind.network.clone();
let moved_bitcoin_network = config.bitcoind.network;
let mut tx_thread_pool = vec![];
let mut rx_thread_pool = vec![];
@@ -105,7 +108,7 @@ pub async fn bitcoind_download_blocks(
for (thread_index, rx) in rx_thread_pool.into_iter().enumerate() {
let block_compressed_tx_moved = block_compressed_tx.clone();
let moved_ctx: Context = moved_ctx.clone();
let moved_bitcoin_network = moved_bitcoin_network.clone();
let moved_bitcoin_network_inner = moved_bitcoin_network;
let handle = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
@@ -118,7 +121,7 @@ pub async fn bitcoind_download_blocks(
let block_data = if block_height >= start_sequencing_blocks_at_height {
let block = standardize_bitcoin_block(
raw_block_data,
&BitcoinNetwork::from_network(moved_bitcoin_network),
&BitcoinNetwork::from_network(moved_bitcoin_network_inner),
&moved_ctx,
)
.expect("unable to deserialize block");
@@ -227,7 +230,6 @@ pub async fn bitcoind_download_blocks(
stop_runloop = true;
}
}
()
})
.expect("unable to spawn thread");
@@ -290,7 +292,7 @@ pub async fn bitcoind_download_blocks(
let _ = block_compressed_tx.send(None);
let _ = storage_thread.join();
let _ = set.shutdown();
set.shutdown().await;
try_info!(
ctx,

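Aside: beyond the import regrouping, two changes above deserve a note. Dropping `.clone()` on the network value works because the type is `Copy`, and replacing `let _ = set.shutdown()` with `set.shutdown().await` matters because tokio's `JoinSet::shutdown` is async: the old line built the future and dropped it without polling it. A sketch of the `Copy` point, with made-up types:

#[derive(Clone, Copy)]
enum Network { Mainnet, Testnet }

fn pick(n: Network) -> Network {
    n // `Copy` values move by bitwise copy, so `n.clone()` trips clippy::clone_on_copy
}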
View File

@@ -1,12 +1,13 @@
use std::{
thread::{sleep, JoinHandle},
time::Duration,
};
use chainhook_sdk::utils::Context;
use chainhook_types::BitcoinBlockData;
use config::Config;
use crossbeam_channel::{Sender, TryRecvError};
use rocksdb::DB;
use std::{
thread::{sleep, JoinHandle},
time::Duration,
};
use crate::{
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
@@ -83,8 +84,8 @@ pub fn store_compacted_blocks(
block_height as u32,
&compacted_block,
update_tip,
&blocks_db_rw,
&ctx,
blocks_db_rw,
ctx,
);
try_info!(ctx, "Block #{block_height} saved to disk");
}

View File

@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, HashMap},
hash::BuildHasherDefault,
sync::Arc,
thread::{sleep, JoinHandle},
time::Duration,
@@ -10,10 +11,8 @@ use chainhook_sdk::utils::Context;
use chainhook_types::{BitcoinBlockData, TransactionIdentifier};
use config::Config;
use crossbeam_channel::TryRecvError;
use dashmap::DashMap;
use fxhash::FxHasher;
use std::hash::BuildHasherDefault;
use crate::{
core::{
@@ -22,7 +21,11 @@ use crate::{
cache::{brc20_new_cache, Brc20MemoryCache},
index::index_block_and_insert_brc20_operations,
},
pipeline::processors::block_archiving::store_compacted_blocks,
new_traversals_lazy_cache,
pipeline::{
processors::block_archiving::store_compacted_blocks, PostProcessorCommand,
PostProcessorController, PostProcessorEvent,
},
protocol::{
inscription_parsing::parse_inscriptions_in_standardized_block,
inscription_sequencing::{
@@ -41,11 +44,6 @@ use crate::{
utils::monitoring::PrometheusMonitoring,
};
use crate::core::{
new_traversals_lazy_cache,
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
};
pub fn start_inscription_indexing_processor(
config: &Config,
pg_pools: &PgConnectionPools,
@@ -166,7 +164,7 @@ async fn process_blocks(
index_block(
&mut block,
&next_blocks,
next_blocks,
sequence_cursor,
&mut cache_l1,
cache_l2,
@@ -212,11 +210,11 @@ pub async fn index_block(
// Parsed BRC20 ops will be deposited here for this block.
let mut brc20_operation_map = HashMap::new();
parse_inscriptions_in_standardized_block(block, &mut brc20_operation_map, config, &ctx);
parse_inscriptions_in_standardized_block(block, &mut brc20_operation_map, config, ctx);
let has_inscription_reveals = parallelize_inscription_data_computations(
&block,
&next_blocks,
block,
next_blocks,
cache_l1,
cache_l2,
config,
@@ -247,7 +245,7 @@ pub async fn index_block(
&mut brc20_operation_map,
brc20_cache,
&brc20_tx,
&ctx,
ctx,
)
.await?;

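Aside: most of the churn in this file comes from the new rustfmt settings rather than clippy: `group_imports=StdExternalCrate` sorts imports into std, external, and crate-local blocks, and `imports_granularity=Crate` merges every path from one crate into a single `use`. Illustratively:

// Before: one `use` per path, in arbitrary order
//     use std::sync::Arc;
//     use std::collections::HashMap;
// After: merged per crate and grouped std / external / crate-local
use std::{collections::HashMap, sync::Arc};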
View File

@@ -1,5 +1,6 @@
use bitcoin::hash_types::Txid;
use bitcoin::Witness;
use std::{collections::HashMap, str, str::FromStr};
use bitcoin::{hash_types::Txid, Witness};
use chainhook_sdk::utils::Context;
use chainhook_types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
@@ -7,17 +8,20 @@ use chainhook_types::{
OrdinalOperation,
};
use config::Config;
use ord::{
envelope::{Envelope, ParsedEnvelope},
inscription::Inscription,
inscription_id::InscriptionId,
};
use serde_json::json;
use std::collections::HashMap;
use std::str::FromStr;
use crate::core::meta_protocols::brc20::brc20_activation_height;
use crate::core::meta_protocols::brc20::parser::{parse_brc20_operation, ParsedBrc20Operation};
use crate::try_warn;
use ord::envelope::{Envelope, ParsedEnvelope};
use ord::inscription::Inscription;
use ord::inscription_id::InscriptionId;
use std::str;
use crate::{
core::meta_protocols::brc20::{
brc20_activation_height,
parser::{parse_brc20_operation, ParsedBrc20Operation},
},
try_warn,
};
pub fn parse_inscriptions_from_witness(
input_index: usize,
@@ -29,7 +33,7 @@ pub fn parse_inscriptions_from_witness(
let envelopes: Vec<Envelope<Inscription>> = Envelope::from_tapscript(tapscript, input_index)
.ok()?
.into_iter()
.map(|e| ParsedEnvelope::from(e))
.map(ParsedEnvelope::from)
.collect();
let mut inscriptions = vec![];
for envelope in envelopes.into_iter() {
@@ -59,9 +63,9 @@ pub fn parse_inscriptions_from_witness(
};
let no_content_bytes = vec![];
let inscription_content_bytes = envelope.payload.body().take().unwrap_or(&no_content_bytes);
let inscription_content_bytes = envelope.payload.body().unwrap_or(&no_content_bytes);
let mut content_bytes = "0x".to_string();
content_bytes.push_str(&hex::encode(&inscription_content_bytes));
content_bytes.push_str(&hex::encode(inscription_content_bytes));
let parents = envelope
.payload
@@ -69,15 +73,9 @@ pub fn parse_inscriptions_from_witness(
.iter()
.map(|i| i.to_string())
.collect();
let delegate = envelope
.payload
.delegate()
.and_then(|i| Some(i.to_string()));
let metaprotocol = envelope
.payload
.metaprotocol()
.and_then(|p| Some(p.to_string()));
let metadata = envelope.payload.metadata().and_then(|m| Some(json!(m)));
let delegate = envelope.payload.delegate().map(|i| i.to_string());
let metaprotocol = envelope.payload.metaprotocol().map(|p| p.to_string());
let metadata = envelope.payload.metadata().map(|m| json!(m));
// Most of these fields will be calculated later when we know for certain which satoshi contains this inscription.
let reveal_data = OrdinalInscriptionRevealData {
@@ -100,7 +98,7 @@ pub fn parse_inscriptions_from_witness(
ordinal_block_height: 0,
ordinal_offset: 0,
transfers_pre_inscription: 0,
satpoint_post_inscription: format!(""),
satpoint_post_inscription: String::new(),
curse_type,
charms: 0,
unbound_sequence: None,
@@ -133,8 +131,7 @@ pub fn parse_inscriptions_from_standardized_tx(
) {
for (reveal, inscription) in inscriptions.into_iter() {
if let Some(brc20) = config.ordinals_brc20_config() {
if brc20.enabled && block_identifier.index >= brc20_activation_height(&network)
{
if brc20.enabled && block_identifier.index >= brc20_activation_height(network) {
match parse_brc20_operation(&inscription) {
Ok(Some(op)) => {
brc20_operation_map.insert(reveal.inscription_id.clone(), op);
@@ -179,9 +176,8 @@ mod test {
use chainhook_types::OrdinalOperation;
use config::Config;
use crate::core::test_builders::{TestBlockBuilder, TestTransactionBuilder, TestTxInBuilder};
use super::parse_inscriptions_in_standardized_block;
use crate::core::test_builders::{TestBlockBuilder, TestTransactionBuilder, TestTxInBuilder};
#[test]
fn parses_inscriptions_in_block() {

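Aside: three small clippy rewrites recur in the parsing hunks above: `clippy::redundant_closure` (`.map(|e| ParsedEnvelope::from(e))` becomes `.map(ParsedEnvelope::from)`), `clippy::bind_instead_of_map` (`.and_then(|i| Some(i.to_string()))` becomes `.map(|i| i.to_string())`), and `clippy::useless_format` (`format!("")` becomes `String::new()`). A self-contained sketch:

fn demo(delegate: Option<u32>) -> (Option<String>, String) {
    // bind_instead_of_map: an and_then that always returns Some is just map
    let delegate = delegate.map(|i| i.to_string());
    // useless_format: an empty format! runs the whole formatting machinery for nothing
    (delegate, String::new())
}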
View File

@@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
hash::BuildHasherDefault,
sync::Arc,
sync::{mpsc::channel, Arc},
};
use bitcoin::Network;
@@ -16,36 +16,32 @@ use crossbeam_channel::unbounded;
use dashmap::DashMap;
use deadpool_postgres::Transaction;
use fxhash::FxHasher;
use crate::core::protocol::satoshi_tracking::UNBOUND_INSCRIPTION_SATPOINT;
use crate::{
core::resolve_absolute_pointer,
db::{self, cursor::TransactionBytesCursor, ordinals_pg},
try_debug, try_error, try_info,
utils::format_inscription_id,
};
use ord::{charm::Charm, sat::Sat};
use std::sync::mpsc::channel;
use super::{
satoshi_numbering::{compute_satoshi_number, TraversalResult},
satoshi_tracking::compute_satpoint_post_transfer,
sequence_cursor::SequenceCursor,
};
use crate::{
core::{protocol::satoshi_tracking::UNBOUND_INSCRIPTION_SATPOINT, resolve_absolute_pointer},
db::{self, cursor::TransactionBytesCursor, ordinals_pg},
try_debug, try_error, try_info,
utils::format_inscription_id,
};
/// Parallelize the computation of ordinals numbers for inscriptions present in a block.
///
/// This function will:
/// 1) Limit the number of ordinals numbers to compute by filtering out all the ordinals numbers pre-computed
/// and present in the L1 cache.
/// and present in the L1 cache.
/// 2) Create a threadpool, by spawning as many threads as specified by the config to process the batch ordinals to
/// retrieve
/// retrieve the ordinal number.
/// 3) Consume any entries in cache L1
/// 4) Inject the ordinals to compute (random order) in a priority queue
/// via the command line).
/// via the command line).
/// 5) Keep injecting ordinals from next blocks (if any) as long as the ordinals from the current block are not all
/// computed and augment the cache L1 for future blocks.
/// computed and augment the cache L1 for future blocks.
///
/// If the block has already been computed in the past (i.e., its ordinal numbers are already present in the `inscriptions` db),
/// the transaction is removed from the set to compute, and not injected in L1 either.
@@ -59,7 +55,7 @@ use super::{
///
pub fn parallelize_inscription_data_computations(
block: &BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
next_blocks: &[BitcoinBlockData],
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize, u64), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
config: &Config,
@@ -89,7 +85,7 @@ pub fn parallelize_inscription_data_computations(
let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
let blocks_db = Arc::new(db::blocks::open_blocks_db_with_retry(false, &config, &ctx));
let blocks_db = Arc::new(db::blocks::open_blocks_db_with_retry(false, config, ctx));
let thread_pool_capacity = config.resources.get_optimal_thread_pool_capacity();
for thread_index in 0..thread_pool_capacity {
@@ -175,11 +171,11 @@ pub fn parallelize_inscription_data_computations(
}
// Feed each worker from the thread pool with 2 work items each
for thread_index in 0..thread_pool_capacity {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
for thread in &tx_thread_pool {
let _ = thread.send(priority_queue.pop_front());
}
for thread_index in 0..thread_pool_capacity {
let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front());
for thread in &tx_thread_pool {
let _ = thread.send(priority_queue.pop_front());
}
let mut next_block_iter = next_blocks.iter();
@@ -220,34 +216,28 @@ pub fn parallelize_inscription_data_computations(
if let Some(w) = priority_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else {
if let Some(next_block) = next_block_iter.next() {
let (transactions_ids, _) = get_transactions_to_process(next_block, cache_l1);
} else if let Some(w) = warmup_queue.pop_front() {
let _ = tx_thread_pool[thread_index].send(Some(w));
} else if let Some(next_block) = next_block_iter.next() {
let (transactions_ids, _) = get_transactions_to_process(next_block, cache_l1);
try_info!(
inner_ctx,
"Number of inscriptions in block #{} to pre-process: {}",
block.block_identifier.index,
transactions_ids.len()
);
try_info!(
inner_ctx,
"Number of inscriptions in block #{} to pre-process: {}",
block.block_identifier.index,
transactions_ids.len()
);
for (transaction_id, input_index, inscription_pointer) in
transactions_ids.into_iter()
{
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
inscription_pointer,
false,
));
}
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
for (transaction_id, input_index, inscription_pointer) in transactions_ids.into_iter() {
warmup_queue.push_back((
transaction_id,
next_block.block_identifier.clone(),
input_index,
inscription_pointer,
false,
));
}
let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front());
}
}
try_debug!(
@@ -259,9 +249,10 @@ pub fn parallelize_inscription_data_computations(
// Collect any pending results for incoming blocks
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, _prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok((traversal, inscription_pointer, _)) = traversal_result {
try_debug!(
if let Ok((Ok((traversal, inscription_pointer, _)), _prioritary, thread_index)) =
traversal_rx.try_recv()
{
try_debug!(
inner_ctx,
"Completed ordinal number retrieval for Satpoint {}:{}:{} (block: #{}:{}, transfers: {}, pre-retrieval, thread: {thread_index})",
traversal.transaction_identifier_inscription.hash,
@@ -271,15 +262,14 @@ pub fn parallelize_inscription_data_computations(
traversal.get_ordinal_coinbase_offset(),
traversal.transfers
);
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
inscription_pointer,
),
traversal,
);
}
cache_l1.insert(
(
traversal.transaction_identifier_inscription.clone(),
traversal.inscription_input_index,
inscription_pointer,
),
traversal,
);
}
let _ = tx.send(None);
}
@@ -306,7 +296,7 @@ pub fn parallelize_inscription_data_computations(
/// 1) Retrieve any inscriptions previously stored in the DB for the block
/// 2) Traverse the list of transactions present in the block (except coinbase).
/// 3) Check if the transaction is present in the cache L1 and augment the cache hit list accordingly and move on to the
/// next transaction.
///    next transaction.
/// 4) Check if the transaction was processed in the past and move on to the next transaction.
/// 5) Augment the list of transactions to process.
///
@@ -557,7 +547,7 @@ async fn update_tx_inscriptions_with_consensus_sequence_data(
}
let (destination, satpoint_post_transfer, output_value) =
compute_satpoint_post_transfer(&&*tx, input_index, relative_offset, network, ctx);
compute_satpoint_post_transfer(&*tx, input_index, relative_offset, network, ctx);
inscription.satpoint_post_inscription = satpoint_post_transfer;
inscription_subindex += 1;
@@ -610,8 +600,6 @@ async fn update_tx_inscriptions_with_consensus_sequence_data(
mod test {
use std::collections::BTreeMap;
use test_case::test_case;
use chainhook_postgres::{pg_begin, pg_pool_client};
use chainhook_sdk::utils::Context;
use chainhook_types::{
@@ -620,7 +608,9 @@ mod test {
OrdinalOperation, TransactionIdentifier,
};
use ord::charm::Charm;
use test_case::test_case;
use super::update_block_inscriptions_with_consensus_sequence_data;
use crate::{
core::{
protocol::{satoshi_numbering::TraversalResult, sequence_cursor::SequenceCursor},
@@ -632,8 +622,6 @@ mod test {
},
};
use super::update_block_inscriptions_with_consensus_sequence_data;
#[test_case(None => Ok(("0000000000000000000000000000000000000000000000000000000000000000:0:0".into(), Some(0))); "first unbound sequence")]
#[test_case(Some(230) => Ok(("0000000000000000000000000000000000000000000000000000000000000000:0:231".into(), Some(231))); "next unbound sequence")]
#[tokio::test]

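Aside: the large rewrites in this file are `clippy::collapsible_else_if` at work: `else { if … }` flattens into `else if …`, which is what un-nests the warmup-queue dispatch above. In miniature:

fn next_item(primary: Option<u32>, warmup: Option<u32>) -> u32 {
    if let Some(w) = primary {
        w
    } else if let Some(w) = warmup { // previously `else { if let Some(w) = warmup { … } }`
        w
    } else {
        0
    }
}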
View File

@@ -1,17 +1,19 @@
use std::{hash::BuildHasherDefault, sync::Arc};
use chainhook_sdk::utils::Context;
use chainhook_types::{BlockIdentifier, OrdinalInscriptionNumber, TransactionIdentifier};
use config::Config;
use dashmap::DashMap;
use fxhash::FxHasher;
use std::hash::BuildHasherDefault;
use std::sync::Arc;
use ord::{height::Height, sat::Sat};
use crate::db::blocks::find_pinned_block_bytes_at_block_height;
use config::Config;
use crate::db::cursor::{BlockBytesCursor, TransactionBytesCursor};
use crate::try_error;
use ord::height::Height;
use ord::sat::Sat;
use crate::{
db::{
blocks::find_pinned_block_bytes_at_block_height,
cursor::{BlockBytesCursor, TransactionBytesCursor},
},
try_error,
};
#[derive(Clone, Debug)]
pub struct TraversalResult {
@@ -60,41 +62,38 @@ pub fn compute_satoshi_number(
let mut back_track = vec![];
let (mut tx_cursor, mut ordinal_block_number) = match traversals_cache
.get(&(block_identifier.index as u32, txid.clone()))
.get(&(block_identifier.index as u32, txid))
{
Some(entry) => {
let tx = entry.value();
(
(
tx.inputs[inscription_input_index].txin.clone(),
tx.inputs[inscription_input_index].txin,
tx.inputs[inscription_input_index].vout.into(),
),
tx.inputs[inscription_input_index].block_height,
)
}
None => loop {
match find_pinned_block_bytes_at_block_height(ordinal_block_number, 3, &blocks_db, &ctx)
{
None => {
match find_pinned_block_bytes_at_block_height(ordinal_block_number, 3, blocks_db, ctx) {
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
}
Some(block_bytes) => {
let cursor = BlockBytesCursor::new(&block_bytes.as_ref());
let cursor = BlockBytesCursor::new(block_bytes.as_ref());
match cursor.find_and_serialize_transaction_with_txid(&txid) {
Some(tx) => {
break (
(
tx.inputs[inscription_input_index].txin.clone(),
tx.inputs[inscription_input_index].vout.into(),
),
tx.inputs[inscription_input_index].block_height,
);
}
Some(tx) => (
(
tx.inputs[inscription_input_index].txin,
tx.inputs[inscription_input_index].vout.into(),
),
tx.inputs[inscription_input_index].block_height,
),
None => return Err(format!("txid not in block #{ordinal_block_number}")),
}
}
}
},
}
};
let mut hops: u32 = 0;
@@ -128,7 +127,7 @@ pub fn compute_satoshi_number(
if sats_out < sats_in {
ordinal_offset = sats_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
tx_cursor = (input.txin, input.vout as usize);
next_found_in_cache = true;
break;
}
@@ -159,17 +158,10 @@ pub fn compute_satoshi_number(
}
let pinned_block_bytes = {
loop {
match find_pinned_block_bytes_at_block_height(
ordinal_block_number,
3,
&blocks_db,
&ctx,
) {
Some(block) => break block,
None => {
return Err(format!("block #{ordinal_block_number} not in database (traversing {} / {} in progress)", transaction_identifier.hash, block_identifier.index));
}
match find_pinned_block_bytes_at_block_height(ordinal_block_number, 3, blocks_db, ctx) {
Some(block) => block,
None => {
return Err(format!("block #{ordinal_block_number} not in database (traversing {} / {} in progress)", transaction_identifier.hash, block_identifier.index));
}
}
};
@@ -189,7 +181,7 @@ pub fn compute_satoshi_number(
}
ordinal_offset += intra_coinbase_output_offset;
let subsidy = Height(ordinal_block_number.into()).subsidy();
let subsidy = Height(ordinal_block_number).subsidy();
if ordinal_offset < subsidy {
// Great!
break;
@@ -223,7 +215,7 @@ pub fn compute_satoshi_number(
if sats_in > total_out {
ordinal_offset = total_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
tx_cursor = (input.txin, input.vout as usize);
break;
}
}
@@ -262,12 +254,12 @@ pub fn compute_satoshi_number(
sats_in += input.txin_value;
if sats_out < sats_in {
back_track.push((ordinal_block_number, tx_cursor.0.clone(), tx_cursor.1));
back_track.push((ordinal_block_number, tx_cursor.0, tx_cursor.1));
traversals_cache
.insert((ordinal_block_number, tx_cursor.0), tx_bytes_cursor.clone());
ordinal_offset = sats_out - (sats_in - input.txin_value);
ordinal_block_number = input.block_height;
tx_cursor = (input.txin.clone(), input.vout as usize);
tx_cursor = (input.txin, input.vout as usize);
break;
}
}
@@ -293,7 +285,7 @@ pub fn compute_satoshi_number(
}
}
let height = Height(ordinal_block_number.into());
let height = Height(ordinal_block_number);
let ordinal_number = height.starting_sat().0 + ordinal_offset;
Ok((
@@ -315,9 +307,11 @@ mod test {
use chainhook_sdk::utils::Context;
use chainhook_types::{bitcoin::TxOut, BlockIdentifier, TransactionIdentifier};
use config::Config;
use dashmap::DashMap;
use fxhash::FxHasher;
use super::compute_satoshi_number;
use crate::{
core::{
new_traversals_lazy_cache,
@@ -329,9 +323,6 @@ mod test {
drop_all_dbs,
},
};
use config::Config;
use super::compute_satoshi_number;
fn store_tx_in_traversals_cache(
cache: &DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>,

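Aside: besides flattening `loop`/`match` nesting and dropping `.clone()` on the `Copy` txid bytes, the hunks above remove `.into()` calls where source and target types already match (`clippy::useless_conversion`). A sketch, assuming `Height` wraps the same integer type as the block number it is built from:

struct Height(u32);

fn height_of(block: u32) -> Height {
    Height(block) // previously `Height(block.into())`, a no-op conversion
}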
View File

@@ -8,6 +8,7 @@ use chainhook_types::{
};
use deadpool_postgres::Transaction;
use super::inscription_sequencing::get_bitcoin_network;
use crate::{
core::{compute_next_satpoint_data, SatPosition},
db::ordinals_pg,
@@ -15,8 +16,6 @@ use crate::{
utils::format_outpoint_to_watch,
};
use super::inscription_sequencing::get_bitcoin_network;
pub const UNBOUND_INSCRIPTION_SATPOINT: &str =
"0000000000000000000000000000000000000000000000000000000000000000:0";
@@ -31,7 +30,7 @@ pub fn parse_output_and_offset_from_satpoint(
) -> Result<(String, Option<u64>), String> {
let parts: Vec<&str> = satpoint.split(':').collect();
let tx_id = parts
.get(0)
.first()
.ok_or("get_output_and_offset_from_satpoint: tx_id not found")?;
let output = parts
.get(1)
@@ -80,8 +79,13 @@ pub fn compute_satpoint_post_transfer(
.inputs
.iter()
.map(|o| o.previous_output.value)
.collect::<_>();
let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>();
.collect::<Vec<_>>();
let outputs = tx
.metadata
.outputs
.iter()
.map(|o| o.value)
.collect::<Vec<_>>();
let post_transfer_data = compute_next_satpoint_data(
input_index,
&inputs,
@@ -95,8 +99,8 @@ pub fn compute_satpoint_post_transfer(
SatPosition::Output((output_index, offset)) => {
let outpoint = format_outpoint_to_watch(&tx.transaction_identifier, output_index);
let script_pub_key_hex = tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match ScriptBuf::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network.clone()) {
let updated_address = match ScriptBuf::from_hex(script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network) {
Ok(address) => {
OrdinalInscriptionTransferDestination::Transferred(address.to_string())
}
@@ -192,7 +196,7 @@ pub async fn augment_transaction_with_ordinal_transfers(
let Some(entries) = input_satpoints.get(&input_index) else {
continue;
};
for watched_satpoint in entries.into_iter() {
for watched_satpoint in entries.iter() {
if updated_sats.contains(&watched_satpoint.ordinal_number) {
continue;
}
@@ -207,7 +211,7 @@ pub async fn augment_transaction_with_ordinal_transfers(
let (destination, satpoint_post_transfer, post_transfer_output_value) =
compute_satpoint_post_transfer(
&&*tx,
&*tx,
input_index,
watched_satpoint.offset,
network,
@@ -224,9 +228,7 @@ pub async fn augment_transaction_with_ordinal_transfers(
};
// Keep an in-memory copy of this watchpoint at its new tx output for later retrieval.
let (output, _) = parse_output_and_offset_from_satpoint(&satpoint_post_transfer)?;
let entry = block_transferred_satpoints
.entry(output)
.or_insert(vec![]);
let entry = block_transferred_satpoints.entry(output).or_default();
entry.push(watched_satpoint.clone());
try_info!(
@@ -256,6 +258,7 @@ mod test {
OrdinalInscriptionTransferDestination, OrdinalOperation,
};
use super::compute_satpoint_post_transfer;
use crate::{
core::{
protocol::satoshi_tracking::augment_block_with_transfers,
@@ -266,8 +269,6 @@ mod test {
db::{ordinals_pg, pg_reset_db, pg_test_connection, pg_test_connection_pool},
};
use super::compute_satpoint_post_transfer;
#[tokio::test]
async fn tracks_chained_satoshi_transfers_in_block() -> Result<(), String> {
let ordinal_number: u64 = 283888212016616;

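Aside: the satoshi-tracking hunks collect a grab bag of lints: `clippy::get_first` (`parts.get(0)` becomes `parts.first()`), `clippy::unwrap_or_default` (`.or_insert(vec![])` becomes `.or_default()`), and explicit `collect::<Vec<_>>()` turbofish where the new slice-typed parameters no longer pin down the collection type on their own. A small sketch with hypothetical names:

use std::collections::HashMap;

fn track(parts: &[&str], by_output: &mut HashMap<String, Vec<u64>>) {
    let _tx_id = parts.first(); // instead of parts.get(0)
    by_output.entry("outpoint".to_string()).or_default().push(0); // instead of or_insert(vec![])
}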
View File

@@ -2,9 +2,8 @@ use bitcoin::Network;
use chainhook_types::OrdinalInscriptionNumber;
use deadpool_postgres::GenericClient;
use crate::db::ordinals_pg;
use super::inscription_sequencing;
use crate::db::ordinals_pg;
/// Helper caching inscription sequence cursor
///
@@ -20,6 +19,12 @@ pub struct SequenceCursor {
current_block_height: u64,
}
impl Default for SequenceCursor {
fn default() -> Self {
Self::new()
}
}
impl SequenceCursor {
pub fn new() -> Self {
SequenceCursor {
@@ -56,8 +61,7 @@ impl SequenceCursor {
false => self.pick_next_pos_classic(client).await?,
};
let jubilee = if block_height >= inscription_sequencing::get_jubilee_block_height(&network)
{
let jubilee = if block_height >= inscription_sequencing::get_jubilee_block_height(network) {
self.pick_next_jubilee_number(client).await?
} else {
classic
@@ -107,8 +111,8 @@ impl SequenceCursor {
match self.jubilee_cursor {
None => match ordinals_pg::get_highest_inscription_number(client).await? {
Some(inscription_number) => {
self.jubilee_cursor = Some(inscription_number as i64);
Ok(inscription_number as i64 + 1)
self.jubilee_cursor = Some(inscription_number);
Ok(inscription_number + 1)
}
_ => Ok(0),
},
@@ -167,10 +171,10 @@ impl SequenceCursor {
mod test {
use bitcoin::Network;
use chainhook_postgres::{pg_begin, pg_pool_client};
use chainhook_types::OrdinalOperation;
use test_case::test_case;
use super::SequenceCursor;
use crate::{
core::test_builders::{TestBlockBuilder, TestTransactionBuilder},
db::{
@@ -179,8 +183,6 @@ mod test {
},
};
use super::SequenceCursor;
#[test_case((780000, false) => Ok((2, 2)); "with blessed pre jubilee")]
#[test_case((780000, true) => Ok((-2, -2)); "with cursed pre jubilee")]
#[test_case((850000, false) => Ok((2, 2)); "with blessed post jubilee")]

View File

@@ -48,11 +48,11 @@ fn rocks_db_default_options(ulimit: usize, _memory_available: usize) -> Options
pub fn open_blocks_db_with_retry(readwrite: bool, config: &Config, ctx: &Context) -> DB {
let mut retries = 0;
let blocks_db = loop {
loop {
let res = if readwrite {
open_readwrite_blocks_db(config, &ctx)
open_readwrite_blocks_db(config, ctx)
} else {
open_readonly_blocks_db(config, &ctx)
open_readonly_blocks_db(config, ctx)
};
match res {
Ok(db) => break db,
@@ -67,8 +67,7 @@ pub fn open_blocks_db_with_retry(readwrite: bool, config: &Config, ctx: &Context
continue;
}
}
};
blocks_db
}
}
pub fn open_readonly_blocks_db(config: &Config, _ctx: &Context) -> Result<DB, String> {
@@ -78,15 +77,15 @@ pub fn open_readonly_blocks_db(config: &Config, _ctx: &Context) -> Result<DB, St
opts.set_disable_auto_compactions(true);
opts.set_max_background_jobs(0);
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to read hord.rocksdb: {}", e.to_string()))?;
.map_err(|e| format!("unable to read hord.rocksdb: {}", e))?;
Ok(db)
}
fn open_readwrite_blocks_db(config: &Config, _ctx: &Context) -> Result<DB, String> {
let path = get_default_blocks_db_path(&config.expected_cache_path());
let opts = rocks_db_default_options(config.resources.ulimit, config.resources.memory_available);
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to read-write hord.rocksdb: {}", e.to_string()))?;
let db =
DB::open(&opts, path).map_err(|e| format!("unable to read-write hord.rocksdb: {}", e))?;
Ok(db)
}
@@ -100,7 +99,7 @@ pub fn insert_entry_in_blocks(
let block_height_bytes = block_height.to_be_bytes();
let mut retries = 0;
loop {
let res = blocks_db_rw.put(&block_height_bytes, block_bytes);
let res = blocks_db_rw.put(block_height_bytes, block_bytes);
match res {
Ok(_) => break,
Err(e) => {
@@ -165,7 +164,7 @@ pub fn find_pinned_block_bytes_at_block_height<'a>(
}
}
pub fn find_block_bytes_at_block_height<'a>(
pub fn find_block_bytes_at_block_height(
block_height: u32,
retry: u8,
blocks_db: &DB,
@@ -202,13 +201,13 @@ pub fn find_block_bytes_at_block_height<'a>(
pub fn run_compaction(blocks_db_rw: &DB, lim: u32) {
let gen = 0u32.to_be_bytes();
let _ = blocks_db_rw.compact_range(Some(&gen), Some(&lim.to_be_bytes()));
blocks_db_rw.compact_range(Some(&gen), Some(&lim.to_be_bytes()));
}
pub fn find_missing_blocks(blocks_db: &DB, start: u32, end: u32, ctx: &Context) -> Vec<u32> {
let mut missing_blocks = vec![];
for i in start..=end {
if find_pinned_block_bytes_at_block_height(i as u32, 0, &blocks_db, ctx).is_none() {
if find_pinned_block_bytes_at_block_height(i, 0, blocks_db, ctx).is_none() {
missing_blocks.push(i);
}
}

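Aside: two patterns show up above. First, `let blocks_db = loop { … }; blocks_db` becomes the `loop` as the function's tail expression (the binding added nothing). Second, `e.to_string()` inside `format!` is redundant, since the `{}` placeholder already formats through `Display`. The second one in isolation:

fn describe(e: std::io::Error) -> String {
    format!("unable to read hord.rocksdb: {}", e) // previously `e.to_string()` in the args
}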
View File

@@ -1,5 +1,4 @@
use std::io::Cursor;
use std::io::{Read, Write};
use std::io::{Cursor, Read, Write};
use chainhook_sdk::indexer::bitcoin::BitcoinBlockFullBreakdown;
use chainhook_types::BitcoinBlockData;
@@ -63,7 +62,7 @@ const SATS_LEN: usize = 8;
const INPUT_SIZE: usize = TXID_LEN + 4 + 2 + SATS_LEN;
const OUTPUT_SIZE: usize = 8;
impl<'a> BlockBytesCursor<'a> {
impl BlockBytesCursor<'_> {
pub fn new(bytes: &[u8]) -> BlockBytesCursor {
let tx_len = u16::from_be_bytes([bytes[0], bytes[1]]);
BlockBytesCursor { bytes, tx_len }
@@ -131,7 +130,7 @@ impl<'a> BlockBytesCursor<'a> {
let mut txin_value = [0u8; 8];
cursor.read_exact(&mut txin_value).expect("data corrupted");
inputs.push(TransactionInputBytesCursor {
txin: txin,
txin,
block_height: u32::from_be_bytes(block_height),
vout: u16::from_be_bytes(vout),
txin_value: u64::from_be_bytes(txin_value),
@@ -186,14 +185,14 @@ impl<'a> BlockBytesCursor<'a> {
}
pub fn iter_tx(&self) -> TransactionBytesCursorIterator {
TransactionBytesCursorIterator::new(&self)
TransactionBytesCursorIterator::new(self)
}
pub fn from_full_block<'b>(block: &BitcoinBlockFullBreakdown) -> std::io::Result<Vec<u8>> {
pub fn from_full_block(block: &BitcoinBlockFullBreakdown) -> std::io::Result<Vec<u8>> {
let mut buffer = vec![];
// Number of transactions in the block (not including coinbase)
let tx_len = block.tx.len() as u16;
buffer.write(&tx_len.to_be_bytes())?;
buffer.write_all(&tx_len.to_be_bytes())?;
// For each transaction:
let u16_max = u16::MAX as usize;
for (i, tx) in block.tx.iter().enumerate() {
@@ -211,15 +210,15 @@ impl<'a> BlockBytesCursor<'a> {
inputs_len = 0;
}
// Number of inputs
buffer.write(&inputs_len.to_be_bytes())?;
buffer.write_all(&inputs_len.to_be_bytes())?;
// Number of outputs
buffer.write(&outputs_len.to_be_bytes())?;
buffer.write_all(&outputs_len.to_be_bytes())?;
}
// For each transaction:
for tx in block.tx.iter() {
// txid - first 8 bytes
let txid = {
let txid = hex::decode(tx.txid.to_string()).unwrap();
let txid = hex::decode(&tx.txid).unwrap();
[
txid[0], txid[1], txid[2], txid[3], txid[4], txid[5], txid[6], txid[7],
]
@@ -229,12 +228,12 @@ impl<'a> BlockBytesCursor<'a> {
let inputs_len = if tx.vin.len() > u16_max {
0
} else {
tx.vin.len() as usize
tx.vin.len()
};
let outputs_len = if tx.vout.len() > u16_max {
0
} else {
tx.vout.len() as usize
tx.vout.len()
};
// For each transaction input:
@@ -253,29 +252,29 @@ impl<'a> BlockBytesCursor<'a> {
buffer.write_all(&txin)?;
// txin's block height
let block_height = input.prevout.as_ref().unwrap().height as u32;
buffer.write(&block_height.to_be_bytes())?;
buffer.write_all(&block_height.to_be_bytes())?;
// txin's vout index
let vout = input.vout.unwrap() as u16;
buffer.write(&vout.to_be_bytes())?;
buffer.write_all(&vout.to_be_bytes())?;
// txin's sats value
let sats = input.prevout.as_ref().unwrap().value.to_sat();
buffer.write(&sats.to_be_bytes())?;
buffer.write_all(&sats.to_be_bytes())?;
}
// For each transaction output:
for i in 0..outputs_len {
let output = &tx.vout[i];
let sats = output.value.to_sat();
buffer.write(&sats.to_be_bytes())?;
buffer.write_all(&sats.to_be_bytes())?;
}
}
Ok(buffer)
}
pub fn from_standardized_block<'b>(block: &BitcoinBlockData) -> std::io::Result<Vec<u8>> {
pub fn from_standardized_block(block: &BitcoinBlockData) -> std::io::Result<Vec<u8>> {
let mut buffer = vec![];
// Number of transactions in the block (not including coinbase)
let tx_len = block.transactions.len() as u16;
buffer.write(&tx_len.to_be_bytes())?;
buffer.write_all(&tx_len.to_be_bytes())?;
// For each transaction:
for (i, tx) in block.transactions.iter().enumerate() {
let inputs_len = if i > 0 {
@@ -285,9 +284,9 @@ impl<'a> BlockBytesCursor<'a> {
};
let outputs_len = tx.metadata.outputs.len() as u16;
// Number of inputs
buffer.write(&inputs_len.to_be_bytes())?;
buffer.write_all(&inputs_len.to_be_bytes())?;
// Number of outputs
buffer.write(&outputs_len.to_be_bytes())?;
buffer.write_all(&outputs_len.to_be_bytes())?;
}
// For each transaction:
for (i, tx) in block.transactions.iter().enumerate() {
@@ -302,19 +301,19 @@ impl<'a> BlockBytesCursor<'a> {
buffer.write_all(&txin)?;
// txin's block height
let block_height = input.previous_output.block_height as u32;
buffer.write(&block_height.to_be_bytes())?;
buffer.write_all(&block_height.to_be_bytes())?;
// txin's vout index
let vout = input.previous_output.vout as u16;
buffer.write(&vout.to_be_bytes())?;
buffer.write_all(&vout.to_be_bytes())?;
// txin's sats value
let sats = input.previous_output.value;
buffer.write(&sats.to_be_bytes())?;
buffer.write_all(&sats.to_be_bytes())?;
}
}
// For each transaction output:
for output in tx.metadata.outputs.iter() {
let sats = output.value;
buffer.write(&sats.to_be_bytes())?;
buffer.write_all(&sats.to_be_bytes())?;
}
}
Ok(buffer)
@@ -337,7 +336,7 @@ impl<'a> TransactionBytesCursorIterator<'a> {
}
}
impl<'a> Iterator for TransactionBytesCursorIterator<'a> {
impl Iterator for TransactionBytesCursorIterator<'_> {
type Item = TransactionBytesCursor;
fn next(&mut self) -> Option<TransactionBytesCursor> {
@@ -366,13 +365,14 @@ impl<'a> Iterator for TransactionBytesCursorIterator<'a> {
#[cfg(test)]
mod tests {
use super::*;
use chainhook_sdk::{
indexer::bitcoin::{parse_downloaded_block, standardize_bitcoin_block},
utils::Context,
};
use chainhook_types::BitcoinNetwork;
use super::*;
#[test]
fn test_block_cursor_roundtrip() {
let ctx = Context::empty();

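Aside: alongside the `impl<'a> X<'a>` to `impl X<'_>` lifetime elision, the cursor file contains a change that is more than cosmetic: `Write::write` may perform a partial write and merely returns the byte count, while `write_all` loops until the whole buffer is written (`clippy::unused_io_amount`). For an in-memory `Vec<u8>` sink the two behave the same, but `write_all` states the intent. A sketch:

use std::io::Write;

fn serialize_sats(sats: u64) -> std::io::Result<Vec<u8>> {
    let mut buffer = vec![];
    buffer.write_all(&sats.to_be_bytes())?; // previously `buffer.write(...)`, count discarded
    Ok(buffer)
}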
View File

@@ -4,7 +4,6 @@ pub mod models;
pub mod ordinals_pg;
use chainhook_postgres::pg_connect_with_retry;
use chainhook_sdk::utils::Context;
use config::Config;

View File

@@ -37,7 +37,7 @@ impl DbCurrentLocation {
tx_index: PgBigIntU32(tx_index as u32),
address: reveal.inscriber_address.clone(),
output,
offset: offset.map(|o| PgNumericU64(o)),
offset: offset.map(PgNumericU64),
}
}
@@ -62,7 +62,7 @@ impl DbCurrentLocation {
OrdinalInscriptionTransferDestination::Burnt(_) => None,
},
output,
offset: offset.map(|o| PgNumericU64(o)),
offset: offset.map(PgNumericU64),
}
}
}

View File

@@ -79,7 +79,7 @@ impl DbInscription {
}),
recursive: false, // This will be determined later
input_index: PgBigIntU32(reveal.inscription_input_index as u32),
pointer: reveal.inscription_pointer.map(|p| PgNumericU64(p)),
pointer: reveal.inscription_pointer.map(PgNumericU64),
metadata: reveal.metadata.as_ref().map(|m| m.to_string()),
metaprotocol: reveal.metaprotocol.clone(),
delegate: reveal.delegate.clone(),

View File

@@ -45,7 +45,7 @@ impl DbLocation {
block_hash: block_identifier.hash[2..].to_string(),
address: reveal.inscriber_address.clone(),
output,
offset: offset.map(|o| PgNumericU64(o)),
offset: offset.map(PgNumericU64),
prev_output: None,
prev_offset: None,
value: Some(PgNumericU64(reveal.inscription_output_value)),
@@ -88,10 +88,10 @@ impl DbLocation {
OrdinalInscriptionTransferDestination::Burnt(_) => None,
},
output,
offset: offset.map(|o| PgNumericU64(o)),
offset: offset.map(PgNumericU64),
prev_output: Some(prev_output),
prev_offset: prev_offset.map(|o| PgNumericU64(o)),
value: transfer.post_transfer_output_value.map(|v| PgNumericU64(v)),
prev_offset: prev_offset.map(PgNumericU64),
value: transfer.post_transfer_output_value.map(PgNumericU64),
transfer_type: match transfer.destination {
OrdinalInscriptionTransferDestination::Transferred(_) => "transferred".to_string(),
OrdinalInscriptionTransferDestination::SpentInFees => "spent_in_fees".to_string(),

View File

@@ -1,8 +1,7 @@
use chainhook_postgres::{types::PgNumericU64, FromPgRow};
use chainhook_types::OrdinalInscriptionRevealData;
use tokio_postgres::Row;
use ord::{rarity::Rarity, sat::Sat};
use tokio_postgres::Row;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbSatoshi {

View File

@@ -1,13 +1,13 @@
mod db_current_location;
mod db_inscription;
mod db_inscription_recursion;
mod db_inscription_parent;
mod db_inscription_recursion;
mod db_location;
mod db_satoshi;
pub use db_current_location::DbCurrentLocation;
pub use db_inscription::DbInscription;
pub use db_inscription_parent::DbInscriptionParent;
pub use db_inscription_recursion::DbInscriptionRecursion;
pub use db_location::DbLocation;
pub use db_satoshi::DbSatoshi;
pub use db_inscription_parent::DbInscriptionParent;

View File

@@ -11,14 +11,13 @@ use deadpool_postgres::GenericClient;
use refinery::embed_migrations;
use tokio_postgres::{types::ToSql, Client};
use crate::core::protocol::{
satoshi_numbering::TraversalResult, satoshi_tracking::WatchedSatpoint,
};
use super::models::{
DbCurrentLocation, DbInscription, DbInscriptionParent, DbInscriptionRecursion, DbLocation,
DbSatoshi,
};
use crate::core::protocol::{
satoshi_numbering::TraversalResult, satoshi_tracking::WatchedSatpoint,
};
embed_migrations!("../../migrations/ordinals");
pub async fn migrate(client: &mut Client) -> Result<(), String> {
@@ -115,7 +114,7 @@ pub async fn get_reinscriptions_for_block<T: GenericClient>(
client: &T,
) -> Result<HashMap<u64, String>, String> {
let mut ordinal_numbers = vec![];
for (_, value) in inscriptions_data {
for value in inscriptions_data.values() {
if value.ordinal_number != 0 {
ordinal_numbers.push(PgNumericU64(value.ordinal_number));
}
@@ -235,10 +234,10 @@ pub async fn get_inscribed_satpoints_at_tx_inputs<T: GenericClient>(
}
async fn insert_inscriptions<T: GenericClient>(
inscriptions: &Vec<DbInscription>,
inscriptions: &[DbInscription],
client: &T,
) -> Result<(), String> {
if inscriptions.len() == 0 {
if inscriptions.is_empty() {
return Ok(());
}
for chunk in inscriptions.chunks(500) {
@@ -286,10 +285,10 @@ async fn insert_inscriptions<T: GenericClient>(
}
async fn insert_inscription_recursions<T: GenericClient>(
inscription_recursions: &Vec<DbInscriptionRecursion>,
inscription_recursions: &[DbInscriptionRecursion],
client: &T,
) -> Result<(), String> {
if inscription_recursions.len() == 0 {
if inscription_recursions.is_empty() {
return Ok(());
}
for chunk in inscription_recursions.chunks(500) {
@@ -316,10 +315,10 @@ async fn insert_inscription_recursions<T: GenericClient>(
}
async fn insert_inscription_parents<T: GenericClient>(
inscription_parents: &Vec<DbInscriptionParent>,
inscription_parents: &[DbInscriptionParent],
client: &T,
) -> Result<(), String> {
if inscription_parents.len() == 0 {
if inscription_parents.is_empty() {
return Ok(());
}
for chunk in inscription_parents.chunks(500) {
@@ -346,10 +345,10 @@ async fn insert_inscription_parents<T: GenericClient>(
}
async fn insert_locations<T: GenericClient>(
locations: &Vec<DbLocation>,
locations: &[DbLocation],
client: &T,
) -> Result<(), String> {
if locations.len() == 0 {
if locations.is_empty() {
return Ok(());
}
for chunk in locations.chunks(500) {
@@ -436,10 +435,10 @@ async fn insert_locations<T: GenericClient>(
}
async fn insert_satoshis<T: GenericClient>(
satoshis: &Vec<DbSatoshi>,
satoshis: &[DbSatoshi],
client: &T,
) -> Result<(), String> {
if satoshis.len() == 0 {
if satoshis.is_empty() {
return Ok(());
}
for chunk in satoshis.chunks(500) {
@@ -556,7 +555,7 @@ async fn update_mime_type_counts<T: GenericClient>(
counts: &HashMap<String, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
@@ -582,7 +581,7 @@ async fn update_sat_rarity_counts<T: GenericClient>(
counts: &HashMap<String, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
@@ -608,7 +607,7 @@ async fn update_inscription_type_counts<T: GenericClient>(
counts: &HashMap<String, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
@@ -634,7 +633,7 @@ async fn update_genesis_address_counts<T: GenericClient>(
counts: &HashMap<String, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
@@ -660,7 +659,7 @@ async fn update_recursive_counts<T: GenericClient>(
counts: &HashMap<bool, i32>,
client: &T,
) -> Result<(), String> {
if counts.len() == 0 {
if counts.is_empty() {
return Ok(());
}
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
@@ -771,7 +770,7 @@ pub async fn insert_block<T: GenericClient>(
let mime_type = inscription.mime_type.clone();
let genesis_address = inscription.address.clone();
let recursions = DbInscriptionRecursion::from_reveal(reveal)?;
let is_recursive = recursions.len() > 0;
let is_recursive = !recursions.is_empty();
if is_recursive {
inscription.recursive = true;
}

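Aside: the insert helpers above all get the same two-lint treatment: `clippy::ptr_arg` (take `&[T]` instead of `&Vec<T>`, which also accepts arrays and other slices) and `clippy::len_zero` (`len() == 0` becomes `is_empty()`). The shape of each helper, reduced to a sketch:

fn insert_rows(rows: &[String]) {
    if rows.is_empty() {
        return;
    }
    for chunk in rows.chunks(500) {
        let _ = chunk; // batched insert elided
    }
}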
View File

@@ -1,25 +1,14 @@
use crate::core::meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCache};
use crate::core::pipeline::bitcoind_download_blocks;
use crate::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use crate::core::pipeline::processors::inscription_indexing::{
index_block, rollback_block, start_inscription_indexing_processor,
use std::{
collections::BTreeMap,
hash::BuildHasherDefault,
sync::{mpsc::channel, Arc},
};
use crate::core::protocol::sequence_cursor::SequenceCursor;
use crate::core::{
first_inscription_height, new_traversals_lazy_cache, should_sync_ordinals_db,
should_sync_rocks_db,
};
use crate::db::blocks::{self, find_missing_blocks, open_blocks_db_with_retry, run_compaction};
use crate::db::cursor::{BlockBytesCursor, TransactionBytesCursor};
use crate::db::ordinals_pg;
use crate::utils::monitoring::{start_serving_prometheus_metrics, PrometheusMonitoring};
use crate::{try_crit, try_error, try_info};
use chainhook_postgres::{pg_begin, pg_pool, pg_pool_client};
use chainhook_sdk::observer::{
start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar,
use chainhook_sdk::{
observer::{start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar},
utils::{bitcoind::bitcoind_wait_for_chain_tip, BlockHeights, Context},
};
use chainhook_sdk::utils::bitcoind::bitcoind_wait_for_chain_tip;
use chainhook_sdk::utils::{BlockHeights, Context};
use chainhook_types::BlockIdentifier;
use config::{Config, OrdinalsMetaProtocolsConfig};
use crossbeam_channel::select;
@@ -27,10 +16,31 @@ use dashmap::DashMap;
use deadpool_postgres::Pool;
use fxhash::FxHasher;
use std::collections::BTreeMap;
use std::hash::BuildHasherDefault;
use std::sync::mpsc::channel;
use std::sync::Arc;
use crate::{
core::{
first_inscription_height,
meta_protocols::brc20::cache::{brc20_new_cache, Brc20MemoryCache},
new_traversals_lazy_cache,
pipeline::{
bitcoind_download_blocks,
processors::{
block_archiving::start_block_archiving_processor,
inscription_indexing::{
index_block, rollback_block, start_inscription_indexing_processor,
},
},
},
protocol::sequence_cursor::SequenceCursor,
should_sync_ordinals_db, should_sync_rocks_db,
},
db::{
blocks::{self, find_missing_blocks, open_blocks_db_with_retry, run_compaction},
cursor::{BlockBytesCursor, TransactionBytesCursor},
ordinals_pg,
},
try_crit, try_error, try_info,
utils::monitoring::{start_serving_prometheus_metrics, PrometheusMonitoring},
};
#[derive(Debug, Clone)]
pub struct PgConnectionPools {
@@ -97,7 +107,7 @@ impl Service {
let ctx_cloned = self.ctx.clone();
let port = metrics.prometheus_port;
let _ = std::thread::spawn(move || {
let _ = hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics(
hiro_system_kit::nestable_block_on(start_serving_prometheus_metrics(
port,
registry_moved,
ctx_cloned,
@@ -151,19 +161,16 @@ impl Service {
break;
}
};
match event {
ObserverEvent::Terminate => {
try_info!(&self.ctx, "Terminating runloop");
break;
}
_ => {}
if let ObserverEvent::Terminate = event {
try_info!(&self.ctx, "Terminating runloop");
break;
}
}
Ok(())
}
/// Rolls back index data for the specified block heights.
pub async fn rollback(&self, block_heights: &Vec<u64>) -> Result<(), String> {
pub async fn rollback(&self, block_heights: &[u64]) -> Result<(), String> {
for block_height in block_heights.iter() {
rollback_block(*block_height, &self.config, &self.pg_pools, &self.ctx).await?;
}
@@ -251,7 +258,7 @@ impl Service {
bitcoind_download_blocks(
&self.config,
missing_blocks.into_iter().map(|x| x as u64).collect(),
tip.into(),
tip,
&block_ingestion_processor,
10_000,
&self.ctx,
@@ -281,7 +288,7 @@ impl Service {
start_block_archiving_processor(&self.config, &self.ctx, true, None);
let blocks = BlockHeights::BlockRange(start_block, end_block)
.get_sorted_entries()
.map_err(|_e| format!("Block start / end block spec invalid"))?;
.map_err(|_e| "Block start / end block spec invalid".to_string())?;
bitcoind_download_blocks(
&self.config,
blocks.into(),
@@ -314,7 +321,7 @@ impl Service {
);
let blocks = BlockHeights::BlockRange(start_block, end_block)
.get_sorted_entries()
.map_err(|_e| format!("Block start / end block spec invalid"))?;
.map_err(|_e| "Block start / end block spec invalid".to_string())?;
bitcoind_download_blocks(
&self.config,
blocks.into(),
@@ -333,8 +340,8 @@ impl Service {
}
pub async fn chainhook_sidecar_mutate_blocks(
blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
block_ids_to_rollback: &Vec<BlockIdentifier>,
blocks_to_mutate: &mut [BitcoinBlockDataCached],
block_ids_to_rollback: &[BlockIdentifier],
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
brc20_cache: &mut Option<Brc20MemoryCache>,
prometheus: &PrometheusMonitoring,
@@ -342,14 +349,14 @@ pub async fn chainhook_sidecar_mutate_blocks(
pg_pools: &PgConnectionPools,
ctx: &Context,
) -> Result<(), String> {
if block_ids_to_rollback.len() > 0 {
let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
if !block_ids_to_rollback.is_empty() {
let blocks_db_rw = open_blocks_db_with_retry(true, config, ctx);
for block_id in block_ids_to_rollback.iter() {
blocks::delete_blocks_in_block_range(
block_id.index as u32,
block_id.index as u32,
&blocks_db_rw,
&ctx,
ctx,
);
rollback_block(block_id.index, config, pg_pools, ctx).await?;
}
@@ -372,13 +379,13 @@ pub async fn chainhook_sidecar_mutate_blocks(
}
};
{
let blocks_db_rw = open_blocks_db_with_retry(true, &config, ctx);
let blocks_db_rw = open_blocks_db_with_retry(true, config, ctx);
blocks::insert_entry_in_blocks(
cached_block.block.block_identifier.index as u32,
&block_bytes,
true,
&blocks_db_rw,
&ctx,
ctx,
);
blocks_db_rw
.flush()
@@ -391,12 +398,12 @@ pub async fn chainhook_sidecar_mutate_blocks(
&vec![],
&mut sequence_cursor,
&mut cache_l1,
&cache_l2,
cache_l2,
brc20_cache.as_mut(),
prometheus,
&config,
config,
pg_pools,
&ctx,
ctx,
)
.await?;
cached_block.processed_by_sidecar = true;

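Aside: the service runloop above is a textbook `clippy::single_match`: a `match` with one interesting arm and `_ => {}` reads better as `if let`. With a simplified stand-in for the chainhook_sdk event type:

enum Event { Terminate, Other }

fn handle(event: Event) {
    if let Event::Terminate = event {
        // previously `match event { Event::Terminate => { … }, _ => {} }`
    }
}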
View File

@@ -4,16 +4,15 @@ pub mod monitoring;
use std::{
fs,
io::{Read, Write},
path::PathBuf,
path::Path,
};
use chainhook_types::TransactionIdentifier;
pub fn read_file_content_at_path(file_path: &PathBuf) -> Result<Vec<u8>, String> {
use std::fs::File;
use std::io::BufReader;
pub fn read_file_content_at_path(file_path: &Path) -> Result<Vec<u8>, String> {
use std::{fs::File, io::BufReader};
let file = File::open(file_path.clone())
let file = File::open(file_path)
.map_err(|e| format!("unable to read file {}\n{:?}", file_path.display(), e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
@@ -23,9 +22,9 @@ pub fn read_file_content_at_path(file_path: &PathBuf) -> Result<Vec<u8>, String>
Ok(file_buffer)
}
pub fn write_file_content_at_path(file_path: &PathBuf, content: &[u8]) -> Result<(), String> {
pub fn write_file_content_at_path(file_path: &Path, content: &[u8]) -> Result<(), String> {
use std::fs::File;
let mut parent_directory = file_path.clone();
let mut parent_directory = file_path.to_path_buf();
parent_directory.pop();
fs::create_dir_all(&parent_directory).map_err(|e| {
format!(
@@ -34,7 +33,7 @@ pub fn write_file_content_at_path(file_path: &PathBuf, content: &[u8]) -> Result
e
)
})?;
let mut file = File::create(&file_path)
let mut file = File::create(file_path)
.map_err(|e| format!("unable to open file {}\n{}", file_path.display(), e))?;
file.write_all(content)
.map_err(|e| format!("unable to write file {}\n{}", file_path.display(), e))?;
@@ -55,14 +54,18 @@ pub fn format_inscription_id(
pub fn parse_satpoint_to_watch(outpoint_to_watch: &str) -> (TransactionIdentifier, usize, u64) {
let comps: Vec<&str> = outpoint_to_watch.split(":").collect();
let tx = TransactionIdentifier::new(comps[0]);
let output_index = comps[1].to_string().parse::<usize>().expect(&format!(
"fatal: unable to extract output_index from outpoint {}",
outpoint_to_watch
));
let offset = comps[2].to_string().parse::<u64>().expect(&format!(
"fatal: unable to extract offset from outpoint {}",
outpoint_to_watch
));
let output_index = comps[1].to_string().parse::<usize>().unwrap_or_else(|_| {
panic!(
"fatal: unable to extract output_index from outpoint {}",
outpoint_to_watch
)
});
let offset = comps[2].to_string().parse::<u64>().unwrap_or_else(|_| {
panic!(
"fatal: unable to extract offset from outpoint {}",
outpoint_to_watch
)
});
(tx, output_index, offset)
}
@@ -79,20 +82,24 @@ pub fn format_outpoint_to_watch(
pub fn parse_inscription_id(inscription_id: &str) -> (TransactionIdentifier, usize) {
let comps: Vec<&str> = inscription_id.split("i").collect();
let tx = TransactionIdentifier::new(&comps[0]);
let output_index = comps[1].to_string().parse::<usize>().expect(&format!(
"fatal: unable to extract output_index from inscription_id {}",
inscription_id
));
let tx = TransactionIdentifier::new(comps[0]);
let output_index = comps[1].to_string().parse::<usize>().unwrap_or_else(|_| {
panic!(
"fatal: unable to extract output_index from inscription_id {}",
inscription_id
)
});
(tx, output_index)
}
pub fn parse_outpoint_to_watch(outpoint_to_watch: &str) -> (TransactionIdentifier, usize) {
let comps: Vec<&str> = outpoint_to_watch.split(":").collect();
let tx = TransactionIdentifier::new(&comps[0]);
let output_index = comps[1].to_string().parse::<usize>().expect(&format!(
"fatal: unable to extract output_index from outpoint {}",
outpoint_to_watch
));
let tx = TransactionIdentifier::new(comps[0]);
let output_index = comps[1].to_string().parse::<usize>().unwrap_or_else(|_| {
panic!(
"fatal: unable to extract output_index from outpoint {}",
outpoint_to_watch
)
});
(tx, output_index)
}

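Aside: the parsing helpers above swap `expect(&format!(…))` for `unwrap_or_else(|_| panic!(…))` (`clippy::expect_fun_call`). The difference is that `expect`'s message argument is built eagerly, so the old code paid for a `format!` allocation even when parsing succeeded; the closure defers it to the failure path. In isolation:

fn parse_offset(raw: &str, outpoint: &str) -> u64 {
    raw.parse::<u64>()
        .unwrap_or_else(|_| panic!("fatal: unable to extract offset from outpoint {outpoint}"))
}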
View File

@@ -20,6 +20,11 @@ pub struct PrometheusMonitoring {
pub registered_predicates: UInt64Gauge,
pub registry: Registry,
}
impl Default for PrometheusMonitoring {
fn default() -> Self {
Self::new()
}
}
impl PrometheusMonitoring {
pub fn new() -> PrometheusMonitoring {

View File

@@ -24,6 +24,12 @@ pub struct DbCache {
pub balance_deductions: HashMap<(String, String), DbBalanceChange>,
}
impl Default for DbCache {
fn default() -> Self {
Self::new()
}
}
impl DbCache {
pub fn new() -> Self {
DbCache {
@@ -38,34 +44,34 @@ impl DbCache {
/// Insert all data into the DB and clear cache.
pub async fn flush(&mut self, db_tx: &mut Transaction<'_>, ctx: &Context) {
try_info!(ctx, "Flushing DB cache...");
if self.runes.len() > 0 {
if !self.runes.is_empty() {
try_debug!(ctx, "Flushing {} runes", self.runes.len());
let _ = pg_insert_runes(&self.runes, db_tx, ctx).await;
self.runes.clear();
}
if self.supply_changes.len() > 0 {
if !self.supply_changes.is_empty() {
try_debug!(ctx, "Flushing {} supply changes", self.supply_changes.len());
let _ = pg_insert_supply_changes(
&self.supply_changes.values().cloned().collect(),
&self.supply_changes.values().cloned().collect::<Vec<_>>(),
db_tx,
ctx,
)
.await;
self.supply_changes.clear();
}
if self.ledger_entries.len() > 0 {
if !self.ledger_entries.is_empty() {
try_debug!(ctx, "Flushing {} ledger entries", self.ledger_entries.len());
let _ = pg_insert_ledger_entries(&self.ledger_entries, db_tx, ctx).await;
self.ledger_entries.clear();
}
if self.balance_increases.len() > 0 {
if !self.balance_increases.is_empty() {
try_debug!(
ctx,
"Flushing {} balance increases",
self.balance_increases.len()
);
let _ = pg_insert_balance_changes(
&self.balance_increases.values().cloned().collect(),
&self.balance_increases.values().cloned().collect::<Vec<_>>(),
true,
db_tx,
ctx,
@@ -73,14 +79,18 @@ impl DbCache {
.await;
self.balance_increases.clear();
}
if self.balance_deductions.len() > 0 {
if !self.balance_deductions.is_empty() {
try_debug!(
ctx,
"Flushing {} balance deductions",
self.balance_deductions.len()
);
let _ = pg_insert_balance_changes(
&self.balance_deductions.values().cloned().collect(),
&self
.balance_deductions
.values()
.cloned()
.collect::<Vec<_>>(),
false,
db_tx,
ctx,

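Aside: `clippy::new_without_default` drives the new `impl Default` blocks above (and the earlier one for `PrometheusMonitoring`): a type with a zero-argument `new` should also be constructible through `Default`, so generic code can create it. The pattern, on a made-up type:

struct Cache { rows: Vec<String> }

impl Cache {
    pub fn new() -> Self {
        Cache { rows: vec![] }
    }
}

impl Default for Cache {
    fn default() -> Self {
        Self::new()
    }
}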
View File

@@ -8,6 +8,10 @@ use lru::LruCache;
use ordinals::{Cenotaph, Edict, Etching, Rune, RuneId, Runestone};
use tokio_postgres::{Client, Transaction};
use super::{
db_cache::DbCache, input_rune_balance::InputRuneBalance, transaction_cache::TransactionCache,
transaction_location::TransactionLocation, utils::move_block_output_cache_to_output_cache,
};
use crate::{
db::{
cache::utils::input_rune_balances_from_tx_inputs,
@@ -21,11 +25,6 @@ use crate::{
try_debug, try_info, try_warn,
};
use super::{
db_cache::DbCache, input_rune_balance::InputRuneBalance, transaction_cache::TransactionCache,
transaction_location::TransactionLocation, utils::move_block_output_cache_to_output_cache,
};
/// Holds rune data across multiple blocks for faster computations. Processes rune events as they happen during transactions and
/// generates database rows for later insertion.
pub struct IndexCache {
@@ -104,7 +103,7 @@ impl IndexCache {
for (rune_id, balances) in input_runes.iter() {
try_debug!(ctx, "INPUT {rune_id} {balances:?} {location}");
}
if input_runes.len() > 0 {
if !input_runes.is_empty() {
try_debug!(
ctx,
"First output: {first_eligible_output:?}, total_outputs: {total_outputs}"
@@ -219,13 +218,13 @@ impl IndexCache {
.unwrap_or(0);
if let Some(ledger_entry) = self
.tx_cache
.apply_mint(&rune_id, total_mints, &db_rune, ctx)
.apply_mint(rune_id, total_mints, &db_rune, ctx)
{
self.add_ledger_entries_to_db_cache(&vec![ledger_entry.clone()]);
if let Some(total) = self.rune_total_mints_cache.get_mut(rune_id) {
*total += 1;
} else {
self.rune_total_mints_cache.put(rune_id.clone(), 1);
self.rune_total_mints_cache.put(*rune_id, 1);
}
}
}
@@ -251,13 +250,13 @@ impl IndexCache {
.unwrap_or(0);
if let Some(ledger_entry) =
self.tx_cache
.apply_cenotaph_mint(&rune_id, total_mints, &db_rune, ctx)
.apply_cenotaph_mint(rune_id, total_mints, &db_rune, ctx)
{
self.add_ledger_entries_to_db_cache(&vec![ledger_entry]);
if let Some(total) = self.rune_total_mints_cache.get_mut(rune_id) {
*total += 1;
} else {
self.rune_total_mints_cache.put(rune_id.clone(), 1);
self.rune_total_mints_cache.put(*rune_id, 1);
}
}
}
@@ -295,15 +294,13 @@ impl IndexCache {
if rune_id.block == 0 && rune_id.tx == 0 {
return self.tx_cache.etching.clone();
}
if let Some(cached_rune) = self.rune_cache.get(&rune_id) {
if let Some(cached_rune) = self.rune_cache.get(rune_id) {
return Some(cached_rune.clone());
}
// Cache miss, look in DB.
self.db_cache.flush(db_tx, ctx).await;
let Some(db_rune) = pg_get_rune_by_id(rune_id, db_tx, ctx).await else {
return None;
};
self.rune_cache.put(rune_id.clone(), db_rune.clone());
let db_rune = pg_get_rune_by_id(rune_id, db_tx, ctx).await?;
self.rune_cache.put(*rune_id, db_rune.clone());
return Some(db_rune);
}
@@ -314,22 +311,18 @@ impl IndexCache {
ctx: &Context,
) -> Option<u128> {
let real_rune_id = if rune_id.block == 0 && rune_id.tx == 0 {
let Some(etching) = self.tx_cache.etching.as_ref() else {
return None;
};
let etching = self.tx_cache.etching.as_ref()?;
RuneId::from_str(etching.id.as_str()).unwrap()
} else {
rune_id.clone()
*rune_id
};
if let Some(total) = self.rune_total_mints_cache.get(&real_rune_id) {
return Some(*total);
}
// Cache miss, look in DB.
self.db_cache.flush(db_tx, ctx).await;
let Some(total) = pg_get_rune_total_mints(rune_id, db_tx, ctx).await else {
return None;
};
self.rune_total_mints_cache.put(rune_id.clone(), total);
let total = pg_get_rune_total_mints(rune_id, db_tx, ctx).await?;
self.rune_total_mints_cache.put(*rune_id, total);
return Some(total);
}
@@ -348,7 +341,7 @@ impl IndexCache {
})
.or_insert(DbSupplyChange::from_operation(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
));
}
DbLedgerOperation::Mint => {
@@ -362,7 +355,7 @@ impl IndexCache {
})
.or_insert(DbSupplyChange::from_mint(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
entry.amount.unwrap(),
));
}
@@ -377,7 +370,7 @@ impl IndexCache {
})
.or_insert(DbSupplyChange::from_burn(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
entry.amount.unwrap(),
));
}
@@ -388,7 +381,7 @@ impl IndexCache {
.and_modify(|i| i.total_operations += 1)
.or_insert(DbSupplyChange::from_operation(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
));
if let Some(address) = entry.address.clone() {
self.db_cache
@@ -397,7 +390,7 @@ impl IndexCache {
.and_modify(|i| i.balance += entry.amount.unwrap())
.or_insert(DbBalanceChange::from_operation(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
address,
entry.amount.unwrap(),
));
@@ -410,7 +403,7 @@ impl IndexCache {
.and_modify(|i| i.total_operations += 1)
.or_insert(DbSupplyChange::from_operation(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
));
if let Some(address) = entry.address.clone() {
self.db_cache
@@ -419,7 +412,7 @@ impl IndexCache {
.and_modify(|i| i.balance += entry.amount.unwrap())
.or_insert(DbBalanceChange::from_operation(
entry.rune_id.clone(),
entry.block_height.clone(),
entry.block_height,
address,
entry.amount.unwrap(),
));

View File

@@ -1,11 +1,16 @@
use bitcoin::ScriptBuf;
use chainhook_sdk::utils::Context;
use ordinals::{Cenotaph, Edict, Etching, Rune, RuneId};
use std::{
collections::{HashMap, VecDeque},
vec,
};
use bitcoin::ScriptBuf;
use chainhook_sdk::utils::Context;
use ordinals::{Cenotaph, Edict, Etching, Rune, RuneId};
use super::{
input_rune_balance::InputRuneBalance, transaction_location::TransactionLocation,
utils::move_rune_balance_to_output,
};
use crate::{
db::{
cache::utils::{is_rune_mintable, new_sequential_ledger_entry},
@@ -16,11 +21,6 @@ use crate::{
try_debug, try_info, try_warn,
};
use super::{
input_rune_balance::InputRuneBalance, transaction_location::TransactionLocation,
utils::move_rune_balance_to_output,
};
/// Holds cached data relevant to a single transaction during indexing.
pub struct TransactionCache {
pub location: TransactionLocation,
@@ -209,7 +209,7 @@ impl TransactionCache {
Some(new_sequential_ledger_entry(
&self.location,
Some(terms_amount.0),
rune_id.clone(),
*rune_id,
None,
None,
None,
@@ -241,7 +241,7 @@ impl TransactionCache {
Some(new_sequential_ledger_entry(
&self.location,
Some(terms_amount.0),
rune_id.clone(),
*rune_id,
None,
None,
None,
@@ -283,7 +283,7 @@ impl TransactionCache {
.unwrap_or(0);
// Perform movements.
let mut results = vec![];
if self.eligible_outputs.len() == 0 {
if self.eligible_outputs.is_empty() {
// No eligible outputs means burn.
try_info!(
ctx,
@@ -391,12 +391,12 @@ impl TransactionCache {
}
fn add_input_runes(&mut self, rune_id: &RuneId, entry: InputRuneBalance) {
if let Some(balance) = self.input_runes.get_mut(&rune_id) {
if let Some(balance) = self.input_runes.get_mut(rune_id) {
balance.push_back(entry);
} else {
let mut vec = VecDeque::new();
vec.push_back(entry);
self.input_runes.insert(rune_id.clone(), vec);
self.input_runes.insert(*rune_id, vec);
}
}
}
@@ -410,6 +410,7 @@ mod test {
use maplit::hashmap;
use ordinals::{Edict, Etching, Rune, Terms};
use super::TransactionCache;
use crate::db::{
cache::{
input_rune_balance::InputRuneBalance, transaction_location::TransactionLocation,
@@ -418,8 +419,6 @@ mod test {
models::{db_ledger_operation::DbLedgerOperation, db_rune::DbRune},
};
use super::TransactionCache;
#[test]
fn etches_rune() {
let location = TransactionLocation::dummy();
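The `add_input_runes` hunk above bundles the two slips clippy flags most often in this codebase: a needless extra borrow (`get_mut(&rune_id)` when `rune_id` is already a reference) and `.clone()` on a `Copy` key, plus `is_empty()` replacing `len() == 0`. A self-contained sketch, with `u64`/`u128` standing in for `RuneId` and `InputRuneBalance`:

```rust
use std::collections::{HashMap, VecDeque};

// clippy::needless_borrow: `rune_id` is already &u64, so `get_mut(&rune_id)`
// would pass a &&u64 and get flagged.
fn add_input_rune(input_runes: &mut HashMap<u64, VecDeque<u128>>, rune_id: &u64, entry: u128) {
    if let Some(balance) = input_runes.get_mut(rune_id) {
        balance.push_back(entry);
    } else {
        let mut vec = VecDeque::new();
        vec.push_back(entry);
        // clippy::clone_on_copy: Copy keys are dereferenced, not cloned.
        input_runes.insert(*rune_id, vec);
    }
}

fn main() {
    let mut input_runes = HashMap::new();
    add_input_rune(&mut input_runes, &1, 100);
    add_input_rune(&mut input_runes, &1, 250);
    // clippy::len_zero: the eligible-outputs check above became `is_empty()`.
    assert!(!input_runes.is_empty());
    assert_eq!(input_runes[&1].len(), 2);
}
```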

View File

@@ -7,6 +7,7 @@ use lru::LruCache;
use ordinals::RuneId;
use tokio_postgres::Transaction;
use super::{input_rune_balance::InputRuneBalance, transaction_location::TransactionLocation};
use crate::{
db::{
models::{
@@ -17,8 +18,6 @@ use crate::{
try_info, try_warn,
};
use super::{input_rune_balance::InputRuneBalance, transaction_location::TransactionLocation};
/// Takes all transaction inputs and transforms them into rune balances to be allocated for operations. Looks inside an output LRU
/// cache and the DB when there are cache misses.
///
@@ -55,7 +54,7 @@ pub async fn input_rune_balances_from_tx_inputs(
}
// Look for cache misses in database. We don't need to `flush` the DB cache here because we've already looked in the current
// block's output cache.
if cache_misses.len() > 0 {
if !cache_misses.is_empty() {
let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await;
indexed_input_runes.extend(output_balances);
}
@@ -87,9 +86,9 @@ pub fn move_block_output_cache_to_output_cache(
output_cache: &mut LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
) {
for (k, block_output_map) in block_output_cache.iter() {
if let Some(v) = output_cache.get_mut(&k) {
if let Some(v) = output_cache.get_mut(k) {
for (rune_id, balances) in block_output_map.iter() {
if let Some(rune_balance) = v.get_mut(&rune_id) {
if let Some(rune_balance) = v.get_mut(rune_id) {
rune_balance.extend(balances.clone());
} else {
v.insert(*rune_id, balances.clone());
@@ -688,7 +687,10 @@ mod test {
use std::num::NonZeroUsize;
use chainhook_sdk::utils::Context;
use chainhook_types::{bitcoin::{OutPoint, TxIn}, TransactionIdentifier};
use chainhook_types::{
bitcoin::{OutPoint, TxIn},
TransactionIdentifier,
};
use lru::LruCache;
use maplit::hashmap;
use ordinals::RuneId;
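The hunk in `move_block_output_cache_to_output_cache` shows only the changed lines; for context, here is a self-contained reconstruction of the merge loop after the borrow fixes, with `HashMap` standing in for `lru::LruCache`, `u64` for `RuneId`, and `u128` amounts for `InputRuneBalance` (the fallback branch is assumed for the sketch and may differ from the real function):

```rust
use std::collections::HashMap;

fn move_block_output_cache_to_output_cache(
    block_output_cache: &HashMap<(String, u32), HashMap<u64, Vec<u128>>>,
    output_cache: &mut HashMap<(String, u32), HashMap<u64, Vec<u128>>>,
) {
    for (k, block_output_map) in block_output_cache.iter() {
        // Post-fix form: `get_mut(k)` and `get_mut(rune_id)`, no extra `&`.
        if let Some(v) = output_cache.get_mut(k) {
            for (rune_id, balances) in block_output_map.iter() {
                if let Some(rune_balance) = v.get_mut(rune_id) {
                    rune_balance.extend(balances.clone());
                } else {
                    v.insert(*rune_id, balances.clone());
                }
            }
        } else {
            // Assumed fallback: adopt the whole per-output map on a miss.
            output_cache.insert(k.clone(), block_output_map.clone());
        }
    }
}

fn main() {
    let key = ("c0ffee".to_string(), 0u32);
    let mut output_cache = HashMap::from([(key.clone(), HashMap::from([(1u64, vec![5u128])]))]);
    let block_output_cache = HashMap::from([(key.clone(), HashMap::from([(1u64, vec![7u128])]))]);
    move_block_output_cache_to_output_cache(&block_output_cache, &mut output_cache);
    assert_eq!(output_cache[&key][&1], vec![5, 7]);
}
```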

View File

@@ -1,24 +1,20 @@
use std::collections::HashMap;
use bitcoin::absolute::LockTime;
use bitcoin::transaction::TxOut;
use bitcoin::transaction::Version;
use bitcoin::Amount;
use bitcoin::Network;
use bitcoin::ScriptBuf;
use bitcoin::Transaction;
use bitcoin::{
absolute::LockTime,
transaction::{TxOut, Version},
Amount, Network, ScriptBuf, Transaction,
};
use chainhook_sdk::utils::Context;
use chainhook_types::BitcoinBlockData;
use chainhook_types::BitcoinTransactionData;
use ordinals::Artifact;
use ordinals::Runestone;
use chainhook_types::{BitcoinBlockData, BitcoinTransactionData};
use ordinals::{Artifact, Runestone};
use tokio_postgres::Client;
use crate::db::cache::transaction_location::TransactionLocation;
use crate::db::pg_roll_back_block;
use crate::try_info;
use super::cache::index_cache::IndexCache;
use crate::{
db::{cache::transaction_location::TransactionLocation, pg_roll_back_block},
try_info,
};
pub fn get_rune_genesis_block_height(network: Network) -> u64 {
match network {
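This file's diff is pure `cargo bitcoin-indexer-fmt` output: `group_imports=StdExternalCrate` sorts imports into three blocks (std, external crates, then `crate`/`super`/`self`), and `imports_granularity=Crate` merges per-item `use` lines into one nested tree per crate. A compilable miniature of the post-format shape (module names are illustrative):

```rust
// Group 1: std, merged into a single tree per crate.
use std::collections::{HashMap, VecDeque};

// Group 2 would hold external crates (bitcoin, chainhook_sdk, ...);
// Group 3 holds crate-local imports like this one.
use crate::stats::summarize;

mod stats {
    pub fn summarize(counts: &std::collections::HashMap<&str, usize>) -> usize {
        counts.values().sum()
    }
}

fn main() {
    let mut queue: VecDeque<&str> = VecDeque::new();
    queue.push_back("block");
    let mut counts: HashMap<&str, usize> = HashMap::new();
    counts.insert("queued", queue.len());
    println!("total: {}", summarize(&counts));
}
```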

View File

@@ -60,7 +60,7 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) ->
Ok((client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Postgres connection error: {}", e.to_string());
eprintln!("Postgres connection error: {}", e);
process::exit(1);
}
});
@@ -68,7 +68,7 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) ->
break;
}
Err(e) => {
try_error!(ctx, "Error connecting to postgres: {}", e.to_string());
try_error!(ctx, "Error connecting to postgres: {}", e);
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -80,7 +80,7 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) ->
}
pub async fn pg_insert_runes(
rows: &Vec<DbRune>,
rows: &[DbRune],
db_tx: &mut Transaction<'_>,
ctx: &Context,
) -> Result<bool, Error> {
@@ -89,7 +89,7 @@ pub async fn pg_insert_runes(
let mut arg_str = String::new();
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
for row in chunk.iter() {
arg_str.push_str("(");
arg_str.push('(');
for i in 0..19 {
arg_str.push_str(format!("${},", arg_num + i).as_str());
}
@@ -139,7 +139,7 @@ pub async fn pg_insert_runes(
}
pub async fn pg_insert_supply_changes(
rows: &Vec<DbSupplyChange>,
rows: &[DbSupplyChange],
db_tx: &mut Transaction<'_>,
ctx: &Context,
) -> Result<bool, Error> {
@@ -216,7 +216,7 @@ pub async fn pg_insert_supply_changes(
}
pub async fn pg_insert_balance_changes(
rows: &Vec<DbBalanceChange>,
rows: &[DbBalanceChange],
increase: bool,
db_tx: &mut Transaction<'_>,
ctx: &Context,
@@ -281,7 +281,7 @@ pub async fn pg_insert_balance_changes(
}
pub async fn pg_insert_ledger_entries(
rows: &Vec<DbLedgerEntry>,
rows: &[DbLedgerEntry],
db_tx: &mut Transaction<'_>,
ctx: &Context,
) -> Result<bool, Error> {
@@ -290,7 +290,7 @@ pub async fn pg_insert_ledger_entries(
let mut arg_str = String::new();
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
for row in chunk.iter() {
arg_str.push_str("(");
arg_str.push('(');
for i in 0..12 {
arg_str.push_str(format!("${},", arg_num + i).as_str());
}
@@ -380,11 +380,7 @@ pub async fn pg_get_block_height(client: &mut Client, _ctx: &Context) -> Option<
.await
.expect("error getting max block height")?;
let max: Option<PgNumericU64> = row.get("max");
if let Some(max) = max {
Some(max.0)
} else {
None
}
max.map(|max| max.0)
}
pub async fn pg_get_rune_by_id(
@@ -402,9 +398,7 @@ pub async fn pg_get_rune_by_id(
process::exit(1);
}
};
let Some(row) = row else {
return None;
};
let row = row?;
Some(DbRune::from_pg_row(&row))
}
@@ -430,9 +424,7 @@ pub async fn pg_get_rune_total_mints(
process::exit(1);
}
};
let Some(row) = row else {
return None;
};
let row = row?;
let minted: PgNumericU128 = row.get("total_mints");
Some(minted.0)
}
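Three distinct lints land in this file: `clippy::ptr_arg` (take `&[T]` rather than `&Vec<T>` so callers can pass slices, arrays, or vectors), `clippy::single_char_add_str` (`push('(')` instead of `push_str("(")`), and `clippy::manual_map` (`Option::map` instead of an if-let that rebuilds the `Option`). A sketch of all three with stand-in types:

```rust
struct DbRune {
    name: String,
}

struct PgNumericU64(u64);

// clippy::ptr_arg: `&[DbRune]` is the more general parameter type.
fn placeholders(rows: &[DbRune]) -> String {
    let mut arg_str = String::new();
    for (i, _row) in rows.iter().enumerate() {
        // clippy::single_char_add_str: push a char, not a one-char &str.
        arg_str.push('(');
        arg_str.push_str(format!("${}", i + 1).as_str());
        arg_str.push_str("),");
    }
    arg_str
}

// clippy::manual_map: replaces
// `if let Some(max) = max { Some(max.0) } else { None }`.
fn block_height(max: Option<PgNumericU64>) -> Option<u64> {
    max.map(|max| max.0)
}

fn main() {
    let rows = vec![DbRune { name: "A".into() }, DbRune { name: "B".into() }];
    assert_eq!(placeholders(&rows), "($1),($2),");
    assert_eq!(block_height(Some(PgNumericU64(840_000))), Some(840_000));
    assert_eq!(block_height(None), None);
    let _ = rows[0].name.len();
}
```

The `eprintln!("... {}", e)` changes are in the same spirit: `{}` already uses `Display`, so the explicit `e.to_string()` was a redundant allocation.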

View File

@@ -43,10 +43,10 @@ impl DbLedgerEntry {
tx_index: PgBigIntU32(tx_index),
event_index: PgBigIntU32(event_index),
tx_id: tx_id[2..].to_string(),
output: output.map(|i| PgBigIntU32(i)),
output: output.map(PgBigIntU32),
address: address.cloned(),
receiver_address: receiver_address.cloned(),
amount: amount.map(|i| PgNumericU128(i)),
amount: amount.map(PgNumericU128),
operation,
timestamp: PgBigIntU32(timestamp),
}
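`clippy::redundant_closure` drives the `.map(|i| PgBigIntU32(i))` → `.map(PgBigIntU32)` rewrites here and in the `DbRune` file below: a tuple-struct constructor is already a function, so wrapping it in a closure adds nothing. A small demonstration with stand-in newtypes:

```rust
#[derive(Debug, PartialEq)]
struct PgBigIntU32(u32);

#[derive(Debug, PartialEq)]
struct PgNumericU128(u128);

fn main() {
    let output: Option<u32> = Some(2);
    let amount: Option<u128> = None;

    // The constructor itself is the mapping function; no closure needed.
    assert_eq!(output.map(PgBigIntU32), Some(PgBigIntU32(2)));
    assert_eq!(amount.map(PgNumericU128), None);
}
```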

View File

@@ -48,12 +48,12 @@ impl DbRune {
let mut terms_offset_start = None;
let mut terms_offset_end = None;
if let Some(terms) = etching.terms {
terms_amount = terms.amount.map(|i| PgNumericU128(i));
terms_cap = terms.cap.map(|i| PgNumericU128(i));
terms_height_start = terms.height.0.map(|i| PgNumericU64(i));
terms_height_end = terms.height.1.map(|i| PgNumericU64(i));
terms_offset_start = terms.offset.0.map(|i| PgNumericU64(i));
terms_offset_end = terms.offset.1.map(|i| PgNumericU64(i));
terms_amount = terms.amount.map(PgNumericU128);
terms_cap = terms.cap.map(PgNumericU128);
terms_height_start = terms.height.0.map(PgNumericU64);
terms_height_end = terms.height.1.map(PgNumericU64);
terms_offset_start = terms.offset.0.map(PgNumericU64);
terms_offset_end = terms.offset.1.map(PgNumericU64);
}
DbRune {
id: format!("{}:{}", location.block_height, location.tx_index),
@@ -66,11 +66,11 @@ impl DbRune {
tx_id: location.tx_id[2..].to_string(),
divisibility: etching
.divisibility
.map(|i| PgSmallIntU8(i))
.map(PgSmallIntU8)
.unwrap_or(PgSmallIntU8(0)),
premine: etching
.premine
.map(|i| PgNumericU128(i))
.map(PgNumericU128)
.unwrap_or(PgNumericU128(0)),
symbol: etching
.symbol
@@ -206,9 +206,8 @@ mod test {
use ordinals::{Etching, SpacedRune, Terms};
use crate::db::cache::transaction_location::TransactionLocation;
use super::DbRune;
use crate::db::cache::transaction_location::TransactionLocation;
#[test]
fn test_from_etching() {

View File

@@ -1,16 +1,22 @@
use crate::db::cache::index_cache::IndexCache;
use crate::db::index::{index_block, roll_back_block};
use crate::try_info;
use chainhook_sdk::indexer::bitcoin::{
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
standardize_bitcoin_block,
use chainhook_sdk::{
indexer::bitcoin::{
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
standardize_bitcoin_block,
},
utils::{bitcoind::bitcoind_get_block_height, BlockHeights, Context},
};
use chainhook_sdk::utils::bitcoind::bitcoind_get_block_height;
use chainhook_sdk::utils::{BlockHeights, Context};
use chainhook_types::BitcoinNetwork;
use config::Config;
use tokio_postgres::Client;
use crate::{
db::{
cache::index_cache::IndexCache,
index::{index_block, roll_back_block},
},
try_info,
};
pub async fn drop_blocks(start_block: u64, end_block: u64, pg_client: &mut Client, ctx: &Context) {
for block in start_block..=end_block {
roll_back_block(pg_client, block, ctx).await;
@@ -25,8 +31,8 @@ pub async fn scan_blocks(
ctx: &Context,
) -> Result<(), String> {
let block_heights_to_scan_res = BlockHeights::Blocks(blocks).get_sorted_entries();
let mut block_heights_to_scan =
block_heights_to_scan_res.map_err(|_e| format!("Block start / end block spec invalid"))?;
let mut block_heights_to_scan = block_heights_to_scan_res
.map_err(|_e| "Block start / end block spec invalid".to_string())?;
try_info!(
ctx,
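The `map_err` rewrite above is `clippy::useless_format`: `format!` with nothing interpolated only allocates, so a literal plus `.to_string()` says the same thing without the macro. A sketch of the pattern in an error-mapping position:

```rust
// clippy::useless_format: prefer a plain literal + to_string() when
// nothing is interpolated.
fn parse_blocks(spec: &str) -> Result<Vec<u64>, String> {
    spec.split(',')
        .map(|s| {
            s.trim()
                .parse::<u64>()
                .map_err(|_e| "Block start / end block spec invalid".to_string())
        })
        .collect()
}

fn main() {
    assert_eq!(parse_blocks("840000, 840001"), Ok(vec![840_000, 840_001]));
    assert!(parse_blocks("840000, x").is_err());
}
```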

View File

@@ -1,61 +1,68 @@
use std::sync::mpsc::channel;
use std::{cmp::Ordering, sync::mpsc::channel};
use crate::db::cache::index_cache::IndexCache;
use crate::db::index::{get_rune_genesis_block_height, index_block, roll_back_block};
use crate::db::{pg_connect, pg_get_block_height};
use crate::scan::bitcoin::scan_blocks;
use crate::{try_error, try_info};
use chainhook_sdk::observer::BitcoinBlockDataCached;
use chainhook_sdk::utils::bitcoind::bitcoind_get_block_height;
use chainhook_sdk::{
observer::{start_event_observer, ObserverEvent, ObserverSidecar},
utils::Context,
observer::{start_event_observer, BitcoinBlockDataCached, ObserverEvent, ObserverSidecar},
utils::{bitcoind::bitcoind_get_block_height, Context},
};
use chainhook_types::BlockIdentifier;
use config::Config;
use crossbeam_channel::select;
use crate::{
db::{
cache::index_cache::IndexCache,
index::{get_rune_genesis_block_height, index_block, roll_back_block},
pg_connect, pg_get_block_height,
},
scan::bitcoin::scan_blocks,
try_error, try_info,
};
pub async fn get_index_chain_tip(config: &Config, ctx: &Context) -> u64 {
let mut pg_client = pg_connect(&config, true, ctx).await;
let mut pg_client = pg_connect(config, true, ctx).await;
pg_get_block_height(&mut pg_client, ctx)
.await
.unwrap_or(get_rune_genesis_block_height(config.bitcoind.network) - 1)
}
pub async fn catch_up_to_bitcoin_chain_tip(config: &Config, ctx: &Context) -> Result<(), String> {
let mut pg_client = pg_connect(&config, true, ctx).await;
let mut pg_client = pg_connect(config, true, ctx).await;
let mut index_cache = IndexCache::new(config, &mut pg_client, ctx).await;
loop {
let chain_tip = pg_get_block_height(&mut pg_client, ctx)
.await
.unwrap_or(get_rune_genesis_block_height(config.bitcoind.network) - 1);
let bitcoind_chain_tip = bitcoind_get_block_height(&config.bitcoind, ctx);
if bitcoind_chain_tip < chain_tip {
try_info!(
ctx,
"Waiting for bitcoind to reach height {}, currently at {}",
chain_tip,
bitcoind_chain_tip
);
std::thread::sleep(std::time::Duration::from_secs(10));
} else if bitcoind_chain_tip > chain_tip {
try_info!(
ctx,
"Block height is behind bitcoind, scanning block range {} to {}",
chain_tip + 1,
bitcoind_chain_tip
);
scan_blocks(
((chain_tip + 1)..=bitcoind_chain_tip).collect(),
config,
&mut pg_client,
&mut index_cache,
ctx,
)
.await?;
} else {
try_info!(ctx, "Caught up to bitcoind chain tip at {}", chain_tip);
break;
match bitcoind_chain_tip.cmp(&chain_tip) {
Ordering::Less => {
try_info!(
ctx,
"Waiting for bitcoind to reach height {}, currently at {}",
chain_tip,
bitcoind_chain_tip
);
std::thread::sleep(std::time::Duration::from_secs(10));
}
Ordering::Greater => {
try_info!(
ctx,
"Block height is behind bitcoind, scanning block range {} to {}",
chain_tip + 1,
bitcoind_chain_tip
);
scan_blocks(
((chain_tip + 1)..=bitcoind_chain_tip).collect(),
config,
&mut pg_client,
&mut index_cache,
ctx,
)
.await?;
}
Ordering::Equal => {
try_info!(ctx, "Caught up to bitcoind chain tip at {}", chain_tip);
break;
}
}
}
Ok(())
@@ -95,12 +102,9 @@ pub async fn start_service(config: &Config, ctx: &Context) -> Result<(), String>
break;
}
};
match event {
ObserverEvent::Terminate => {
try_info!(ctx, "Received termination event from Chainhook SDK");
break;
}
_ => {}
if let ObserverEvent::Terminate = event {
try_info!(ctx, "Received termination event from Chainhook SDK");
break;
}
}
Ok(())
@@ -156,13 +160,13 @@ pub async fn set_up_observer_sidecar_runloop(
pub async fn chainhook_sidecar_mutate_blocks(
index_cache: &mut IndexCache,
blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
block_ids_to_rollback: &Vec<BlockIdentifier>,
blocks_to_mutate: &mut [BitcoinBlockDataCached],
block_ids_to_rollback: &[BlockIdentifier],
config: &Config,
ctx: &Context,
) {
try_info!(ctx, "Received mutate blocks message from Chainhook SDK");
let mut pg_client = pg_connect(&config, false, &ctx).await;
let mut pg_client = pg_connect(config, false, ctx).await;
for block_id in block_ids_to_rollback.iter() {
roll_back_block(&mut pg_client, block_id.index, ctx).await;
}
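The service loop picks up the two most structural fixes in the commit: `clippy::comparison_chain` replaces the `<` / `>` / `==` if-chain on chain tips with an exhaustive `match` on `Ordering`, and `clippy::single_match` turns the one-armed `match event { ... _ => {} }` into `if let`. Both distilled into a runnable sketch (simplified enum and return values, not the real service types):

```rust
use std::cmp::Ordering;

enum ObserverEvent {
    Terminate,
    Tick,
}

// clippy::comparison_chain: one exhaustive match instead of an if / else-if /
// else chain, so no ordering case can be silently dropped.
fn sync_action(bitcoind_tip: u64, index_tip: u64) -> &'static str {
    match bitcoind_tip.cmp(&index_tip) {
        Ordering::Less => "wait for bitcoind to reach the index height",
        Ordering::Greater => "scan the missing block range",
        Ordering::Equal => "caught up; start streaming",
    }
}

// clippy::single_match: a match with one meaningful arm reads better as if-let.
fn should_terminate(event: &ObserverEvent) -> bool {
    if let ObserverEvent::Terminate = event {
        return true;
    }
    false
}

fn main() {
    assert_eq!(sync_action(99, 100), "wait for bitcoind to reach the index height");
    assert_eq!(sync_action(101, 100), "scan the missing block range");
    assert_eq!(sync_action(100, 100), "caught up; start streaming");
    assert!(should_terminate(&ObserverEvent::Terminate));
    assert!(!should_terminate(&ObserverEvent::Tick));
}
```

The `&mut Vec<...>` → `&mut [BitcoinBlockDataCached]` parameter change is `clippy::ptr_arg` again, mirrored for mutable slices.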

View File

@@ -1,2 +1,3 @@
[toolchain]
channel = "1.85.0"
channel = "1.85"
components = ["rustfmt", "clippy"]