fix: better handling of database locks (#200)

Ludo Galabru
2023-11-23 07:15:25 -05:00
committed by GitHub
parent 977a30eb8d
commit f820169aa0
17 changed files with 728 additions and 992 deletions
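
In broad strokes, the change below moves database access from fail-fast to wait-and-retry: SQLite connections get a 300-second busy timeout, writes loop until they succeed, reads go through new retrying perform_query helpers, and the RocksDB block store is opened via a new open_ordhook_db_conn_rocks_db_loop helper instead of an unwrap. A minimal sketch of the SQLite side of that pattern, assuming rusqlite (table and identifier names here are illustrative, not ordhook's actual API):

use rusqlite::Connection;
use std::{thread::sleep, time::Duration};

fn open_conn(path: &str) -> rusqlite::Result<Connection> {
    let conn = Connection::open(path)?;
    // Block up to 300s on a locked database instead of failing immediately.
    conn.busy_timeout(Duration::from_secs(300))?;
    Ok(conn)
}

fn insert_with_retry(conn: &Connection, inscription_id: &str, block_height: u64) {
    // Mirrors the `while let Err(e) = conn.execute(...)` loops in the diff:
    // keep retrying the write, pausing between attempts.
    while let Err(e) = conn.execute(
        "INSERT INTO inscriptions (inscription_id, block_height) VALUES (?1, ?2)",
        rusqlite::params![inscription_id, block_height],
    ) {
        eprintln!("unable to write ({e}), retrying in 1s");
        sleep(Duration::from_secs(1));
    }
}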

View File

@@ -1,518 +0,0 @@
name: ordhook-sdk-js
env:
DEBUG: napi:*
APP_NAME: ordhook-sdk-js
COMPONENT_PATH: components/ordhook-sdk-js
MACOSX_DEPLOYMENT_TARGET: '13.0'
permissions:
contents: write
id-token: write
'on':
push:
branches:
- feat/ordhook-sdk-js
tags-ignore:
- '**'
paths-ignore:
- '**/*.md'
- LICENSE
- '**/*.gitignore'
- .editorconfig
- docs/**
pull_request: null
jobs:
build:
strategy:
fail-fast: false
matrix:
settings:
- host: macos-latest
target: x86_64-apple-darwin
build: |
yarn build
strip -x *.node
# - host: windows-latest
# build: yarn build
# target: x86_64-pc-windows-msvc
# - host: windows-latest
# build: |
# rustup target add i686-pc-windows-msvc
# yarn build --target i686-pc-windows-msvc
# target: i686-pc-windows-msvc
- host: ubuntu-latest
target: x86_64-unknown-linux-gnu
docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-debian
build: |-
sudo apt-get install libssl-dev &&
set -e &&
yarn --cwd components/ordhook-sdk-js build --target x86_64-unknown-linux-gnu &&
strip -x components/ordhook-sdk-js/*.node
# - host: ubuntu-latest
# target: x86_64-unknown-linux-musl
# docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-alpine
# build: set -e && yarn --cwd components/ordhook-sdk-js build && strip components/ordhook-sdk-js/*.node
- host: macos-latest
target: aarch64-apple-darwin
build: |
rustup target add aarch64-apple-darwin
yarn build --target aarch64-apple-darwin
strip -x *.node
# - host: ubuntu-latest
# target: aarch64-unknown-linux-gnu
# docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-debian-aarch64
# build: |-
# sudo apt-get install libssl-dev &&
# set -e &&
# rustup target add aarch64-unknown-linux-gnu &&
# yarn --cwd components/ordhook-sdk-js build --target aarch64-unknown-linux-gnu &&
# aarch64-unknown-linux-gnu-strip components/ordhook-sdk-js/*.node
# - host: ubuntu-latest
# target: armv7-unknown-linux-gnueabihf
# setup: |
# sudo apt-get update
# sudo apt-get install gcc-arm-linux-gnueabihf -y
# build: |
# rustup target add armv7-unknown-linux-gnueabihf
# yarn --cwd components/ordhook-sdk-js build --target armv7-unknown-linux-gnueabihf
# arm-linux-gnueabihf-strip components/ordhook-sdk-js/*.node
# - host: ubuntu-latest
# target: aarch64-unknown-linux-musl
# docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-alpine
# build: |-
# set -e &&
# rustup target add aarch64-unknown-linux-musl &&
# yarn --cwd components/ordhook-sdk-js build --target aarch64-unknown-linux-musl &&
# /aarch64-linux-musl-cross/bin/aarch64-linux-musl-strip components/ordhook-sdk-js/*.node
# - host: windows-latest
# target: aarch64-pc-windows-msvc
# build: |-
# rustup target add aarch64-pc-windows-msvc
# yarn build --target aarch64-pc-windows-msvc
name: stable - ${{ matrix.settings.target }} - node@18
runs-on: ${{ matrix.settings.host }}
defaults:
run:
working-directory: ./components/ordhook-sdk-js
steps:
- uses: actions/checkout@v3
- name: Setup node
uses: actions/setup-node@v3
if: ${{ !matrix.settings.docker }}
with:
node-version: 18
check-latest: true
cache: yarn
cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
- name: Install
uses: dtolnay/rust-toolchain@stable
if: ${{ !matrix.settings.docker }}
with:
toolchain: stable
targets: ${{ matrix.settings.target }}
- name: Cache cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
.cargo-cache
target/
key: ${{ matrix.settings.target }}-cargo-${{ matrix.settings.host }}
# - uses: goto-bus-stop/setup-zig@v2
# if: ${{ matrix.settings.target == 'armv7-unknown-linux-gnueabihf' }}
# with:
# version: 0.10.1
- name: Setup toolchain
run: ${{ matrix.settings.setup }}
if: ${{ matrix.settings.setup }}
shell: bash
# - name: Setup node x86
# if: matrix.settings.target == 'i686-pc-windows-msvc'
# run: yarn config set supportedArchitectures.cpu "ia32"
# shell: bash
- name: Install dependencies
run: yarn install
# - name: Setup node x86
# uses: actions/setup-node@v3
# if: matrix.settings.target == 'i686-pc-windows-msvc'
# with:
# node-version: 18
# check-latest: true
# cache: yarn
# cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
# architecture: x86
- name: Build in docker
uses: addnab/docker-run-action@v3
if: ${{ matrix.settings.docker }}
with:
image: ${{ matrix.settings.docker }}
options: '--user 0:0 -v ${{ github.workspace }}/.cargo-cache/git/db:/usr/local/cargo/git/db -v ${{ github.workspace }}/.cargo/registry/cache:/usr/local/cargo/registry/cache -v ${{ github.workspace }}/.cargo/registry/index:/usr/local/cargo/registry/index -v ${{ github.workspace }}:/build -w /build'
run: ${{ matrix.settings.build }}
- name: Build
run: ${{ matrix.settings.build }}
if: ${{ !matrix.settings.docker }}
shell: bash
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: bindings-${{ matrix.settings.target }}
path: ${{ env.COMPONENT_PATH }}/${{ env.APP_NAME }}.*.node
if-no-files-found: error
# build-freebsd:
# runs-on: macos-12
# name: Build FreeBSD
# defaults:
# run:
# working-directory: ./components/ordhook-sdk-js
# steps:
# - uses: actions/checkout@v3
# - name: Build
# id: build
# uses: vmactions/freebsd-vm@v0
# env:
# DEBUG: napi:*
# RUSTUP_HOME: /usr/local/rustup
# CARGO_HOME: /usr/local/cargo
# RUSTUP_IO_THREADS: 1
# with:
# envs: DEBUG RUSTUP_HOME CARGO_HOME RUSTUP_IO_THREADS
# usesh: true
# mem: 3000
# prepare: |
# pkg install -y -f curl node libnghttp2 npm yarn
# curl https://sh.rustup.rs -sSf --output rustup.sh
# sh rustup.sh -y --profile minimal --default-toolchain beta
# export PATH="/usr/local/cargo/bin:$PATH"
# echo "~~~~ rustc --version ~~~~"
# rustc --version
# echo "~~~~ node -v ~~~~"
# node -v
# echo "~~~~ yarn --version ~~~~"
# yarn --version
# run: |
# export PATH="/usr/local/cargo/bin:$PATH"
# pwd
# ls -lah
# whoami
# env
# freebsd-version
# cd ./components/ordhook-sdk-js
# yarn install
# yarn build
# strip -x *.node
# yarn test
# rm -rf node_modules
# rm -rf target
# rm -rf .yarn/cache
# - name: Upload artifact
# uses: actions/upload-artifact@v3
# with:
# name: bindings-freebsd
# path: ${{ env.COMPONENT_PATH }}/${{ env.APP_NAME }}.*.node
# if-no-files-found: error
test-macOS-binding:
name: Test bindings on ${{ matrix.settings.target }} - node@${{ matrix.node }}
needs:
- build
strategy:
fail-fast: false
matrix:
settings:
- host: macos-latest
target: x86_64-apple-darwin
# - host: windows-latest
# target: x86_64-pc-windows-msvc
node:
- '14'
- '16'
- '18'
runs-on: ${{ matrix.settings.host }}
steps:
- uses: actions/checkout@v3
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node }}
check-latest: true
cache: yarn
cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
- name: Install dependencies
run: yarn install
- name: Download artifacts
uses: actions/download-artifact@v3
with:
name: bindings-${{ matrix.settings.target }}
path: .
- name: List packages
run: ls -R .
shell: bash
test-linux-x64-gnu-binding:
name: Test bindings on Linux-x64-gnu - node@${{ matrix.node }}
needs:
- build
strategy:
fail-fast: false
matrix:
node:
- '14'
- '16'
- '18'
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./components/ordhook-sdk-js
steps:
- uses: actions/checkout@v3
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node }}
check-latest: true
cache: yarn
cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
- name: Install dependencies
run: yarn install
- name: Download artifacts
uses: actions/download-artifact@v3
with:
name: bindings-x86_64-unknown-linux-gnu
path: .
- name: List packages
run: ls -R .
shell: bash
# - name: Test bindings
# run: docker run --rm -v $(pwd):/build -w /build node:${{ matrix.node }}-slim yarn test
# test-linux-x64-musl-binding:
# name: Test bindings on x86_64-unknown-linux-musl - node@${{ matrix.node }}
# needs:
# - build
# strategy:
# fail-fast: false
# matrix:
# node:
# - '14'
# - '16'
# - '18'
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - name: Setup node
# uses: actions/setup-node@v3
# with:
# node-version: ${{ matrix.node }}
# check-latest: true
# cache: yarn
# cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
# - name: Install dependencies
# run: |
# yarn config set supportedArchitectures.libc "musl"
# yarn install
# - name: Download artifacts
# uses: actions/download-artifact@v3
# with:
# name: bindings-x86_64-unknown-linux-musl
# path: .
# - name: List packages
# run: ls -R .
# shell: bash
# - name: Test bindings
# run: docker run --rm -v $(pwd):/build -w /build node:${{ matrix.node }}-alpine yarn test
# test-linux-aarch64-gnu-binding:
# name: Test bindings on aarch64-unknown-linux-gnu - node@${{ matrix.node }}
# needs:
# - build
# strategy:
# fail-fast: false
# matrix:
# node:
# - '14'
# - '16'
# - '18'
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - name: Download artifacts
# uses: actions/download-artifact@v3
# with:
# name: bindings-aarch64-unknown-linux-gnu
# path: .
# - name: List packages
# run: ls -R .
# shell: bash
# - name: Install dependencies
# run: |
# yarn config set supportedArchitectures.cpu "arm64"
# yarn config set supportedArchitectures.libc "glibc"
# yarn install
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
# with:
# platforms: arm64
# - run: docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# - name: Setup and run tests
# uses: addnab/docker-run-action@v3
# with:
# image: node:${{ matrix.node }}-slim
# options: '--platform linux/arm64 -v ${{ github.workspace }}:/build -w /build'
# run: |
# set -e
# yarn test
# ls -la
# test-linux-aarch64-musl-binding:
# name: Test bindings on aarch64-unknown-linux-musl - node@${{ matrix.node }}
# needs:
# - build
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - name: Download artifacts
# uses: actions/download-artifact@v3
# with:
# name: bindings-aarch64-unknown-linux-musl
# path: .
# - name: List packages
# run: ls -R .
# shell: bash
# - name: Install dependencies
# run: |
# yarn config set supportedArchitectures.cpu "arm64"
# yarn config set supportedArchitectures.libc "musl"
# yarn install
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
# with:
# platforms: arm64
# - run: docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# - name: Setup and run tests
# uses: addnab/docker-run-action@v3
# with:
# image: node:lts-alpine
# options: '--platform linux/arm64 -v ${{ github.workspace }}:/build -w /build'
# run: |
# set -e
# yarn test
# test-linux-arm-gnueabihf-binding:
# name: Test bindings on armv7-unknown-linux-gnueabihf - node@${{ matrix.node }}
# needs:
# - build
# strategy:
# fail-fast: false
# matrix:
# node:
# - '14'
# - '16'
# - '18'
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - name: Download artifacts
# uses: actions/download-artifact@v3
# with:
# name: bindings-armv7-unknown-linux-gnueabihf
# path: .
# - name: List packages
# run: ls -R .
# shell: bash
# - name: Install dependencies
# run: |
# yarn config set supportedArchitectures.cpu "arm"
# yarn install
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
# with:
# platforms: arm
# - run: docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# - name: Setup and run tests
# uses: addnab/docker-run-action@v3
# with:
# image: node:${{ matrix.node }}-bullseye-slim
# options: '--platform linux/arm/v7 -v ${{ github.workspace }}:/build -w /build'
# run: |
# set -e
# yarn test
# ls -la
universal-macOS:
name: Build universal macOS binary
needs:
- build
runs-on: macos-latest
steps:
- uses: actions/checkout@v3
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: 18
check-latest: true
cache: yarn
cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
- name: Install dependencies
run: yarn --cwd components/ordhook-sdk-js install
- name: Download macOS x64 artifact
uses: actions/download-artifact@v3
with:
name: bindings-x86_64-apple-darwin
path: components/ordhook-sdk-js/artifacts
- name: Download macOS arm64 artifact
uses: actions/download-artifact@v3
with:
name: bindings-aarch64-apple-darwin
path: components/ordhook-sdk-js/artifacts
- name: Combine binaries
run: yarn --cwd components/ordhook-sdk-js universal
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: bindings-universal-apple-darwin
path: ${{ env.COMPONENT_PATH }}/${{ env.APP_NAME }}.*.node
if-no-files-found: error
publish:
name: Publish
runs-on: ubuntu-latest
needs:
# - build-freebsd
- test-macOS-binding
- test-linux-x64-gnu-binding
# - test-linux-x64-musl-binding
# - test-linux-aarch64-gnu-binding
# - test-linux-aarch64-musl-binding
# - test-linux-arm-gnueabihf-binding
- universal-macOS
steps:
- uses: actions/checkout@v3
- name: Setup node
uses: actions/setup-node@v3
with:
node-version: 18
check-latest: true
cache: yarn
cache-dependency-path: ./components/ordhook-sdk-js/yarn.lock
- name: Install dependencies
run: yarn --cwd components/ordhook-sdk-js install
- name: Download all artifacts
uses: actions/download-artifact@v3
with:
path: artifacts
- name: Move artifacts
run: yarn --cwd components/ordhook-sdk-js artifacts
- name: List packages
run: ls -R components/ordhook-sdk-js/./npm
shell: bash
- name: Publish
run: |
cd components/ordhook-sdk-js
npm config set provenance true
if git log -1 --pretty=%B | grep "^[0-9]\+\.[0-9]\+\.[0-9]\+$";
then
echo "//registry.npmjs.org/:_authToken=$NPM_TOKEN" >> ~/.npmrc
npm publish --access public
elif git log -1 --pretty=%B | grep "^[0-9]\+\.[0-9]\+\.[0-9]\+";
then
echo "//registry.npmjs.org/:_authToken=$NPM_TOKEN" >> ~/.npmrc
npm publish --tag next --access public
else
echo "Not a release, skipping publish"
fi
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}

View File

@@ -9,24 +9,26 @@ use ordhook::chainhook_sdk::chainhooks::types::{
ChainhookFullSpecification, HookAction, OrdinalOperations,
};
use ordhook::chainhook_sdk::indexer::bitcoin::{
download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
};
use ordhook::chainhook_sdk::observer::BitcoinConfig;
use ordhook::chainhook_sdk::types::BitcoinBlockData;
use ordhook::chainhook_sdk::types::{BitcoinBlockData, TransactionIdentifier};
use ordhook::chainhook_sdk::utils::BlockHeights;
use ordhook::chainhook_sdk::utils::Context;
use ordhook::config::Config;
use ordhook::core::new_traversals_lazy_cache;
use ordhook::core::pipeline::download_and_pipeline_blocks;
use ordhook::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use ordhook::core::pipeline::processors::start_inscription_indexing_processor;
use ordhook::core::protocol::inscription_parsing::parse_inscriptions_and_standardize_block;
use ordhook::core::protocol::satoshi_numbering::compute_satoshi_number;
use ordhook::db::{
delete_data_in_ordhook_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
get_default_ordhook_db_file_path, initialize_ordhook_db, open_readonly_ordhook_db_conn,
open_readonly_ordhook_db_conn_rocks_db, open_readwrite_ordhook_db_conn,
open_readwrite_ordhook_db_conn_rocks_db,
open_readonly_ordhook_db_conn_rocks_db, open_readwrite_ordhook_db_conn, open_ordhook_db_conn_rocks_db_loop,
};
use ordhook::download::download_ordinals_dataset_if_required;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
@@ -36,6 +38,9 @@ use std::collections::BTreeMap;
use std::io::{BufReader, Read};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
@@ -68,14 +73,19 @@ enum ScanCommand {
/// Retrieve activities for a given inscription
#[clap(name = "inscription", bin_name = "inscription")]
Inscription(ScanInscriptionCommand),
/// Retrieve activities for a given transaction
#[clap(name = "transaction", bin_name = "transaction")]
Transaction(ScanTransactionCommand),
}
#[derive(Parser, PartialEq, Clone, Debug)]
struct ScanBlocksCommand {
/// Starting block
pub start_block: u64,
/// Ending block
pub end_block: u64,
/// Interval of blocks (--interval 767430:800000)
#[clap(long = "interval", conflicts_with = "blocks")]
pub blocks_interval: Option<String>,
/// List of blocks (--blocks 767430,767431,767433,800000)
#[clap(long = "blocks", conflicts_with = "interval")]
pub blocks: Option<String>,
/// Target Regtest network
#[clap(
long = "regtest",
@@ -148,6 +158,43 @@ struct ScanInscriptionCommand {
pub config_path: Option<String>,
}
#[derive(Parser, PartialEq, Clone, Debug)]
struct ScanTransactionCommand {
/// Block height
pub block_height: u64,
/// Transaction Id
pub transaction_id: String,
/// Target Regtest network
#[clap(
long = "regtest",
conflicts_with = "testnet",
conflicts_with = "mainnet"
)]
pub regtest: bool,
/// Target Testnet network
#[clap(
long = "testnet",
conflicts_with = "regtest",
conflicts_with = "mainnet"
)]
pub testnet: bool,
/// Target Mainnet network
#[clap(
long = "mainnet",
conflicts_with = "testnet",
conflicts_with = "regtest"
)]
pub mainnet: bool,
/// Load config file path
#[clap(
long = "config-path",
conflicts_with = "mainnet",
conflicts_with = "testnet",
conflicts_with = "regtest"
)]
pub config_path: Option<String>,
}
#[derive(Subcommand, PartialEq, Clone, Debug)]
enum RepairCommand {
/// Rewrite blocks data in hord.rocksdb
@@ -187,9 +234,9 @@ impl RepairStorageCommand {
pub fn get_blocks(&self) -> Vec<u64> {
let blocks = match (&self.blocks_interval, &self.blocks) {
(Some(interval), None) => {
let blocks = interval.split(":").collect::<Vec<_>>();
let blocks = interval.split(':').collect::<Vec<_>>();
let start_block: u64 = blocks
.get(0)
.first()
.expect("unable to get start_block")
.parse::<u64>()
.expect("unable to parse start_block");
@@ -202,7 +249,7 @@ impl RepairStorageCommand {
}
(None, Some(blocks)) => {
let blocks = blocks
.split(",")
.split(',')
.map(|b| b.parse::<u64>().expect("unable to parse block"))
.collect::<Vec<_>>();
BlockHeights::Blocks(blocks).get_sorted_entries()
@@ -467,14 +514,11 @@ pub fn main() {
}
};
match hiro_system_kit::nestable_block_on(handle_command(opts, &ctx)) {
Err(e) => {
if let Err(e) = hiro_system_kit::nestable_block_on(handle_command(opts, &ctx)) {
error!(ctx.expect_logger(), "{e}");
std::thread::sleep(std::time::Duration::from_millis(500));
process::exit(1);
}
Ok(_) => {}
}
}
async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
@@ -487,9 +531,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
// - Replay based on SQLite queries
// If post-to:
// - Replay that requires connection to bitcoind
let mut block_range =
BlockHeights::BlockRange(cmd.start_block, cmd.end_block).get_sorted_entries();
let block_heights = parse_blocks_heights_spec(&cmd.blocks_interval, &cmd.blocks);
let mut block_range = block_heights.get_sorted_entries();
if let Some(ref post_to) = cmd.post_to {
info!(ctx.expect_logger(), "A fully synchronized bitcoind node is required for retrieving inscriptions content.");
info!(
@@ -497,17 +540,20 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
"Checking {}...", config.network.bitcoind_rpc_url
);
let tip = check_bitcoind_connection(&config).await?;
if tip < cmd.end_block {
error!(ctx.expect_logger(), "Unable to scan block range [{}, {}]: underlying bitcoind synchronized until block #{} ", cmd.start_block, cmd.end_block, tip);
if let Some(highest_desired) = block_range.pop_back() {
if tip < highest_desired {
error!(ctx.expect_logger(), "Unable to scan desired block range: underlying bitcoind synchronized until block #{} ", tip);
} else {
info!(ctx.expect_logger(), "Starting scan");
}
block_range.push_back(highest_desired);
}
let predicate_spec = build_predicate_from_cli(
&config,
&post_to,
cmd.start_block,
Some(cmd.end_block),
post_to,
Some(&block_heights),
None,
cmd.auth_token,
)?
.into_selected_network_specification(&config.network.bitcoin_network)?;
@@ -515,7 +561,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
&predicate_spec,
&config,
None,
&ctx,
ctx,
)
.await?;
} else {
@@ -524,12 +570,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut total_transfers = 0;
let inscriptions_db_conn =
initialize_ordhook_db(&config.expected_cache_path(), &ctx);
initialize_ordhook_db(&config.expected_cache_path(), ctx);
while let Some(block_height) = block_range.pop_front() {
let inscriptions =
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, &ctx);
find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, ctx);
let mut locations =
find_all_transfers_in_block(&block_height, &inscriptions_db_conn, &ctx);
find_all_transfers_in_block(&block_height, &inscriptions_db_conn, ctx);
let mut total_transfers_in_block = 0;
@@ -556,7 +602,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
);
}
}
if total_transfers_in_block > 0 && inscriptions.len() > 0 {
if total_transfers_in_block > 0 && !inscriptions.is_empty() {
println!(
"Inscriptions revealed: {}, inscriptions transferred: {total_transfers_in_block}",
inscriptions.len()
@@ -581,9 +627,9 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let _ = download_ordinals_dataset_if_required(&config, ctx).await;
let inscriptions_db_conn =
open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
open_readonly_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
let (inscription, block_height) =
match find_inscription_with_id(&cmd.inscription_id, &inscriptions_db_conn, &ctx)? {
match find_inscription_with_id(&cmd.inscription_id, &inscriptions_db_conn, ctx)? {
Some(entry) => entry,
_ => {
return Err(format!(
@@ -602,7 +648,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let transfers = find_all_inscription_transfers(
&inscription.get_inscription_id(),
&inscriptions_db_conn,
&ctx,
ctx,
);
for (transfer, block_height) in transfers.iter().skip(1) {
println!(
@@ -612,20 +658,51 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
}
println!("Number of transfers: {}", transfers.len() - 1);
}
Command::Scan(ScanCommand::Transaction(cmd)) => {
let config: Config =
ConfigFile::default(cmd.regtest, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let http_client = build_http_client();
let block = fetch_and_standardize_block(
&http_client,
cmd.block_height,
&config.get_event_observer_config().get_bitcoin_config(),
ctx,
)
.await?;
let transaction_identifier = TransactionIdentifier::new(&cmd.transaction_id);
let cache = new_traversals_lazy_cache(100);
let res = compute_satoshi_number(
&config.get_ordhook_config().db_path,
&block.block_identifier,
&transaction_identifier,
0,
0,
&Arc::new(cache),
ctx,
)?;
println!("{:?}", res);
}
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let maintenance_enabled =
std::env::var("ORDHOOK_MAINTENANCE").unwrap_or("0".into());
if maintenance_enabled.eq("1") {
info!(ctx.expect_logger(), "Entering maintenance mode (default duration = 7 days). Unset ORDHOOK_MAINTENANCE and reboot to resume operations");
sleep(Duration::from_secs(3600 * 24 * 7))
}
let config =
ConfigFile::default(cmd.regtest, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let _ = initialize_ordhook_db(&config.expected_cache_path(), &ctx);
let _ = initialize_ordhook_db(&config.expected_cache_path(), ctx);
let inscriptions_db_conn =
open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
open_readonly_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
let last_known_block =
find_latest_inscription_block_height(&inscriptions_db_conn, &ctx)?;
find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?;
if last_known_block.is_none() {
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
}
let ordhook_config = config.get_ordhook_config();
@@ -653,19 +730,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let predicate = build_predicate_from_cli(
&config,
post_to,
start_block,
None,
Some(start_block),
cmd.auth_token.clone(),
)?;
predicates.push(ChainhookFullSpecification::Bitcoin(predicate));
}
// let predicates = cmd
// .predicates_paths
// .iter()
// .map(|p| load_predicate_from_path(p))
// .collect::<Result<Vec<ChainhookFullSpecification>, _>>()?;
let mut service = Service::new(config, ctx.clone());
return service.run(predicates, None).await;
}
@@ -687,12 +758,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
},
Command::Db(OrdhookDbCommand::New(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
initialize_ordhook_db(&config.expected_cache_path(), &ctx);
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
initialize_ordhook_db(&config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
}
Command::Db(OrdhookDbCommand::Sync(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
initialize_ordhook_db(&config.expected_cache_path(), &ctx);
initialize_ordhook_db(&config.expected_cache_path(), ctx);
let service = Service::new(config, ctx.clone());
service.update_state(None).await?;
}
@@ -715,7 +786,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
ordhook_config.first_inscription_height,
Some(&block_ingestion_processor),
10_000,
&ctx,
ctx,
)
.await?;
if let Some(true) = cmd.debug {
@@ -749,7 +820,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let block_post_processor = match cmd.repair_observers {
Some(true) => {
let tx_replayer =
start_observer_forwarding(&config.get_event_observer_config(), &ctx);
start_observer_forwarding(&config.get_event_observer_config(), ctx);
Some(tx_replayer)
}
_ => None,
@@ -764,7 +835,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
ordhook_config.first_inscription_height,
Some(&inscription_indexing_processor),
10_000,
&ctx,
ctx,
)
.await?;
}
@@ -773,7 +844,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let block_post_processor = match cmd.repair_observers {
Some(true) => {
let tx_replayer =
start_observer_forwarding(&config.get_event_observer_config(), &ctx);
start_observer_forwarding(&config.get_event_observer_config(), ctx);
Some(tx_replayer)
}
_ => None,
@@ -796,13 +867,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
{
let blocks_db =
open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
let tip = find_last_block_inserted(&blocks_db) as u64;
println!("Tip: {}", tip);
let mut missing_blocks = vec![];
for i in cmd.start_block..=cmd.end_block {
if find_lazy_block_at_block_height(i as u32, 0, false, &blocks_db, &ctx)
if find_lazy_block_at_block_height(i as u32, 0, false, &blocks_db, ctx)
.is_none()
{
println!("Missing block #{i}");
@@ -815,16 +886,16 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::Drop(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let blocks_db =
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
delete_data_in_ordhook_db(
cmd.start_block,
cmd.end_block,
&blocks_db,
&inscriptions_db_conn_rw,
&ctx,
ctx,
)?;
info!(
ctx.expect_logger(),
@@ -839,7 +910,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
pub fn load_predicate_from_path(
predicate_path: &str,
) -> Result<ChainhookFullSpecification, String> {
let file = std::fs::File::open(&predicate_path)
let file = std::fs::File::open(predicate_path)
.map_err(|e| format!("unable to read file {}\n{:?}", predicate_path, e))?;
let mut file_reader = BufReader::new(file);
let mut file_buffer = vec![];
@@ -858,30 +929,35 @@ pub async fn fetch_and_standardize_block(
ctx: &Context,
) -> Result<BitcoinBlockData, String> {
let block_hash =
retrieve_block_hash_with_retry(http_client, &block_height, &bitcoin_config, &ctx).await?;
retrieve_block_hash_with_retry(http_client, &block_height, bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(http_client, &block_hash, &bitcoin_config, &ctx)
.await?;
download_and_parse_block_with_retry(http_client, &block_hash, bitcoin_config, ctx).await?;
parse_inscriptions_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
parse_inscriptions_and_standardize_block(block_breakdown, &bitcoin_config.network, ctx)
.map_err(|(e, _)| e)
}
pub fn build_predicate_from_cli(
config: &Config,
post_to: &str,
start_block: u64,
end_block: Option<u64>,
block_heights: Option<&BlockHeights>,
start_block: Option<u64>,
auth_token: Option<String>,
) -> Result<BitcoinChainhookFullSpecification, String> {
let mut networks = BTreeMap::new();
// Retrieve last block height known, and display it
let (start_block, end_block, blocks) = match (start_block, block_heights) {
(None, Some(BlockHeights::BlockRange(start, end))) => (Some(*start), Some(*end), None),
(None, Some(BlockHeights::Blocks(blocks))) => (None, None, Some(blocks.clone())),
(Some(start), None) => (Some(start), None, None),
_ => unreachable!(),
};
networks.insert(
config.network.bitcoin_network.clone(),
BitcoinChainhookNetworkSpecification {
start_block: Some(start_block),
start_block,
end_block,
blocks: None,
blocks,
expire_after_occurrence: None,
include_proof: None,
include_inputs: None,
@@ -914,19 +990,47 @@ pub async fn check_bitcoind_connection(config: &Config) -> Result<u64, String> {
let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!(
"unable to connect to bitcoind: {}",
message.to_string()
));
return Err(format!("unable to connect to bitcoind: {}", message));
}
};
let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!("unable to connect to bitcoind: {}", e.to_string()));
return Err(format!("unable to connect to bitcoind: {}", e));
}
};
Ok(end_block)
}
fn parse_blocks_heights_spec(
blocks_interval: &Option<String>,
blocks: &Option<String>,
) -> BlockHeights {
let blocks = match (blocks_interval, blocks) {
(Some(interval), None) => {
let blocks = interval.split(':').collect::<Vec<_>>();
let start_block: u64 = blocks
.first()
.expect("unable to get start_block")
.parse::<u64>()
.expect("unable to parse start_block");
let end_block: u64 = blocks
.get(1)
.expect("unable to get end_block")
.parse::<u64>()
.expect("unable to parse end_block");
BlockHeights::BlockRange(start_block, end_block)
}
(None, Some(blocks)) => {
let blocks = blocks
.split(',')
.map(|b| b.parse::<u64>().expect("unable to parse block"))
.collect::<Vec<_>>();
BlockHeights::Blocks(blocks)
}
_ => unreachable!(),
};
blocks
}
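
With the hunks above, a scan can target either an interval or an explicit list of blocks, and the new transaction subcommand takes a block height and a transaction id positionally. A sketch of the resulting invocations (binary and subcommand names assumed from the clap attributes, not confirmed by this diff):

ordhook scan blocks --interval 767430:800000 --mainnet
ordhook scan blocks --blocks 767430,767431,767433,800000 --mainnet
ordhook scan transaction <block_height> <transaction_id> --mainnet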

View File

@@ -41,7 +41,7 @@ impl ConfigFile {
let config_file: ConfigFile = match toml::from_slice(&file_buffer) {
Ok(s) => s,
Err(e) => {
return Err(format!("Config file malformatted {}", e.to_string()));
return Err(format!("Config file malformatted {}", e));
}
};
ConfigFile::from_config_file(config_file)
@@ -153,7 +153,7 @@ impl ConfigFile {
(true, false, false, _) => Config::devnet_default(),
(false, true, false, _) => Config::testnet_default(),
(false, false, true, _) => Config::mainnet_default(),
(false, false, false, Some(config_path)) => ConfigFile::from_file_path(&config_path)?,
(false, false, false, Some(config_path)) => ConfigFile::from_file_path(config_path)?,
_ => Err("Invalid combination of arguments".to_string())?,
};
Ok(config)

View File

@@ -45,5 +45,5 @@ chainhook_internals = true
"#,
network = network.to_lowercase(),
);
return conf;
conf
}

View File

@@ -14,7 +14,7 @@ use chainhook_sdk::{
use crate::{
config::{Config, LogConfig},
db::{find_lazy_block_at_block_height, open_readwrite_ordhook_db_conn_rocks_db},
db::{find_lazy_block_at_block_height, open_ordhook_db_conn_rocks_db_loop},
};
use crate::db::{
@@ -95,7 +95,7 @@ pub fn compute_next_satpoint_data(
}
pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?
@@ -128,7 +128,7 @@ pub fn should_sync_ordhook_db(
}
};
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let mut start_block = find_last_block_inserted(&blocks_db) as u64;
if start_block == 0 {

View File

@@ -9,7 +9,7 @@ use std::{
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_ordhook_db_conn_rocks_db, LazyBlock},
db::{insert_entry_in_blocks, LazyBlock, open_ordhook_db_conn_rocks_db_loop},
};
pub fn start_block_archiving_processor(
@@ -26,18 +26,15 @@ pub fn start_block_archiving_processor(
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let blocks_db_rw =
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let mut processed_blocks = 0;
loop {
debug!(ctx.expect_logger(), "Tick");
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
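
Where this processor used to unwrap open_readwrite_ordhook_db_conn_rocks_db and panic if another process held the RocksDB lock, it now calls open_ordhook_db_conn_rocks_db_loop, which, going by its name and the commit's overall pattern, presumably retries until the database can be opened. A sketch of that idea under those assumptions, using the rocksdb crate (the real helper lives in ordhook::db and takes additional arguments):

use rocksdb::DB;
use std::{thread::sleep, time::Duration};

// Illustrative only: keep retrying instead of panicking while another
// process still holds the RocksDB LOCK file.
fn open_rocks_db_loop(path: &str) -> DB {
    loop {
        match DB::open_default(path) {
            Ok(db) => break db,
            Err(e) => {
                eprintln!("unable to open rocksdb ({e}), retrying in 1s");
                sleep(Duration::from_secs(1));
            }
        }
    }
}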

View File

@@ -83,7 +83,6 @@ pub fn start_inscription_indexing_processor(
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
@@ -91,7 +90,9 @@ pub fn start_inscription_indexing_processor(
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 180 {
warn!(ctx.expect_logger(), "Block processor reached expiration");
ctx.try_log(|logger| {
info!(logger, "Block processor reached expiration")
});
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
@@ -117,7 +118,7 @@ pub fn start_inscription_indexing_processor(
);
}
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
ctx.try_log(|logger| info!(logger, "Processing {} blocks", blocks.len()));
blocks = process_blocks(
&mut blocks,
@@ -132,19 +133,19 @@ pub fn start_inscription_indexing_processor(
garbage_collect_nth_block += blocks.len();
if garbage_collect_nth_block > garbage_collect_every_n_blocks {
ctx.try_log(|logger| info!(logger, "Performing garbage collecting"));
// Clear L2 cache on a regular basis
info!(
ctx.expect_logger(),
"Clearing cache L2 ({} entries)",
cache_l2.len()
);
ctx.try_log(|logger| {
info!(logger, "Clearing cache L2 ({} entries)", cache_l2.len())
});
cache_l2.clear();
// Recreate sqlite db connection on a regular basis
inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx)
.unwrap();
inscriptions_db_conn_rw.flush_prepared_statement_cache();
garbage_collect_nth_block = 0;
}
}
@@ -215,7 +216,7 @@ pub fn process_blocks(
if any_existing_activity {
ctx.try_log(|logger| {
warn!(
error!(
logger,
"Dropping updates for block #{}, activities present in database",
block.block_identifier.index,

View File

@@ -44,7 +44,6 @@ pub fn start_transfers_recomputing_processor(
blocks
}
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
@@ -52,7 +51,9 @@ pub fn start_transfers_recomputing_processor(
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 10 {
warn!(ctx.expect_logger(), "Block processor reached expiration");
ctx.try_log(|logger| {
warn!(logger, "Block processor reached expiration")
});
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
@@ -65,7 +66,7 @@ pub fn start_transfers_recomputing_processor(
},
};
info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
ctx.try_log(|logger| info!(logger, "Processing {} blocks", blocks.len()));
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();
for block in blocks.iter_mut() {

View File

@@ -2,8 +2,6 @@ use std::{
collections::{BTreeMap, HashMap, VecDeque},
hash::BuildHasherDefault,
sync::Arc,
thread::sleep,
time::Duration,
};
use chainhook_sdk::{
@@ -784,8 +782,6 @@ pub fn consolidate_block_with_pre_computed_ordinals_data(
if results.len() == expected_inscriptions_count {
break results;
}
// Handle race conditions: if the db is being updated, the number of expected entries could be un-met.
sleep(Duration::from_secs(3));
ctx.try_log(|logger| {
warn!(
logger,

View File

@@ -2,7 +2,8 @@ use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier, OrdinalInscriptionTransferDestination,
OrdinalInscriptionTransferData, OrdinalInscriptionTransferDestination, OrdinalOperation,
TransactionIdentifier,
},
utils::Context,
};
@@ -84,14 +85,7 @@ pub fn augment_transaction_with_ordinals_transfers_data(
);
let entries =
match find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx)
{
Ok(entries) => entries,
Err(e) => {
ctx.try_log(|logger| warn!(logger, "unable query inscriptions: {e}"));
continue;
}
};
find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx, ctx);
// For each satpoint inscribed retrieved, we need to compute the next
// outpoint to watch
for watched_satpoint in entries.into_iter() {
@@ -124,10 +118,12 @@ pub fn augment_transaction_with_ordinals_transfers_data(
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(address) => OrdinalInscriptionTransferDestination::Transferred(address.to_string()),
Ok(address) => OrdinalInscriptionTransferDestination::Transferred(
address.to_string(),
),
Err(e) => {
ctx.try_log(|logger| {
warn!(
info!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
@@ -138,13 +134,15 @@ pub fn augment_transaction_with_ordinals_transfers_data(
},
Err(e) => {
ctx.try_log(|logger| {
warn!(
info!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
)
});
OrdinalInscriptionTransferDestination::Burnt(script_pub_key_hex.to_string())
OrdinalInscriptionTransferDestination::Burnt(
script_pub_key_hex.to_string(),
)
}
};
@@ -181,7 +179,12 @@ pub fn augment_transaction_with_ordinals_transfers_data(
offset
)
});
(outpoint, total_offset, OrdinalInscriptionTransferDestination::SpentInFees, None)
(
outpoint,
total_offset,
OrdinalInscriptionTransferDestination::SpentInFees,
None,
)
}
};

View File

@@ -1,7 +1,7 @@
use std::{
collections::BTreeMap,
io::{Read, Write},
path::PathBuf,
path::PathBuf, thread::sleep, time::Duration,
};
use rand::{thread_rng, Rng};
@@ -73,20 +73,20 @@ pub fn initialize_ordhook_db(path: &PathBuf, ctx: &Context) -> Connection {
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);",
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_inscription_number ON inscriptions(inscription_number);",
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_block_height ON inscriptions(block_height);",
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
}
if let Err(e) = conn.execute(
@@ -111,19 +111,19 @@ pub fn initialize_ordhook_db(path: &PathBuf, ctx: &Context) -> Connection {
"CREATE INDEX IF NOT EXISTS locations_indexed_on_block_height ON locations(block_height);",
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS locations_indexed_on_outpoint_to_watch ON locations(outpoint_to_watch);",
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS locations_indexed_on_inscription_id ON locations(inscription_id);",
[],
) {
ctx.try_log(|logger| warn!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
}
}
@@ -162,7 +162,8 @@ pub fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Conne
std::thread::sleep(std::time::Duration::from_secs(1));
};
// db.profile(Some(trace_profile));
// db.busy_handler(Some(tx_busy_handler))?;
conn.busy_timeout(std::time::Duration::from_secs(300))
.expect("unable to set db timeout");
// let mmap_size: i64 = 256 * 1024 * 1024;
// let page_size: i64 = 16384;
// conn.pragma_update(None, "mmap_size", mmap_size).unwrap();
@@ -190,11 +191,15 @@ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
match Connection::open_with_flags(path, open_flags) {
Ok(conn) => break conn,
Err(e) => {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
ctx.try_log(|logger| {
warn!(logger, "unable to open hord.rocksdb: {}", e.to_string())
});
}
};
std::thread::sleep(std::time::Duration::from_secs(1));
};
conn.busy_timeout(std::time::Duration::from_secs(300))
.expect("unable to set db timeout");
return conn;
}
@@ -267,12 +272,12 @@ pub fn open_readwrite_ordhook_dbs(
base_dir: &PathBuf,
ctx: &Context,
) -> Result<(DB, Connection), String> {
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&base_dir, &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &base_dir, &ctx);
let inscriptions_db = open_readwrite_ordhook_db_conn(&base_dir, &ctx)?;
Ok((blocks_db, inscriptions_db))
}
pub fn open_readwrite_ordhook_db_conn_rocks_db(
fn open_readwrite_ordhook_db_conn_rocks_db(
base_dir: &PathBuf,
_ctx: &Context,
) -> Result<DB, String> {
@@ -288,12 +293,30 @@ pub fn insert_entry_in_blocks(
lazy_block: &LazyBlock,
update_tip: bool,
blocks_db_rw: &DB,
_ctx: &Context,
ctx: &Context,
) {
let block_height_bytes = block_height.to_be_bytes();
blocks_db_rw
.put(&block_height_bytes, &lazy_block.bytes)
.expect("unable to insert blocks");
let mut retries = 0;
loop {
let res = blocks_db_rw.put(&block_height_bytes, &lazy_block.bytes);
match res {
Ok(_) => break,
Err(e) => {
retries += 1;
if retries > 10 {
ctx.try_log(|logger| {
error!(
logger,
"unable to insert block {block_height} ({}). will retry in 5 secs",
e.to_string()
);
});
sleep(Duration::from_secs(5));
}
}
}
}
if update_tip {
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
@@ -386,11 +409,30 @@ pub fn insert_entry_in_inscriptions(
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
if let Err(e) = inscriptions_db_conn_rw.execute(
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO inscriptions (inscription_id, ordinal_number, inscription_number, block_height) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![&inscription_data.inscription_id, &inscription_data.ordinal_number, &inscription_data.inscription_number, &block_identifier.index],
) {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn insert_inscription_in_locations(
inscription_data: &OrdinalInscriptionRevealData,
block_identifier: &BlockIdentifier,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
let (tx, output_index, offset) =
parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &inscription_data.tx_index],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -445,23 +487,6 @@ pub fn insert_new_inscriptions_from_block_in_locations(
}
}
pub fn insert_inscription_in_locations(
inscription_data: &OrdinalInscriptionRevealData,
block_identifier: &BlockIdentifier,
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
let (tx, output_index, offset) =
parse_satpoint_to_watch(&inscription_data.satpoint_post_inscription);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
if let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &inscription_data.tx_index],
) {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
}
}
pub fn insert_transfer_in_locations_tx(
transfer_data: &OrdinalInscriptionTransferData,
block_identifier: &BlockIdentifier,
@@ -470,11 +495,12 @@ pub fn insert_transfer_in_locations_tx(
) {
let (tx, output_index, offset) = parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
if let Err(e) = inscriptions_db_conn_rw.execute(
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&transfer_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &transfer_data.tx_index],
) {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -486,126 +512,198 @@ pub fn insert_transfer_in_locations(
) {
let (tx, output_index, offset) = parse_satpoint_to_watch(&transfer_data.satpoint_post_transfer);
let outpoint_to_watch = format_outpoint_to_watch(&tx, output_index);
if let Err(e) = inscriptions_db_conn_rw.execute(
while let Err(e) = inscriptions_db_conn_rw.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&transfer_data.inscription_id, &outpoint_to_watch, offset, &block_identifier.index, &transfer_data.tx_index],
) {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
fn perform_query_exists(
query: &str,
args: &[&dyn ToSql],
db_conn: &Connection,
ctx: &Context,
) -> bool {
let res = perform_query(query, args, db_conn, ctx, |_| true, true);
!res.is_empty()
}
fn perform_query_one<F, T>(
query: &str,
args: &[&dyn ToSql],
db_conn: &Connection,
ctx: &Context,
mapping_func: F,
) -> Option<T>
where
F: Fn(&rusqlite::Row<'_>) -> T,
{
let mut res = perform_query(query, args, db_conn, ctx, mapping_func, true);
match res.is_empty() {
true => None,
false => Some(res.remove(0)),
}
}
fn perform_query_set<F, T>(
query: &str,
args: &[&dyn ToSql],
db_conn: &Connection,
ctx: &Context,
mapping_func: F,
) -> Vec<T>
where
F: Fn(&rusqlite::Row<'_>) -> T,
{
perform_query(query, args, db_conn, ctx, mapping_func, false)
}
fn perform_query<F, T>(
query: &str,
args: &[&dyn ToSql],
db_conn: &Connection,
ctx: &Context,
mapping_func: F,
stop_at_first: bool,
) -> Vec<T>
where
F: Fn(&rusqlite::Row<'_>) -> T,
{
let mut results = vec![];
loop {
let mut stmt = match db_conn.prepare(query) {
Ok(stmt) => stmt,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to prepare query {query}: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
};
match stmt.query(args) {
Ok(mut rows) => loop {
match rows.next() {
Ok(Some(row)) => {
let r = mapping_func(row);
results.push(r);
if stop_at_first {
break;
}
}
Ok(None) => break,
Err(e) => {
ctx.try_log(|logger| {
warn!(
logger,
"unable to iterate over results from {query}: {}",
e.to_string()
)
});
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
},
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to execute query {query}: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
};
break;
}
results
}
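
Every SQLite read now funnels through this single retry loop: prepare, query, and row iteration each pause and retry on error instead of surfacing a transient lock to the caller, and call sites shrink to a query string plus a row-mapping closure. For instance, the rewritten find_latest_inscription_block_height below reduces to:

let query = "SELECT block_height FROM inscriptions ORDER BY block_height DESC LIMIT 1";
let entry: Option<u64> = perform_query_one(query, &[], db_conn, ctx, |row| {
    let block_height: u64 = row.get(0).unwrap();
    block_height
});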
pub fn get_any_entry_in_ordinal_activities(
block_height: &u64,
inscriptions_db_tx: &Connection,
db_conn: &Connection,
ctx: &Context,
) -> bool {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = match inscriptions_db_tx
.prepare("SELECT DISTINCT block_height FROM inscriptions WHERE block_height = ?")
{
Ok(stmt) => stmt,
Err(e) => {
ctx.try_log(|logger| error!(logger, "{}", e.to_string()));
panic!();
}
};
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(_)) = rows.next() {
return true;
}
let mut stmt = inscriptions_db_tx
.prepare("SELECT DISTINCT block_height FROM locations WHERE block_height = ?")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(_)) = rows.next() {
let query = "SELECT DISTINCT block_height FROM inscriptions WHERE block_height = ?";
if perform_query_exists(query, args, db_conn, ctx) {
return true;
}
false
let query = "SELECT DISTINCT block_height FROM locations WHERE block_height = ?";
perform_query_exists(query, args, db_conn, ctx)
}
pub fn find_latest_inscription_block_height(
inscriptions_db_conn: &Connection,
_ctx: &Context,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<u64>, String> {
let args: &[&dyn ToSql] = &[];
let mut stmt = inscriptions_db_conn
.prepare("SELECT block_height FROM inscriptions ORDER BY block_height DESC LIMIT 1")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let query = "SELECT block_height FROM inscriptions ORDER BY block_height DESC LIMIT 1";
let entry = perform_query_one(query, args, db_conn, ctx, |row| {
let block_height: u64 = row.get(0).unwrap();
return Ok(Some(block_height));
}
Ok(None)
block_height
});
Ok(entry)
}
pub fn find_initial_inscription_transfer_data(
inscription_id: &str,
inscriptions_db_conn: &Connection,
_ctx: &Context,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<TransferData>, String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC LIMIT 1";
let entry = perform_query_one(query, args, db_conn, ctx, |row| {
let outpoint_to_watch: String = row.get(0).unwrap();
let (transaction_identifier_location, output_index) =
parse_outpoint_to_watch(&outpoint_to_watch);
let inscription_offset_intra_output: u64 = row.get(1).unwrap();
let tx_index: u64 = row.get(2).unwrap();
return Ok(Some(TransferData {
TransferData {
transaction_identifier_location,
output_index,
inscription_offset_intra_output,
tx_index,
}));
}
Ok(None)
});
Ok(entry)
}
pub fn find_latest_inscription_transfer_data(
inscription_id: &str,
inscriptions_db_conn: &Connection,
_ctx: &Context,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<TransferData>, String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height DESC, tx_index DESC LIMIT 1")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let query = "SELECT outpoint_to_watch, offset, tx_index FROM locations WHERE inscription_id = ? ORDER BY block_height DESC, tx_index DESC LIMIT 1";
let entry = perform_query_one(query, args, db_conn, ctx, |row| {
let outpoint_to_watch: String = row.get(0).unwrap();
let (transaction_identifier_location, output_index) =
parse_outpoint_to_watch(&outpoint_to_watch);
let inscription_offset_intra_output: u64 = row.get(1).unwrap();
let tx_index: u64 = row.get(2).unwrap();
return Ok(Some(TransferData {
TransferData {
transaction_identifier_location,
output_index,
inscription_offset_intra_output,
tx_index,
}));
}
Ok(None)
});
Ok(entry)
}
pub fn find_latest_transfers_block_height(
inscriptions_db_conn: &Connection,
_ctx: &Context,
) -> Option<u64> {
pub fn find_latest_transfers_block_height(db_conn: &Connection, ctx: &Context) -> Option<u64> {
let args: &[&dyn ToSql] = &[];
let mut stmt = inscriptions_db_conn
.prepare("SELECT block_height FROM locations ORDER BY block_height DESC LIMIT 1")
.unwrap();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let query = "SELECT block_height FROM locations ORDER BY block_height DESC LIMIT 1";
let entry = perform_query_one(query, args, db_conn, ctx, |row| {
let block_height: u64 = row.get(0).unwrap();
return Some(block_height);
}
None
block_height
});
entry
}
#[derive(Debug, Clone)]
@@ -618,16 +716,39 @@ pub struct TransferData {
pub fn find_all_transfers_in_block(
block_height: &u64,
inscriptions_db_conn: &Connection,
_ctx: &Context,
db_conn: &Connection,
ctx: &Context,
) -> BTreeMap<String, Vec<TransferData>> {
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = inscriptions_db_conn
.prepare("SELECT inscription_id, offset, outpoint_to_watch, tx_index FROM locations WHERE block_height = ? ORDER BY tx_index ASC")
.unwrap();
let mut stmt = loop {
match db_conn.prepare("SELECT inscription_id, offset, outpoint_to_watch, tx_index FROM locations WHERE block_height = ? ORDER BY tx_index ASC")
{
Ok(stmt) => break stmt,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to prepare query hord.sqlite: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
};
let mut results: BTreeMap<String, Vec<TransferData>> = BTreeMap::new();
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let mut rows = loop {
match stmt.query(args) {
Ok(rows) => break rows,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to query hord.sqlite: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
};
loop {
match rows.next() {
Ok(Some(row)) => {
let inscription_id: String = row.get(0).unwrap();
let inscription_offset_intra_output: u64 = row.get(1).unwrap();
let outpoint_to_watch: String = row.get(2).unwrap();
@@ -645,21 +766,26 @@ pub fn find_all_transfers_in_block(
.and_modify(|v| v.push(transfer.clone()))
.or_insert(vec![transfer]);
}
Ok(None) => break,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to query hord.sqlite: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
}
return results;
}
pub fn find_all_inscription_transfers(
inscription_id: &str,
db_conn: &Connection,
ctx: &Context,
) -> Vec<(TransferData, u64)> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT offset, outpoint_to_watch, tx_index, block_height FROM locations WHERE inscription_id = ? ORDER BY block_height ASC, tx_index ASC";
perform_query_set(query, args, db_conn, ctx, |row| {
let inscription_offset_intra_output: u64 = row.get(0).unwrap();
let outpoint_to_watch: String = row.get(1).unwrap();
let tx_index: u64 = row.get(2).unwrap();
@@ -673,143 +799,123 @@ pub fn find_all_inscription_transfers(
output_index,
tx_index,
};
(transfer, block_height)
})
}
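// `perform_query_set` (also defined outside this hunk) is presumably the
// collection-returning sibling of the helper sketched above: the same retry
// loop, but mapping every row into a Vec rather than returning the first, e.g.
//
//     let mut results = vec![];
//     while let Ok(Some(row)) = rows.next() {
//         results.push(mapping_func(row));
//     }
//     results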
pub fn find_latest_inscription_number_at_block_height(
block_height: &u64,
latest_blessed_inscription_heigth: &Option<u64>,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<i64>, String> {
let (query, hint) = match latest_blessed_inscription_heigth {
Some(hint) => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height = ? ORDER BY inscription_number DESC LIMIT 1",
*hint
)
}
None => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number DESC LIMIT 1",
*block_height
)
}
};
let entry = perform_query_one(query, &[&hint.to_sql().unwrap()], db_conn, ctx, |row| {
let inscription_number: i64 = row.get(0).unwrap();
inscription_number
});
Ok(entry)
}
pub fn find_latest_cursed_inscription_number_at_block_height(
block_height: &u64,
latest_cursed_inscription_heigth: &Option<u64>,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<i64>, String> {
let (query, hint) = match latest_cursed_inscription_heigth {
Some(hint) => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height = ? ORDER BY inscription_number ASC LIMIT 1",
*hint
)
}
None => {
(
"SELECT inscription_number FROM inscriptions WHERE block_height < ? ORDER BY inscription_number ASC LIMIT 1",
*block_height
)
}
};
let entry = perform_query_one(query, &[&hint.to_sql().unwrap()], db_conn, ctx, |row| {
let inscription_number: i64 = row.get(0).unwrap();
inscription_number
});
Ok(entry)
}
pub fn find_blessed_inscription_with_ordinal_number(
ordinal_number: &u64,
db_conn: &Connection,
ctx: &Context,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
let query = "SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number >= 0";
perform_query_one(query, args, db_conn, ctx, |row| {
let inscription_id: String = row.get(0).unwrap();
inscription_id
})
}
pub fn find_inscription_with_id(
inscription_id: &str,
db_conn: &Connection,
ctx: &Context,
) -> Result<Option<(TraversalResult, u64)>, String> {
let Some(transfer_data) = find_initial_inscription_transfer_data(inscription_id, db_conn, ctx)?
else {
return Err(format!("unable to retrieve location for {inscription_id}"));
};
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT inscription_number, ordinal_number, block_height FROM inscriptions WHERE inscription_id = ?";
let entry = perform_query_one(query, args, db_conn, ctx, move |row| {
let inscription_number: i64 = row.get(0).unwrap();
let ordinal_number: u64 = row.get(1).unwrap();
let block_height: u64 = row.get(2).unwrap();
let (transaction_identifier_inscription, inscription_input_index) =
parse_inscription_id(inscription_id);
(
inscription_number,
ordinal_number,
inscription_input_index,
transaction_identifier_inscription,
block_height,
)
});
Ok(entry.map(
|(
inscription_number,
ordinal_number,
inscription_input_index,
transaction_identifier_inscription,
block_height,
)| {
(
TraversalResult {
inscription_number,
ordinal_number,
inscription_input_index,
transaction_identifier_inscription,
transfers: 0,
transfer_data,
},
block_height,
)
},
))
}
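// Note: the closure handed to `perform_query_one` returns a plain tuple, and
// the `TraversalResult` is assembled afterwards via `entry.map(..)`,
// presumably so that `transfer_data` is moved into the result exactly once,
// outside the retrying closure.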
pub fn find_all_inscriptions_in_block(
@@ -817,15 +923,38 @@ pub fn find_all_inscriptions_in_block(
inscriptions_db_tx: &Connection,
ctx: &Context,
) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> {
let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_tx, ctx);
let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()];
let mut stmt = loop {
match inscriptions_db_tx.prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC")
{
Ok(stmt) => break stmt,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to prepare query hord.sqlite: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
};
let mut rows = loop {
match stmt.query(args) {
Ok(rows) => break rows,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to query hord.sqlite: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
};
let mut results = BTreeMap::new();
loop {
match rows.next() {
Ok(Some(row)) => {
let inscription_number: i64 = row.get(0).unwrap();
let ordinal_number: u64 = row.get(1).unwrap();
let inscription_id: String = row.get(2).unwrap();
@@ -838,7 +967,8 @@ pub fn find_all_inscriptions_in_block(
ctx.try_log(|logger| {
error!(
logger,
"unable to retrieve inscription genesis transfer data: {}", inscription_id,
"unable to retrieve inscription genesis transfer data: {}",
inscription_id,
)
});
continue;
@@ -856,6 +986,15 @@ pub fn find_all_inscriptions_in_block(
traversal,
);
}
Ok(None) => break,
Err(e) => {
ctx.try_log(|logger| {
warn!(logger, "unable to query hord.sqlite: {}", e.to_string())
});
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
}
return results;
}
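// This function keeps its retry loops inline rather than delegating to the
// query helpers, presumably because it streams many rows out of a long-lived
// statement; the discipline is identical: warn and sleep one second until the
// lock clears, so callers never observe a hard failure.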
@@ -874,54 +1013,40 @@ impl WatchedSatpoint {
pub fn find_watched_satpoint_for_inscription(
inscription_id: &str,
db_conn: &Connection,
ctx: &Context,
) -> Option<(u64, WatchedSatpoint)> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let query = "SELECT inscription_id, offset, block_height FROM locations WHERE inscription_id = ? ORDER BY offset ASC";
perform_query_one(query, args, db_conn, ctx, |row| {
let inscription_id: String = row.get(0).unwrap();
let offset: u64 = row.get(1).unwrap();
let block_height: u64 = row.get(2).unwrap();
(
block_height,
WatchedSatpoint {
inscription_id,
offset,
},
)
})
}
pub fn find_inscriptions_at_wached_outpoint(
outpoint: &str,
db_conn: &Connection,
ctx: &Context,
) -> Vec<WatchedSatpoint> {
let args: &[&dyn ToSql] = &[&outpoint.to_sql().unwrap()];
let query = "SELECT inscription_id, offset FROM locations WHERE outpoint_to_watch = ? ORDER BY offset ASC";
perform_query_set(query, args, db_conn, ctx, |row| {
let inscription_id: String = row.get(0).unwrap();
let offset: u64 = row.get(1).unwrap();
WatchedSatpoint {
inscription_id,
offset,
}
})
}
pub fn delete_inscriptions_in_block_range(
@@ -930,17 +1055,19 @@ pub fn delete_inscriptions_in_block_range(
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) {
while let Err(e) = inscriptions_db_conn_rw.execute(
"DELETE FROM inscriptions WHERE block_height >= ?1 AND block_height <= ?2",
rusqlite::params![&start_block, &end_block],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
while let Err(e) = inscriptions_db_conn_rw.execute(
"DELETE FROM locations WHERE block_height >= ?1 AND block_height <= ?2",
rusqlite::params![&start_block, &end_block],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
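// The write paths below now spin on `while let Err(..)` instead of logging a
// single error and moving on, so a locked database delays the statement rather
// than silently dropping it. A hypothetical helper capturing the same pattern
// (name assumed; not part of this commit):
fn execute_with_retry(db_conn: &Connection, sql: &str, params: &[&dyn ToSql], ctx: &Context) {
    while let Err(e) = db_conn.execute(sql, params) {
        ctx.try_log(|logger| warn!(logger, "unable to write hord.sqlite: {}", e.to_string()));
        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}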
@@ -949,17 +1076,19 @@ pub fn remove_entry_from_inscriptions(
inscriptions_db_rw_conn: &Connection,
ctx: &Context,
) {
while let Err(e) = inscriptions_db_rw_conn.execute(
"DELETE FROM inscriptions WHERE inscription_id = ?1",
rusqlite::params![&inscription_id],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
while let Err(e) = inscriptions_db_rw_conn.execute(
"DELETE FROM locations WHERE inscription_id = ?1",
rusqlite::params![&inscription_id],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -968,11 +1097,12 @@ pub fn remove_entries_from_locations_at_block_height(
inscriptions_db_rw_conn: &Transaction,
ctx: &Context,
) {
while let Err(e) = inscriptions_db_rw_conn.execute(
"DELETE FROM locations WHERE block_height = ?1",
rusqlite::params![&block_height],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -987,11 +1117,12 @@ pub fn insert_entry_in_locations(
&transfer_data.transaction_identifier_location,
transfer_data.output_index,
);
while let Err(e) = inscriptions_db_rw_conn.execute(
"INSERT INTO locations (inscription_id, outpoint_to_watch, offset, block_height, tx_index) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_id, &outpoint_to_watch, &transfer_data.inscription_offset_intra_output, &block_height, &transfer_data.tx_index],
) {
ctx.try_log(|logger| warn!(logger, "unable to query hord.sqlite: {}", e.to_string()));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
@@ -1002,15 +1133,19 @@ pub fn delete_data_in_ordhook_db(
inscriptions_db_conn_rw: &Connection,
ctx: &Context,
) -> Result<(), String> {
ctx.try_log(|logger| {
info!(
logger,
"Deleting entries from block #{start_block} to block #{end_block}"
)
});
delete_blocks_in_block_range(start_block as u32, end_block as u32, blocks_db_rw, &ctx);
ctx.try_log(|logger| {
info!(
logger,
"Deleting inscriptions and locations from block #{start_block} to block #{end_block}"
)
});
delete_inscriptions_in_block_range(
start_block as u32,
end_block as u32,

View File

@@ -1,6 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::core::protocol::inscription_parsing::{
get_inscriptions_revealed_in_block, get_inscriptions_transferred_in_block,
parse_inscriptions_and_standardize_block,
};
use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data;
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_ordhook_db_conn};
@@ -75,9 +76,6 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
};
info!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
@@ -96,6 +94,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let http_client = build_http_client();
while let Some(current_block_height) = block_heights_to_scan.pop_front() {
let mut inscriptions_db_conn =
open_readonly_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
number_of_blocks_scanned += 1;
if !get_any_entry_in_ordinal_activities(&current_block_height, &inscriptions_db_conn, &ctx)
@@ -143,9 +144,11 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
let inscriptions_transferred = get_inscriptions_transferred_in_block(&block).len();
info!(
ctx.expect_logger(),
"Processing block #{current_block_height} through {} predicate ({} inscriptions revealed: [{}])",
"Processing block #{current_block_height} through {} predicate revealed {} new inscriptions [{}] and {inscriptions_transferred} transfers",
predicate_spec.uuid,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")

View File

@@ -278,19 +278,23 @@ pub fn get_entries_from_predicates_db(
let chainhook = match get_entry_from_predicates_db(predicate_key, predicate_db_conn, ctx) {
Ok(Some((spec, status))) => (spec, status),
Ok(None) => {
ctx.try_log(|logger| {
warn!(
logger,
"unable to load predicate associated with key {}", predicate_key,
)
});
continue;
}
Err(e) => {
ctx.try_log(|logger| {
error!(
logger,
"unable to load predicate associated with key {}: {}",
predicate_key,
e.to_string()
)
});
continue;
}
};

View File

@@ -9,13 +9,14 @@ use crate::core::pipeline::processors::inscription_indexing::process_block;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor;
use crate::core::protocol::inscription_parsing::{
get_inscriptions_revealed_in_block, get_inscriptions_transferred_in_block,
parse_inscriptions_in_standardized_block,
};
use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
use crate::db::{
delete_data_in_ordhook_db, insert_entry_in_blocks, open_readwrite_ordhook_db_conn,
open_ordhook_db_conn_rocks_db_loop, open_readwrite_ordhook_dbs,
update_inscriptions_with_block, update_locations_with_block, LazyBlock, LazyBlockTransaction,
};
use crate::scan::bitcoin::process_block_with_predicates;
@@ -441,16 +442,17 @@ impl Service {
event_observer_config: &EventObserverConfig,
) -> Result<(), String> {
if rebuild_from_scratch {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&self.config.expected_cache_path(),
&self.ctx,
);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&self.config.expected_cache_path(), &self.ctx)?;
delete_data_in_ordhook_db(
767430,
820000,
&blocks_db,
&inscriptions_db_conn_rw,
&self.ctx,
@@ -574,10 +576,12 @@ fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ct
match command {
HandleBlock::UndoBlock(block) => {
ctx.try_log(|logger| {
info!(
logger,
"Re-org handling: reverting changes in block #{}", block.block_identifier.index
)
});
if let Err(e) = delete_data_in_ordhook_db(
block.block_identifier.index,
block.block_identifier.index,
@@ -685,7 +689,7 @@ pub fn chainhook_sidecar_mutate_blocks(
block_id_to_rollback.index,
&blocks_db_rw,
&inscriptions_db_tx,
&ctx,
) {
ctx.try_log(|logger| {
error!(
@@ -750,10 +754,13 @@ pub fn chainhook_sidecar_mutate_blocks(
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
let inscriptions_transferred =
get_inscriptions_transferred_in_block(&cache.block).len();
ctx.try_log(|logger| {
info!(
logger,
"Block #{} processed, mutated and revealed {} inscriptions [{}]",
"Block #{} processed, mutated and revealed {} inscriptions [{}] and {inscriptions_transferred} transfers",
cache.block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")

View File

@@ -39,10 +39,12 @@ pub fn start_bitcoin_scan_runloop(
match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
moved_ctx.try_log(|logger| {
error!(
logger,
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
)
});
// Update predicate status in redis
if let PredicatesApi::On(ref api_config) = moved_config.http_api {

View File

@@ -52,11 +52,6 @@ impl OrdinalsIndexingRunloop {
tracer: false,
};
// Initialize service
let mut service: Service = Service::new(ordhook_config, ctx);
// Set-up the observer sidecar - used for augmenting the bitcoin blocks with
@@ -91,10 +86,14 @@ impl OrdinalsIndexingRunloop {
for to_rollback in payload.rollback.into_iter() {
loop {
let (tx, rx) = crossbeam_channel::bounded(1);
callback.call_with_return_value::<bool, _>(
to_rollback.clone(),
ThreadsafeFunctionCallMode::Blocking,
move |p| {
let _ = tx.send(p);
Ok(())
},
);
match rx.recv() {
Ok(true) => break,
Ok(false) => continue,
@@ -108,10 +107,14 @@ impl OrdinalsIndexingRunloop {
for to_apply in payload.apply.into_iter() {
loop {
let (tx, rx) = crossbeam_channel::bounded(1);
callback.call_with_return_value::<bool, _>(
to_apply.clone(),
ThreadsafeFunctionCallMode::Blocking,
move |p| {
let _ = tx.send(p);
Ok(())
},
);
match rx.recv() {
Ok(true) => break,
Ok(false) => continue,

View File

@@ -30,11 +30,11 @@ COPY ./components/ordhook-cli /src/components/ordhook-cli
WORKDIR /src/components/ordhook-sdk-js
# RUN yarn install
# RUN yarn build
# RUN cp *.node /out
WORKDIR /src/components/ordhook-cli
@@ -46,11 +46,9 @@ FROM debian:bullseye-slim
WORKDIR /ordhook-sdk-js
RUN apt-get update && apt-get install -y ca-certificates libssl-dev
# COPY --from=build /out/*.node /ordhook-sdk-js/
COPY --from=build /out/ordhook /bin/ordhook