Compare commits

..

7 Commits

Author SHA1 Message Date
PanJ
4b2da298ba read private key from file 2024-05-21 15:18:55 +07:00
Panjamapong Sermsawatsri
bd9acb8841 feat: fix reporting errors 2024-05-15 17:09:48 +07:00
PanJ
58761b1ef5 update reporting api 2024-05-15 15:33:59 +07:00
Panjamapong Sermsawatsri
85fdf79a00 fix: wif file name to add mainnet 2024-05-08 10:35:21 +07:00
Panjamapong Sermsawatsri
e0e2934160 feat: add wif generation 2024-05-08 10:28:47 +07:00
Panjamapong Sermsawatsri
d56c58334a feat: add new reporting client 2024-04-26 16:20:24 +07:00
Panjamapong Sermsawatsri
be316442ea feat: generate key command and crypto pkg 2024-04-23 16:44:16 +07:00
62 changed files with 2146 additions and 1804 deletions

View File

@@ -1 +0,0 @@
# Contributor Covenant Code of Conduct

View File

@@ -1,34 +0,0 @@
# Contributing
Please note: we have a [code of conduct](https://github.com/gaze-network/gaze-indexer/blob/main/.github/CODE_OF_CONDUCT.md), please follow it in all your interactions with the Gaze Network project.
## Pull Requests or Commits
#### Message structure
```plaintext
<type>(optional scope):<description>
```
The `<type>` must be one of the following:
> feat:, refactor:, fix:, doc:, style:, perf:, test:, chore:, ci:, build:
- feat(runes): add Runes module to the project
- refactor: change project structure
- fix(btc): fix chain reorganization issue
- doc: update \`run\` command documentation
- style: fix linting issues
- perf: improve performance of the bitcoin node datasource
- test(runes): add unit tests for etching logic
- chore: bump dependencies versions
- ci: update CI configuration
- build: update Dockerfile to use alpine
# 👍 Contribute
If you want to say **thank you** and/or support the active development of `Gaze Indexer`:
1. Add a [GitHub Star](https://github.com/gaze-network/gaze-indexer/stargazers) to the project.
2. Follow and mention our [Twitter (𝕏)](https://twitter.com/Gaze_Network).
3. Write a review or tutorial on [Medium](https://medium.com/), [Dev.to](https://dev.to/) or personal blog.

View File

@@ -1,22 +0,0 @@
## Description
Please provide a clear and concise description of the changes you've made and the problem they address. Include the purpose of the change, any relevant issues it solves, and the benefits it brings to the project. If this change introduces new features or adjustments, highlight them here.
Fixes # (issue)
## Type of change
What types of changes does your code introduce to Gaze Indexer?
_Put an `x` in the boxes that apply_
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Enhancement (improvement to existing features and functionality)
- [ ] Documentation update (changes to documentation)
- [ ] Performance improvement (non-breaking change which improves efficiency)
- [ ] Code consistency (non-breaking change which improves code reliability and robustness)
## Commit formatting
Please follow the commit message conventions for an easy way to identify the purpose or intention of a commit. Check out our commit message conventions in the [CONTRIBUTING.md](https://github.com/gaze-network/gaze-indexer/blob/main/.github/CONTRIBUTING.md#pull-requests-or-commits)

View File

@@ -1,77 +0,0 @@
name: Code Analysis & Test
on:
workflow_dispatch:
pull_request:
branches:
- develop
- main
paths:
- "go.mod"
- "go.sum"
- "**.go"
- ".golangci.yaml"
- ".github/workflows/code-analysis.yml"
jobs:
lint:
strategy:
matrix:
os: ["ubuntu-latest"]
name: Lint (${{ matrix.os }})
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: "0"
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: "go.mod"
cache-dependency-path: "**/*.sum"
cache: true # caching and restoring go modules and build outputs.
- name: Lint
uses: reviewdog/action-golangci-lint@v2
with: # https://github.com/reviewdog/action-golangci-lint#inputs
go_version_file: "go.mod"
workdir: ./
golangci_lint_flags: "--config=./.golangci.yaml --verbose --new-from-rev=${{ github.event.pull_request.base.sha }}"
fail_on_error: true
test:
strategy:
matrix:
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
go-version: ["1.22.x", "1.x"] # minimum version and latest version
name: Test (${{ matrix.os }}/${{ matrix.go-version }})
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: "0"
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
cache: true # caching and restoring go modules and build outputs.
- run: echo "GOVERSION=$(go version)" >> $GITHUB_ENV
- name: Build
run: go build -v ./...
- name: Test
run: go test -json ./... > test_output.json
- name: Summary Test Results
if: always()
uses: robherley/go-test-action@v0
with:
fromJSONFile: test_output.json
- name: Annotate Test Results
if: always()
uses: guyarb/golang-test-annotations@v0.5.1
with:
test-results: test_output.json

View File

@@ -32,6 +32,6 @@ jobs:
uses: ./.github/workflows/reusable-build-and-push-ghcr.yml
with:
context: .
dockerfile: ./docker/Dockerfile
dockerfile: Dockerfile
image-repo: "ghcr.io/gaze-network/gaze-indexer"
image-tag: ${{ needs.prepare.outputs.tag }}

View File

@@ -1,28 +0,0 @@
name: Sqlc ORM Framework Verify
on:
workflow_dispatch:
pull_request:
branches:
- develop
- main
paths:
- "sqlc.yaml"
- "**.sql"
- ".github/workflows/sqlc-verify.yml"
jobs:
sqlc-diff:
name: Sqlc Diff Checker
runs-on: "ubuntu-latest" # "self-hosted", "ubuntu-latest", "macos-latest", "windows-latest"
steps:
- uses: actions/checkout@v4
with:
fetch-depth: "0"
- name: Setup Sqlc
uses: sqlc-dev/setup-sqlc@v4
with:
sqlc-version: "1.26.0"
- name: Check Diff
run: sqlc diff

2
.gitignore vendored
View File

@@ -3,6 +3,8 @@
# Eg. ignore.foo_test.go, ignore.credentials.json, ignore.config.yml
ignore.*
/key
**/cmd.local/**
**/cmd.local.**/**

140
README.md
View File

@@ -1,139 +1 @@
<!-- omit from toc -->
# Gaze Indexer
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols. It has support for Bitcoin and Runes out of the box, with **Unified Consistent APIs** across fungible token protocols.
Gaze Indexer is built with **modularity** in mind, allowing users to run all modules in one monolithic instance with a single command, or as a distributed cluster of micro-services.
Gaze Indexer serves as a foundation for building ANY meta-protocol indexers, with efficient data fetching, reorg detection, and database migration tool.
This allows developers to focus on what **truly** matters: Meta-protocol indexing logic. New meta-protocols can be easily added by implementing new modules.
Gaze Indexer also comes with a block reporting system for verifying data integrity of indexers. Visit the [Gaze Network dashboard](https://dash.gaze.network) to see the status of other indexers.
- [Modules](#modules)
- [1. Bitcoin](#1-bitcoin)
- [2. Runes](#2-runes)
- [Installation](#installation)
- [Prerequisites](#prerequisites)
- [1. Hardware Requirements](#1-hardware-requirements)
- [2. Prepare Bitcoin Core RPC server.](#2-prepare-bitcoin-core-rpc-server)
- [3. Prepare database.](#3-prepare-database)
- [4. Prepare `config.yaml` file.](#4-prepare-configyaml-file)
- [Install with Docker (recommended)](#install-with-docker-recommended)
- [Install from source](#install-from-source)
## Modules
### 1. Bitcoin
The Bitcoin Indexer, the heart of every meta-protocol, is responsible for indexing **Bitcoin transactions, blocks, and UTXOs**. It requires a Bitcoin Core RPC as source of Bitcoin transactions,
and stores the indexed data in database to be used by other modules.
### 2. Runes
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
## Installation
### Prerequisites
#### 1. Hardware Requirements
Each module requires different hardware requirements.
| Module | CPU | RAM |
| ------- | ---------- | ------ |
| Bitcoin | 0.25 cores | 256 MB |
| Runes | 0.5 cores | 1 GB |
#### 2. Prepare Bitcoin Core RPC server.
Gaze Indexer needs to fetch transaction data from a Bitcoin Core RPC, either self-hosted or using managed providers like QuickNode.
To self host a Bitcoin Core, see https://bitcoin.org/en/full-node.
#### 3. Prepare database.
Gaze Indexer has first-class support for PostgreSQL. If you wish to use other databases, you can implement your own database repository that satisfies each module's Data Gateway interface.
Here is our minimum database disk space requirement for each module.
| Module | Database Storage |
| ------- | ---------------- |
| Bitcoin | 240 GB |
| Runes | 150 GB |
#### 4. Prepare `config.yaml` file.
```yaml
# config.yaml
logger:
output: text # Output format for logs. current supported formats: "text" | "json" | "gcp"
debug: false
bitcoin_node:
host: "" # [Required] Host of Bitcoin Core RPC (without https://)
user: "" # Username to authenticate with Bitcoin Core RPC
pass: "" # Password to authenticate with Bitcoin Core RPC
disable_tls: false # Set to true to disable tls
network: mainnet # Network to run the indexer on. Current supported networks: "mainnet" | "testnet"
reporting: # Block reporting configuration options. See Block Reporting section for more details.
disabled: false # Set to true to disable block reporting to Gaze Network. Default is false.
base_url: "https://indexer.api.gaze.network" # Defaults to "https://indexer.api.gaze.network" if left empty
name: "" # [Required if not disabled] Name of this indexer to show on the Gaze Network dashboard
website_url: "" # Public website URL to show on the dashboard. Can be left empty.
indexer_api_url: "" # Public url to access this indexer's API. Can be left empty if you want to keep your indexer private.
http_server:
port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.
modules:
bitcoin: # Configuration options for Bitcoin module. Can be removed if not used.
database: "postgres" # Database to store bitcoin data. current supported databases: "postgres"
postgres:
host: "localhost"
port: 5432
user: "postgres"
password: "password"
db_name: "postgres"
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
runes: # Configuration options for Runes module. Can be removed if not used.
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node" | "database". If "database" is used, it will use the database config in bitcoin module as datasource.
api_handlers: # API handlers to enable. current supported handlers: "http"
- http
postgres:
host: "localhost"
port: 5432
user: "postgres"
password: "password"
db_name: "postgres"
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
```
### Install with Docker (recommended)
We will be using `docker-compose` for our installation guide. Make sure the `docker-compose.yaml` file is in the same directory as the `config.yaml` file.
```yaml
# docker-compose.yaml
services:
gaze-indexer:
image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
container_name: gaze-indexer
restart: unless-stopped
volumes:
- './config.yaml:/app/config.yaml' # mount config.yaml file to the container as "/app/config.yaml"
command: ["/app/main", "run", "--bitcoin", "--runes"] # Put module flags after "run" commands to select which modules to run.
```
### Install from source
1. Install `go` version 1.22 or higher. See Go installation guide [here](https://go.dev/doc/install).
2. Clone this repository.
```bash
git clone https://github.com/gaze-network/gaze-indexer.git
cd gaze-indexer
```
3. Build the main binary.
```bash
# Get dependencies
go mod download
# Build the main binary
go build -o gaze main.go
```
4. Run the main binary with the `run` command and module flags.
```bash
./gaze run --bitcoin --runes
```
If `config.yaml` is not located at `./app/config.yaml`, use the `--config` flag to specify the path to the `config.yaml` file.
```bash
./gaze run --bitcoin --runes --config /path/to/config.yaml
```
# Gaze Indexer Network

View File

@@ -14,14 +14,14 @@ var (
// root command
cmd = &cobra.Command{
Use: "gaze",
Long: `Description of gaze indexer`,
Long: `Gaze in a Bitcoin meta-protocol indexer`,
}
// sub-commands
// sub-commands
cmds = []*cobra.Command{
NewVersionCommand(),
NewRunCommand(),
NewMigrateCommand(),
NewGenerateKeypairCommand(),
}
)
@@ -44,7 +44,7 @@ func Execute(ctx context.Context) {
// Initialize logger
if err := logger.Init(config.Logger); err != nil {
logger.PanicContext(ctx, "Something went wrong, can't init logger", slogx.Error(err), slog.Any("config", config.Logger))
logger.Panic("Failed to initialize logger: %v", slogx.Error(err), slog.Any("config", config.Logger))
}
})
@@ -53,7 +53,7 @@ func Execute(ctx context.Context) {
// Execute command
if err := cmd.ExecuteContext(ctx); err != nil {
// Cobra will print the error message by default
logger.DebugContext(ctx, "Error executing command", slogx.Error(err))
// use cobra to log error message by default
logger.Debug("Failed to execute root command", slogx.Error(err))
}
}

100
cmd/cmd_generate_keypair.go Normal file
View File

@@ -0,0 +1,100 @@
package cmd
import (
"crypto/rand"
"encoding/hex"
"fmt"
"os"
"path"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/crypto"
"github.com/spf13/cobra"
)
// generateKeypairCmdOptions holds the CLI flag values for the
// "generate-keypair" command.
type generateKeypairCmdOptions struct {
	Path string // --path: directory where the generated key files are written
}
// NewGenerateKeypairCommand builds the "generate-keypair" sub-command, which
// creates a new public/private keypair and stores the key files under the
// directory given by the --path flag (default "/data/keys").
func NewGenerateKeypairCommand() *cobra.Command {
	options := &generateKeypairCmdOptions{}

	command := &cobra.Command{
		Use:   "generate-keypair",
		Short: "Generate new public/private keypair for encryption and signature generation",
		RunE: func(c *cobra.Command, cmdArgs []string) error {
			return generateKeypairHandler(options, c, cmdArgs)
		},
	}
	command.Flags().StringVar(&options.Path, "path", "/data/keys", `Path to save to key pair file`)

	return command
}
// generateKeypairHandler generates a fresh secp256k1 keypair and writes three
// files under opts.Path:
//   - priv.key:             hex-encoded private key
//   - priv_wif_mainnet.key: WIF-encoded private key for mainnet
//   - pub.key:              hex-encoded compressed public key
//
// If priv.key already exists, the user must type "replace" at the prompt to
// confirm overwriting it; otherwise generation is aborted without error.
func generateKeypairHandler(opts *generateKeypairCmdOptions, _ *cobra.Command, _ []string) error {
	fmt.Printf("Generating key pair\n")

	// 32 random bytes from crypto/rand form the secp256k1 private key.
	privKeyBytes := make([]byte, 32)
	if _, err := rand.Read(privKeyBytes); err != nil {
		// Keep the errs.SomethingWentWrong classification, but don't discard
		// the underlying cause as the original did.
		return errors.Wrapf(errs.SomethingWentWrong, "random bytes: %v", err)
	}
	_, pubKey := btcec.PrivKeyFromBytes(privKeyBytes)
	serializedPubKey := pubKey.SerializeCompressed()
	fmt.Printf("Public key: %s\n", hex.EncodeToString(serializedPubKey))

	if err := os.MkdirAll(opts.Path, 0o755); err != nil {
		return errors.Wrapf(errs.SomethingWentWrong, "create directory: %v", err)
	}

	privateKeyPath := path.Join(opts.Path, "priv.key")
	// Prompt for confirmation before clobbering an existing private key.
	if _, err := os.Stat(privateKeyPath); err == nil {
		fmt.Printf("Existing private key found at %s\n[WARNING] THE EXISTING PRIVATE KEY WILL BE LOST\nType [replace] to replace existing private key: ", privateKeyPath)
		var ans string
		fmt.Scanln(&ans) // best-effort read; an empty answer aborts below
		if ans != "replace" {
			fmt.Printf("Keypair generation aborted\n")
			return nil
		}
	}

	// 0o600: private key material must not be readable by other users
	// (the original 0o644 left the key world-readable).
	if err := os.WriteFile(privateKeyPath, []byte(hex.EncodeToString(privKeyBytes)), 0o600); err != nil {
		return errors.Wrap(err, "write private key file")
	}
	fmt.Printf("Private key saved at %s\n", privateKeyPath)

	wifKeyPath := path.Join(opts.Path, "priv_wif_mainnet.key")
	client, err := crypto.New(hex.EncodeToString(privKeyBytes))
	if err != nil {
		return errors.Wrap(err, "new crypto client")
	}
	wifKey, err := client.WIF(&chaincfg.MainNetParams)
	if err != nil {
		return errors.Wrap(err, "get WIF key")
	}
	// The WIF key is also private key material; keep it owner-only too.
	if err := os.WriteFile(wifKeyPath, []byte(wifKey), 0o600); err != nil {
		return errors.Wrap(err, "write WIF private key file")
	}
	fmt.Printf("WIF private key saved at %s\n", wifKeyPath)

	publicKeyPath := path.Join(opts.Path, "pub.key")
	err = os.WriteFile(publicKeyPath, []byte(hex.EncodeToString(serializedPubKey)), 0o644)
	if err != nil {
		return errors.Wrapf(errs.SomethingWentWrong, "write public key file: %v", err)
	}
	fmt.Printf("Public key saved at %s\n", publicKeyPath)

	return nil
}

View File

@@ -1,20 +0,0 @@
package cmd
import (
"github.com/gaze-network/indexer-network/cmd/migrate"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/spf13/cobra"
)
// NewMigrateCommand creates the "migrate" parent command, which groups the
// database schema migration sub-commands ("up" and "down") implemented in
// the migrate package.
func NewMigrateCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "migrate",
		Short: "Migrate database schema",
	}
	cmd.AddCommand(
		migrate.NewMigrateUpCommand(),
		migrate.NewMigrateDownCommand(),
	)
	return cmd
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"runtime"
@@ -31,16 +30,13 @@ import (
"github.com/gaze-network/indexer-network/pkg/errorhandler"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/compress"
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
"github.com/samber/lo"
"github.com/spf13/cobra"
)
const (
shutdownTimeout = 60 * time.Second
"golang.org/x/sync/errgroup"
)
type runCmdOptions struct {
@@ -87,23 +83,22 @@ type HttpHandler interface {
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
conf := config.Load()
// Validate inputs
{
if !conf.Network.IsSupported() {
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
}
}
// Initialize application process context
// Initialize context
ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Initialize worker context to separate worker's lifecycle from main process
ctxWorker, stopWorker := context.WithCancel(context.Background())
defer stopWorker()
// Add logger context
ctxWorker = logger.WithContext(ctxWorker, slogx.Stringer("network", conf.Network))
ctx = logger.WithContext(ctx, slogx.Stringer("network", conf.Network))
// Load private key
privKeyPath := conf.NodeKey.Path
if privKeyPath == "" {
privKeyPath = "/data/keys/priv.key"
}
privKeyByte, err := os.ReadFile(privKeyPath)
if err != nil {
logger.PanicContext(ctx, "Failed to read private key file", slogx.Error(err))
}
// Initialize Bitcoin Core RPC Client
client, err := rpcclient.New(&rpcclient.ConnConfig{
@@ -114,18 +109,19 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
HTTPPostMode: true,
}, nil)
if err != nil {
logger.PanicContext(ctx, "Invalid Bitcoin node configuration", slogx.Error(err))
logger.PanicContext(ctx, "Failed to create Bitcoin Core RPC Client", slogx.Error(err))
}
defer client.Shutdown()
// Check Bitcoin RPC connection
{
start := time.Now()
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
logger.PanicContext(ctx, "Can't connect to Bitcoin Core RPC Server", slogx.String("host", conf.BitcoinNode.Host), slogx.Error(err))
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slog.Duration("latency", time.Since(start)))
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
logger.PanicContext(ctx, "Failed to ping Bitcoin Core RPC Server", slogx.Error(err))
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server")
// Validate network
if !conf.Network.IsSupported() {
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
}
// TODO: create module command package.
@@ -134,20 +130,19 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// TODO: refactor module name to specific type instead of string?
httpHandlers := make(map[string]HttpHandler, 0)
var reportingClient *reportingclient.ReportingClient
// use gracefulEG to coordinate graceful shutdown after context is done. (e.g. shut down http server, shutdown logic of each module, etc.)
gracefulEG, gctx := errgroup.WithContext(context.Background())
var reportingClient *reportingclientv2.ReportingClient
if !conf.Reporting.Disabled {
reportingClient, err = reportingclient.New(conf.Reporting)
reportingClient, err = reportingclientv2.New(conf.Reporting, string(privKeyByte)) // TODO: read private key from file
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid reporting configuration", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create reporting client", slogx.Error(err))
logger.PanicContext(ctx, "Failed to create reporting client", slogx.Error(err))
}
}
// Initialize Bitcoin Indexer
if opts.Bitcoin {
ctx := logger.WithContext(ctx, slogx.String("module", "bitcoin"))
var (
btcDB btcdatagateway.BitcoinDataGateway
indexerInfoDB btcdatagateway.IndexerInformationDataGateway
@@ -156,70 +151,55 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
repo := btcpostgres.NewRepository(pg)
btcDB = repo
indexerInfoDB = repo
default:
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Bitcoin.Database)
return errors.Wrapf(errs.Unsupported, "%q database is not supported", conf.Modules.Bitcoin.Database)
}
if !opts.APIOnly {
processor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
datasource := datasources.NewBitcoinNode(client)
indexer := indexers.NewBitcoinIndexer(processor, datasource)
defer func() {
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
return
}
logger.InfoContext(ctx, "Indexer stopped gracefully")
}()
bitcoinProcessor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
bitcoinIndexer := indexers.NewBitcoinIndexer(bitcoinProcessor, bitcoinNodeDatasource)
// Verify states before running Indexer
if err := processor.VerifyStates(ctx); err != nil {
if err := bitcoinProcessor.VerifyStates(ctx); err != nil {
return errors.WithStack(err)
}
// Run Indexer
go func() {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctxWorker); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
logger.InfoContext(ctx, "Starting Bitcoin Indexer")
if err := bitcoinIndexer.Run(ctx); err != nil {
logger.ErrorContext(ctx, "Failed to run Bitcoin Indexer", slogx.Error(err))
}
// stop main process if Bitcoin Indexer failed
logger.InfoContext(ctx, "Bitcoin Indexer stopped. Stopping main process...")
stop()
}()
}
}
// Initialize Runes Indexer
if opts.Runes {
ctx := logger.WithContext(ctx, slogx.String("module", "runes"))
var (
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
var runesDg runesdatagateway.RunesDataGateway
var indexerInfoDg runesdatagateway.IndexerInfoDataGateway
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
default:
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Runes.Database)
logger.PanicContext(ctx, "Unsupported database", slogx.String("database", conf.Modules.Runes.Database))
}
var bitcoinDatasource indexers.BitcoinDatasource
var bitcoinClient btcclient.Contract
@@ -229,12 +209,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
bitcoinDatasource = bitcoinNodeDatasource
bitcoinClient = bitcoinNodeDatasource
case "database":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for datasource", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
btcRepo := btcpostgres.NewRepository(pg)
@@ -244,31 +221,24 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
default:
return errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
if !opts.APIOnly {
processor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
indexer := indexers.NewBitcoinIndexer(processor, bitcoinDatasource)
defer func() {
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
return
}
logger.InfoContext(ctx, "Indexer stopped gracefully")
}()
runesProcessor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
runesIndexer := indexers.NewBitcoinIndexer(runesProcessor, bitcoinDatasource)
if err := processor.VerifyStates(ctx); err != nil {
if err := runesProcessor.VerifyStates(ctx); err != nil {
return errors.WithStack(err)
}
// Run Indexer
go func() {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctxWorker); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
logger.InfoContext(ctx, "Started Runes Indexer")
if err := runesIndexer.Run(ctx); err != nil {
logger.ErrorContext(ctx, "Failed to run Runes Indexer", slogx.Error(err))
}
// stop main process if Runes Indexer failed
logger.InfoContext(ctx, "Runes Indexer stopped. Stopping main process...")
stop()
}()
}
@@ -281,7 +251,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
runesHTTPHandler := runesapi.NewHTTPHandler(conf.Network, runesUsecase)
httpHandlers["runes"] = runesHTTPHandler
default:
logger.PanicContext(ctx, "Something went wrong, unsupported API handler", slogx.String("handler", handler))
logger.PanicContext(ctx, "Unsupported API handler", slogx.String("handler", handler))
}
}
}
@@ -290,7 +260,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// Setup HTTP server if there are any HTTP handlers
if len(httpHandlers) > 0 {
app := fiber.New(fiber.Config{
AppName: "Gaze Indexer",
AppName: "gaze",
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
})
app.
@@ -299,72 +269,44 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {
buf := make([]byte, 1024) // bufLen = 1024
buf = buf[:runtime.Stack(buf, false)]
logger.ErrorContext(c.UserContext(), "Something went wrong, panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
logger.ErrorContext(c.UserContext(), "panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
},
})).
Use(compress.New(compress.Config{
Level: compress.LevelDefault,
}))
defer func() {
if err := app.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown HTTP server", slogx.Error(err))
return
}
logger.InfoContext(ctx, "HTTP server stopped gracefully")
}()
// Health check
app.Get("/", func(c *fiber.Ctx) error {
return errors.WithStack(c.SendStatus(http.StatusOK))
})
// mount http handlers from each http-enabled module
for module, handler := range httpHandlers {
if err := handler.Mount(app); err != nil {
logger.PanicContext(ctx, "Something went wrong, can't mount HTTP handler", slogx.Error(err), slogx.String("module", module))
logger.PanicContext(ctx, "Failed to mount HTTP handler", slogx.Error(err), slogx.String("module", module))
}
logger.InfoContext(ctx, "Mounted HTTP handler", slogx.String("module", module))
}
go func() {
// stop main process if API stopped
defer stop()
logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))
if err := app.Listen(fmt.Sprintf(":%d", conf.HTTPServer.Port)); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running HTTP server", slogx.Error(err))
logger.PanicContext(ctx, "Failed to start HTTP server", slogx.Error(err))
}
}()
// handle graceful shutdown
gracefulEG.Go(func() error {
<-ctx.Done()
logger.InfoContext(gctx, "Stopping HTTP server...")
if err := app.ShutdownWithTimeout(60 * time.Second); err != nil {
logger.ErrorContext(gctx, "Error during shutdown HTTP server", slogx.Error(err))
}
logger.InfoContext(gctx, "HTTP server stopped gracefully")
return nil
})
}
// Stop application if worker context is done
go func() {
<-ctxWorker.Done()
defer stop()
logger.InfoContext(ctx, "Gaze Indexer Worker is stopped. Stopping application...")
}()
logger.InfoContext(ctxWorker, "Gaze Indexer started")
// Wait for interrupt signal to gracefully stop the server
<-ctx.Done()
// Force shutdown if timeout exceeded or got signal again
go func() {
defer os.Exit(1)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
select {
case <-ctx.Done():
logger.FatalContext(ctx, "Received exit signal again. Force shutdown...")
case <-time.After(shutdownTimeout + 15*time.Second):
logger.FatalContext(ctx, "Shutdown timeout exceeded. Force shutdown...")
}
}()
// wait until all graceful shutdown goroutines are done before returning
if err := gracefulEG.Wait(); err != nil {
logger.ErrorContext(ctx, "Failed to shutdown gracefully", slogx.Error(err))
} else {
logger.InfoContext(ctx, "Successfully shut down gracefully")
}
return nil
}

View File

@@ -1,128 +0,0 @@
package migrate
import (
"fmt"
"net/url"
"strconv"
"strings"
"github.com/cockroachdb/errors"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/samber/lo"
"github.com/spf13/cobra"
)
// migrateDownCmdOptions holds the CLI flag values for the "migrate down"
// command.
type migrateDownCmdOptions struct {
	DatabaseURL string // --database: connection URL of the database to migrate
	Bitcoin     bool   // --bitcoin: apply Bitcoin module down migrations
	Runes       bool   // --runes: apply Runes module down migrations
	All         bool   // --all: skip the prompt when applying ALL down migrations
}
// migrateDownCmdArgs holds the parsed positional arguments of "migrate down".
type migrateDownCmdArgs struct {
	N int // number of down migrations to apply; 0 means "all"
}
// ParseArgs fills a.N from the optional positional argument. When no
// argument is given, a.N stays 0, which the handler treats as "apply all
// down migrations".
//
// NOTE(review): the check rejects only n < 0 (so 0 is accepted), while the
// message says "positive" — confirm whether an explicit 0 should be allowed.
func (a *migrateDownCmdArgs) ParseArgs(args []string) error {
	if len(args) == 0 {
		return nil
	}
	// Cobra has already enforced len(args) <= 1.
	n, err := strconv.Atoi(args[0])
	if err != nil {
		return errors.Wrap(err, "failed to parse N")
	}
	if n < 0 {
		return errors.New("N must be a positive integer")
	}
	a.N = n
	return nil
}
// NewMigrateDownCommand builds the "down [N]" sub-command, which rolls back
// all (or the last N) applied database migrations for the selected modules.
func NewMigrateDownCommand() *cobra.Command {
	options := &migrateDownCmdOptions{}

	command := &cobra.Command{
		Use:     "down [N]",
		Short:   "Apply all or N down migrations",
		Args:    cobra.MaximumNArgs(1),
		Example: `gaze migrate down --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`,
		RunE: func(c *cobra.Command, rawArgs []string) error {
			// rawArgs length is already validated by cobra (MaximumNArgs above).
			var parsed migrateDownCmdArgs
			if err := parsed.ParseArgs(rawArgs); err != nil {
				return errors.Wrap(err, "failed to parse args")
			}
			return migrateDownHandler(options, c, parsed)
		},
	}

	f := command.Flags()
	f.BoolVar(&options.Bitcoin, "bitcoin", false, "Apply Bitcoin down migrations")
	f.BoolVar(&options.Runes, "runes", false, "Apply Runes down migrations")
	f.StringVar(&options.DatabaseURL, "database", "", "Database url to run migration on")
	f.BoolVar(&options.All, "all", false, "Confirm apply ALL down migrations without prompt")

	return command
}
// migrateDownHandler executes the "migrate down" command: it validates the
// --database flag, asks for confirmation before applying ALL down migrations,
// and then applies down migrations for each selected module.
func migrateDownHandler(opts *migrateDownCmdOptions, _ *cobra.Command, args migrateDownCmdArgs) error {
	if opts.DatabaseURL == "" {
		return errors.New("--database is required")
	}
	databaseURL, err := url.Parse(opts.DatabaseURL)
	if err != nil {
		return errors.Wrap(err, "failed to parse database URL")
	}
	if _, ok := supportedDrivers[databaseURL.Scheme]; !ok {
		return errors.Errorf("unsupported database driver: %s", databaseURL.Scheme)
	}
	// prevent accidental down all migrations
	if args.N == 0 && !opts.All {
		input := ""
		fmt.Print("Are you sure you want to apply all down migrations? (y/N):")
		// best-effort read; empty/failed input falls through to the default "N"
		fmt.Scanln(&input)
		if !lo.Contains([]string{"y", "yes"}, strings.ToLower(input)) {
			return nil
		}
	}
	applyDownMigrations := func(module string, sourcePath string, migrationTable string) error {
		// each module keeps its own schema_migrations table, selected via
		// the x-migrations-table query parameter
		newDatabaseURL := cloneURLWithQuery(databaseURL, url.Values{"x-migrations-table": {migrationTable}})
		sourceURL := "file://" + sourcePath
		m, err := migrate.New(sourceURL, newDatabaseURL.String())
		if err != nil {
			// BUGFIX: check the error BEFORE touching m; m is nil on failure,
			// so the previous order (m.Log = ... then err check) could panic.
			return errors.Wrap(err, "failed to create Migrate instance")
		}
		m.Log = &consoleLogger{
			prefix: fmt.Sprintf("[%s] ", module),
		}
		if args.N == 0 {
			m.Log.Printf("Applying down migrations...\n")
			err = m.Down()
		} else {
			m.Log.Printf("Applying %d down migrations...\n", args.N)
			// negative step count applies down migrations
			err = m.Steps(-args.N)
		}
		if err != nil {
			if !errors.Is(err, migrate.ErrNoChange) {
				return errors.Wrapf(err, "failed to apply %s down migrations", module)
			}
			m.Log.Printf("No more down migrations to apply\n")
		}
		return nil
	}
	if opts.Bitcoin {
		if err := applyDownMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
			return errors.WithStack(err)
		}
	}
	if opts.Runes {
		if err := applyDownMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
			return errors.WithStack(err)
		}
	}
	return nil
}

View File

@@ -1,112 +0,0 @@
package migrate
import (
"fmt"
"net/url"
"strconv"
"github.com/cockroachdb/errors"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/spf13/cobra"
)
// migrateUpCmdOptions holds the flag values for the "migrate up" command.
type migrateUpCmdOptions struct {
	DatabaseURL string // --database: connection URL of the target database
	Bitcoin     bool   // --bitcoin: apply Bitcoin module up migrations
	Runes       bool   // --runes: apply Runes module up migrations
}

// migrateUpCmdArgs holds the parsed positional arguments for "migrate up".
type migrateUpCmdArgs struct {
	N int // number of up migrations to apply; 0 means apply all
}
// ParseArgs parses the optional positional argument N (the number of up
// migrations to apply). When no argument is given, N stays 0, meaning
// "apply all up migrations".
func (a *migrateUpCmdArgs) ParseArgs(args []string) error {
	if len(args) > 0 {
		// assume args already validated by cobra to be len(args) <= 1
		n, err := strconv.Atoi(args[0])
		if err != nil {
			return errors.Wrap(err, "failed to parse N")
		}
		// BUGFIX: reject negative values (the down command already does).
		// A negative N forwarded to m.Steps(args.N) would silently apply
		// DOWN migrations instead of up migrations.
		if n < 0 {
			return errors.New("N must be a positive integer")
		}
		a.N = n
	}
	return nil
}
// NewMigrateUpCommand builds the "migrate up [N]" cobra command, which
// applies all (or the next N) up migrations for the selected modules.
func NewMigrateUpCommand() *cobra.Command {
	opts := &migrateUpCmdOptions{}
	runE := func(cmd *cobra.Command, args []string) error {
		// positional args are already validated by cobra (MaximumNArgs(1))
		var upArgs migrateUpCmdArgs
		if err := upArgs.ParseArgs(args); err != nil {
			return errors.Wrap(err, "failed to parse args")
		}
		return migrateUpHandler(opts, cmd, upArgs)
	}
	cmd := &cobra.Command{
		Use:     "up [N]",
		Short:   "Apply all or N up migrations",
		Args:    cobra.MaximumNArgs(1),
		Example: `gaze migrate up --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`,
		RunE:    runE,
	}
	flags := cmd.Flags()
	flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin up migrations")
	flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes up migrations")
	flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")
	return cmd
}
// migrateUpHandler executes the "migrate up" command: it validates the
// --database flag and applies up migrations for each selected module.
func migrateUpHandler(opts *migrateUpCmdOptions, _ *cobra.Command, args migrateUpCmdArgs) error {
	if opts.DatabaseURL == "" {
		return errors.New("--database is required")
	}
	databaseURL, err := url.Parse(opts.DatabaseURL)
	if err != nil {
		return errors.Wrap(err, "failed to parse database URL")
	}
	if _, ok := supportedDrivers[databaseURL.Scheme]; !ok {
		return errors.Errorf("unsupported database driver: %s", databaseURL.Scheme)
	}
	applyUpMigrations := func(module string, sourcePath string, migrationTable string) error {
		// each module keeps its own schema_migrations table, selected via
		// the x-migrations-table query parameter
		newDatabaseURL := cloneURLWithQuery(databaseURL, url.Values{"x-migrations-table": {migrationTable}})
		sourceURL := "file://" + sourcePath
		m, err := migrate.New(sourceURL, newDatabaseURL.String())
		if err != nil {
			// BUGFIX: check the error BEFORE touching m; m is nil on failure,
			// so the previous order (m.Log = ... then err check) could panic.
			return errors.Wrap(err, "failed to create Migrate instance")
		}
		m.Log = &consoleLogger{
			prefix: fmt.Sprintf("[%s] ", module),
		}
		if args.N == 0 {
			m.Log.Printf("Applying up migrations...\n")
			err = m.Up()
		} else {
			m.Log.Printf("Applying %d up migrations...\n", args.N)
			err = m.Steps(args.N)
		}
		if err != nil {
			if !errors.Is(err, migrate.ErrNoChange) {
				return errors.Wrapf(err, "failed to apply %s up migrations", module)
			}
			m.Log.Printf("Migrations already up-to-date\n")
		}
		return nil
	}
	if opts.Bitcoin {
		if err := applyUpMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
			return errors.WithStack(err)
		}
	}
	if opts.Runes {
		if err := applyUpMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
			return errors.WithStack(err)
		}
	}
	return nil
}

View File

@@ -1,22 +0,0 @@
package migrate
import (
"fmt"
"github.com/golang-migrate/migrate/v4"
)
// compile-time check that *consoleLogger satisfies migrate.Logger
var _ migrate.Logger = (*consoleLogger)(nil)
// consoleLogger is a minimal logger that writes prefixed messages to stdout.
type consoleLogger struct {
	prefix  string // prepended to every log line, e.g. "[Bitcoin] "
	verbose bool   // reported via Verbose()
}

// Printf writes a formatted message to stdout, prepended with the logger prefix.
func (l *consoleLogger) Printf(format string, v ...interface{}) {
	msg := fmt.Sprintf(l.prefix+format, v...)
	fmt.Print(msg)
}

// Verbose reports whether verbose logging is enabled.
func (l *consoleLogger) Verbose() bool {
	return l.verbose
}

View File

@@ -1,25 +0,0 @@
package migrate
import "net/url"
const (
	// bitcoinMigrationSource and runesMigrationSource are the on-disk paths
	// (relative to the working directory) of each module's SQL migration files.
	bitcoinMigrationSource = "modules/bitcoin/database/postgresql/migrations"
	runesMigrationSource   = "modules/runes/database/postgresql/migrations"
)
func cloneURLWithQuery(u *url.URL, newQuery url.Values) *url.URL {
clone := *u
query := clone.Query()
for key, values := range newQuery {
for _, value := range values {
query.Add(key, value)
}
}
clone.RawQuery = query.Encode()
return &clone
}
// supportedDrivers is the set of database URL schemes the migrate commands
// accept; both schemes are handled by the same postgres migration driver.
var supportedDrivers = map[string]struct{}{
	"postgres":   {},
	"postgresql": {},
}

View File

@@ -32,7 +32,7 @@ modules:
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database connection configurations
runes:
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
datasource: "postgres" # Data source to be used (to fetch bitcoin blocks). current supported data sources: "bitcoin-node" | "postgres"
datasource: "bitcoin-node" # Data source to be used. current supported data sources: "bitcoin-node" | "postgres"
api_handlers: # API handlers to be used. current supported handlers: "http"
- http
postgres:

View File

@@ -39,7 +39,7 @@ func NewBitcoinNode(btcclient *rpcclient.Client) *BitcoinNodeDatasource {
}
func (p BitcoinNodeDatasource) Name() string {
return "bitcoin_node"
return "BitcoinNode"
}
// Fetch polling blocks from Bitcoin node
@@ -83,11 +83,6 @@ func (d *BitcoinNodeDatasource) Fetch(ctx context.Context, from, to int64) ([]*t
// - from: block height to start fetching, if -1, it will start from genesis block
// - to: block height to stop fetching, if -1, it will fetch until the latest block
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
@@ -143,10 +138,10 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
slogx.Error(err),
)
}
case <-ctx.Done():
@@ -162,7 +157,6 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
done := subscription.Done()
chunks := lo.Chunk(blockHeights, blockStreamChunkSize)
for _, chunk := range chunks {
// TODO: Implement throttling logic to control the rate of fetching blocks (block/sec)
chunk := chunk
select {
case <-done:
@@ -173,11 +167,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
stream.Go(func() []*types.Block {
startAt := time.Now()
defer func() {
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched blocks",
slogx.Int("total_blocks", len(chunk)),
slogx.Int64("from", chunk[0]),
slogx.Int64("to", chunk[len(chunk)-1]),
slogx.Duration("duration", time.Since(startAt)),
slogx.Stringer("duration", time.Since(startAt)),
slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
)
}()
// TODO: should concurrent fetch block or not ?
@@ -185,21 +180,22 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
for _, height := range chunk {
hash, err := d.btcclient.GetBlockHash(height)
if err != nil {
logger.ErrorContext(ctx, "Can't get block hash from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
logger.ErrorContext(ctx, "failed to get block hash", slogx.Error(err), slogx.Int64("height", height))
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
}
return nil
}
block, err := d.btcclient.GetBlock(hash)
if err != nil {
logger.ErrorContext(ctx, "Can't get block data from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
logger.ErrorContext(ctx, "failed to get block", slogx.Error(err), slogx.Int64("height", height))
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
}
return nil
}
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))
blocks = append(blocks, types.ParseMsgBlock(block, height))
}

View File

@@ -3,7 +3,6 @@ package indexers
import (
"context"
"log/slog"
"sync"
"time"
"github.com/cockroachdb/errors"
@@ -31,10 +30,6 @@ type BitcoinIndexer struct {
Processor BitcoinProcessor
Datasource BitcoinDatasource
currentBlock types.BlockHeader
quitOnce sync.Once
quit chan struct{}
done chan struct{}
}
// NewBitcoinIndexer create new BitcoinIndexer
@@ -42,46 +37,12 @@ func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource)
return &BitcoinIndexer{
Processor: processor,
Datasource: datasource,
quit: make(chan struct{}),
done: make(chan struct{}),
}
}
func (*BitcoinIndexer) Type() string {
return "bitcoin"
}
func (i *BitcoinIndexer) Shutdown() error {
return i.ShutdownWithContext(context.Background())
}
func (i *BitcoinIndexer) ShutdownWithTimeout(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return i.ShutdownWithContext(ctx)
}
func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
i.quitOnce.Do(func() {
close(i.quit)
select {
case <-i.done:
case <-time.After(180 * time.Second):
err = errors.Wrap(errs.Timeout, "indexer shutdown timeout")
case <-ctx.Done():
err = errors.Wrap(ctx.Err(), "indexer shutdown context canceled")
}
})
return
}
func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
defer close(i.done)
ctx = logger.WithContext(ctx,
slog.String("package", "indexers"),
slog.String("indexer", i.Type()),
slog.String("indexer", "bitcoin"),
slog.String("processor", i.Processor.Name()),
slog.String("datasource", i.Datasource.Name()),
)
@@ -95,41 +56,38 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
i.currentBlock.Height = -1
}
ticker := time.NewTicker(pollingInterval)
// TODO:
// - compare db version in constants and database
// - compare current network and local indexed network
// - update indexer stats
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-i.quit:
logger.InfoContext(ctx, "Got quit signal, stopping indexer")
return nil
case <-ctx.Done():
return nil
case <-ticker.C:
if err := i.process(ctx); err != nil {
logger.ErrorContext(ctx, "Indexer failed while processing", slogx.Error(err))
return errors.Wrap(err, "process failed")
logger.ErrorContext(ctx, "failed to process", slogx.Error(err))
return errors.Wrap(err, "failed to process")
}
logger.DebugContext(ctx, "Waiting for next polling interval")
}
}
}
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
// height range to fetch data
from, to := i.currentBlock.Height+1, int64(-1)
logger.InfoContext(ctx, "Start fetching bitcoin blocks", slog.Int64("from", from))
ch := make(chan []*types.Block)
from, to := i.currentBlock.Height+1, int64(-1)
logger.InfoContext(ctx, "Fetching blocks", slog.Int64("from", from), slog.Int64("to", to))
subscription, err := i.Datasource.FetchAsync(ctx, from, to, ch)
if err != nil {
return errors.Wrap(err, "failed to fetch data")
return errors.Wrap(err, "failed to call fetch async")
}
defer subscription.Unsubscribe()
for {
select {
case <-i.quit:
return nil
case blocks := <-ch:
// empty blocks
if len(blocks) == 0 {
@@ -138,6 +96,7 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
startAt := time.Now()
ctx := logger.WithContext(ctx,
slog.Int("total_blocks", len(blocks)),
slogx.Int64("from", blocks[0].Header.Height),
slogx.Int64("to", blocks[len(blocks)-1].Header.Height),
)
@@ -146,8 +105,7 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
{
remoteBlockHeader := blocks[0].Header
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "Detected chain reorganization. Searching for fork point...",
slogx.String("event", "reorg_detected"),
logger.WarnContext(ctx, "Reorg detected",
slogx.Stringer("current_hash", i.currentBlock.Hash),
slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
)
@@ -186,15 +144,12 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
return errors.Wrap(errs.SomethingWentWrong, "reorg look back limit reached")
}
logger.InfoContext(ctx, "Found reorg fork point, starting to revert data...",
slogx.String("event", "reorg_forkpoint"),
slogx.Int64("since", beforeReorgBlockHeader.Height+1),
slogx.Int64("total_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
slogx.Duration("search_duration", time.Since(start)),
)
// Revert all data since the reorg block
start = time.Now()
logger.WarnContext(ctx, "reverting reorg data",
slogx.Int64("reorg_from", beforeReorgBlockHeader.Height+1),
slogx.Int64("total_reorg_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
slogx.Stringer("detect_duration", time.Since(start)),
)
if err := i.Processor.RevertData(ctx, beforeReorgBlockHeader.Height+1); err != nil {
return errors.Wrap(err, "failed to revert data")
}
@@ -202,9 +157,10 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
// Set current block to before reorg block and
// end current round to fetch again
i.currentBlock = beforeReorgBlockHeader
logger.Info("Fixing chain reorganization completed",
slogx.Int64("current_block", i.currentBlock.Height),
slogx.Duration("duration", time.Since(start)),
logger.Info("Reverted data successfully",
slogx.Any("current_block", i.currentBlock),
slogx.Stringer("duration", time.Since(start)),
slogx.Int64("duration_ms", time.Since(start).Milliseconds()),
)
return nil
}
@@ -217,15 +173,12 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
}
if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching blocks, need to try to fetch again")
logger.WarnContext(ctx, "reorg occurred while batch fetching blocks, need to try to fetch again")
// end current round
return nil
}
}
ctx = logger.WithContext(ctx, slog.Int("total_blocks", len(blocks)))
// Start processing blocks
logger.InfoContext(ctx, "Processing blocks")
if err := i.Processor.Process(ctx, blocks); err != nil {
@@ -236,9 +189,8 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
i.currentBlock = blocks[len(blocks)-1].Header
logger.InfoContext(ctx, "Processed blocks successfully",
slogx.String("event", "processed_blocks"),
slogx.Int64("current_block", i.currentBlock.Height),
slogx.Duration("duration", time.Since(startAt)),
slogx.Stringer("duration", time.Since(startAt)),
slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
)
case <-subscription.Done():
// end current round

View File

@@ -2,22 +2,12 @@ package indexers
import (
"context"
"time"
"github.com/gaze-network/indexer-network/core/types"
)
const (
// pollingInterval is the default polling interval for the indexer polling worker
pollingInterval = 15 * time.Second
)
type IndexerWorker interface {
Type() string
Run(ctx context.Context) error
Shutdown() error
ShutdownWithTimeout(timeout time.Duration) error
ShutdownWithContext(ctx context.Context) error
}
type Processor[T any] interface {

1
docker/.gitignore vendored
View File

@@ -1 +0,0 @@
volumes

View File

@@ -1,43 +0,0 @@
logger:
output: text
debug: false
bitcoin_node:
host: "bitcoin-mainnet-archive.allthatnode.com"
user: "user"
pass: "pass"
disable_tls: false
network: mainnet
reporting:
disabled: false
base_url: "https://staging.indexer.api.gaze.network" # defaults to "https://indexer.api.gaze.network" if empty
name: "Local Indexer"
website_url: "" # public website URL to show on the dashboard. Can be left empty.
indexer_api_url: "" # public url to access this api. Can be left empty.
http_server:
port: 8080
modules:
bitcoin:
database: "postgres" # Store bitcoin data in postgres
postgres:
host: "db"
port: 5432
user: "postgres"
password: "password"
db_name: "postgres"
runes:
database: "postgres" # Store Runes data in postgres
datasource: "postgres" # Fetch bitcoin blocks from postgres
api_handlers:
- http
postgres:
host: "db"
port: 5432
user: "postgres"
password: "password"
db_name: "postgres"

View File

@@ -1,47 +0,0 @@
services:
# TODO: need to mount migrations folder
gaze-migrator:
# image: ghcr.io/gaze-network/gaze-indexer:v0.1.0
build:
context: ../
dockerfile: ./docker/Dockerfile
restart: on-failure
depends_on:
- db
networks:
- indexer
command:
["/app/main", "migrate", "up", "--bitcoin", "--runes", "--database", "postgres://postgres:password@db:5432/postgres?sslmode=disable"]
gaze-indexer:
# image: ghcr.io/gaze-network/gaze-indexer:v0.1.0
build:
context: ../
dockerfile: ./docker/Dockerfile
restart: unless-stopped
depends_on:
  db:
    condition: service_started
  gaze-migrator:
    condition: service_completed_successfully
networks:
- indexer
ports:
- 8080:8080
volumes:
- "./config.example.yaml:/app/config.yaml"
command: ["/app/main", "run", "--bitcoin", "--runes"]
db:
image: postgres:16-alpine
volumes:
- "./volumes/postgresql/data:/var/lib/postgresql/data"
networks:
- indexer
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=password
- POSTGRES_DB=postgres
networks:
indexer:
driver: bridge

18
go.mod
View File

@@ -10,7 +10,6 @@ require (
github.com/cockroachdb/errors v1.11.1
github.com/gaze-network/uint128 v1.3.0
github.com/gofiber/fiber/v2 v2.52.4
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v5 v5.5.5
github.com/mcosta74/pgx-slog v0.3.0
@@ -21,28 +20,27 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.51.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.5.0
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/ecies/go/v2 v2.0.9 // indirect
github.com/ethereum/go-ethereum v1.13.5 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -51,7 +49,6 @@ require (
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
@@ -69,12 +66,13 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

1095
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -12,15 +12,14 @@ import (
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
var (
isInit bool
mu sync.Mutex
config = &Config{
configOnce sync.Once
config = &Config{
Logger: logger.Config{
Output: "TEXT",
},
@@ -33,12 +32,17 @@ var (
)
type Config struct {
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
Modules Modules `mapstructure:"modules"`
Reporting reportingclient.Config `mapstructure:"reporting"`
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
Modules Modules `mapstructure:"modules"`
Reporting reportingclientv2.Config `mapstructure:"reporting"`
NodeKey NodeKey `mapstructure:"node_key"`
}
type NodeKey struct {
Path string `mapstructure:"path"`
}
type BitcoinNodeClient struct {
@@ -59,38 +63,6 @@ type HTTPServerConfig struct {
// Parse parse the configuration from environment variables
func Parse(configFile ...string) Config {
mu.Lock()
defer mu.Unlock()
return parse(configFile...)
}
// Load returns the loaded configuration
func Load() Config {
mu.Lock()
defer mu.Unlock()
if isInit {
return *config
}
return parse()
}
// BindPFlag binds a specific key to a pflag (as used by cobra).
// Example (where serverCmd is a Cobra instance):
//
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
func BindPFlag(key string, flag *pflag.Flag) {
if err := viper.BindPFlag(key, flag); err != nil {
logger.Panic("Something went wrong, failed to bind flag for config", slog.String("package", "config"), slogx.Error(err))
}
}
// SetDefault sets the default value for this key.
// SetDefault is case-insensitive for a key.
// Default only used when no value is provided by the user via flag, config or ENV.
func SetDefault(key string, value any) { viper.SetDefault(key, value) }
func parse(configFile ...string) Config {
ctx := logger.WithContext(context.Background(), slog.String("package", "config"))
if len(configFile) > 0 && configFile[0] != "" {
@@ -105,16 +77,39 @@ func parse(configFile ...string) Config {
if err := viper.ReadInConfig(); err != nil {
var errNotfound viper.ConfigFileNotFoundError
if errors.As(err, &errNotfound) {
logger.WarnContext(ctx, "Config file not found, use default config value", slogx.Error(err))
logger.WarnContext(ctx, "config file not found, use default value", slogx.Error(err))
} else {
logger.PanicContext(ctx, "Invalid config file", slogx.Error(err))
logger.PanicContext(ctx, "invalid config file", slogx.Error(err))
}
}
if err := viper.Unmarshal(&config); err != nil {
logger.PanicContext(ctx, "Something went wrong, failed to unmarshal config", slogx.Error(err))
logger.PanicContext(ctx, "failed to unmarshal config", slogx.Error(err))
}
isInit = true
return *config
}
// Load returns the loaded configuration
func Load() Config {
configOnce.Do(func() {
_ = Parse()
})
return *config
}
// BindPFlag binds a specific key to a pflag (as used by cobra).
// Example (where serverCmd is a Cobra instance):
//
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
func BindPFlag(key string, flag *pflag.Flag) {
if err := viper.BindPFlag(key, flag); err != nil {
logger.Panic("Failed to bind pflag for config", slogx.Error(err))
}
}
// SetDefault sets the default value for this key.
// SetDefault is case-insensitive for a key.
// Default only used when no value is provided by the user via flag, config or ENV.
func SetDefault(key string, value any) { viper.SetDefault(key, value) }

View File

@@ -6,7 +6,6 @@ import (
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
@@ -63,7 +62,7 @@ func NewPool(ctx context.Context, conf Config) (*pgxpool.Pool, error) {
// Prepare connection pool configuration
connConfig, err := pgxpool.ParseConfig(conf.String())
if err != nil {
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "failed while parse config"))
return nil, errors.Wrap(err, "failed to parse config to create a new connection pool")
}
connConfig.MaxConns = utils.Default(conf.MaxConns, DefaultMaxConns)
connConfig.MinConns = utils.Default(conf.MinConns, DefaultMinConns)

View File

@@ -37,13 +37,13 @@ func NewClientDatabase(bitcoinDg datagateway.BitcoinDataGateway) *ClientDatabase
}
}
func (d ClientDatabase) Name() string {
return "bitcoin_database"
func (c ClientDatabase) Name() string {
return "BitcoinDatabase"
}
func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
ch := make(chan []*types.Block)
subscription, err := d.FetchAsync(ctx, from, to, ch)
subscription, err := c.FetchAsync(ctx, from, to, ch)
if err != nil {
return nil, errors.WithStack(err)
}
@@ -73,13 +73,8 @@ func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Bl
}
}
func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(ctx, from, to)
func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
from, to, skip, err := c.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -134,10 +129,10 @@ func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
slogx.Error(err),
)
}
case <-ctx.Done():
@@ -164,26 +159,16 @@ func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
continue
}
stream.Go(func() []*types.Block {
startAt := time.Now()
defer func() {
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
slogx.Int("total_blocks", len(chunk)),
slogx.Int64("from", chunk[0]),
slogx.Int64("to", chunk[len(chunk)-1]),
slogx.Duration("duration", time.Since(startAt)),
)
}()
fromHeight, toHeight := chunk[0], chunk[len(chunk)-1]
blocks, err := d.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
blocks, err := c.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
if err != nil {
logger.ErrorContext(ctx, "Can't get block data from Bitcoin database",
logger.ErrorContext(ctx, "failed to get blocks",
slogx.Error(err),
slogx.Int64("from", fromHeight),
slogx.Int64("to", toHeight),
slogx.Int64("from_height", fromHeight),
slogx.Int64("to_height", toHeight),
)
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get blocks: from_height: %d, to_height: %d", fromHeight, toHeight)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
}
return nil
}

View File

@@ -1,8 +1,6 @@
package bitcoin
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/core/types"
)
@@ -12,15 +10,8 @@ const (
DBVersion = 1
)
var (
// defaultCurrentBlockHeight is the default value for the current block height for first time indexing
defaultCurrentBlock = types.BlockHeader{
Hash: common.ZeroHash,
Height: -1,
}
lastV1Block = types.BlockHeader{
Hash: *utils.Must(chainhash.NewHashFromStr("00000000000001aa077d7aa84c532a4d69bdbff519609d1da0835261b7a74eb6")),
Height: 227835,
}
)
// DefaultCurrentBlockHeight is the default value for the current block height for first time indexing
var defaultCurrentBlock = types.BlockHeader{
Hash: common.ZeroHash,
Height: -1,
}

View File

@@ -2,8 +2,8 @@ BEGIN;
-- DROP INDEX
DROP INDEX IF EXISTS bitcoin_blocks_block_hash_idx;
DROP INDEX IF EXISTS bitcoin_transactions_tx_hash_idx;
DROP INDEX IF EXISTS bitcoin_transactions_block_hash_idx;
DROP INDEX IF EXISTS bitcoin_transactions_block_height_idx;
DROP INDEX IF EXISTS bitcoin_transaction_txouts_pkscript_idx;
DROP INDEX IF EXISTS bitcoin_transaction_txins_prevout_idx;

View File

@@ -32,17 +32,16 @@ CREATE TABLE IF NOT EXISTS "bitcoin_blocks" (
CREATE INDEX IF NOT EXISTS bitcoin_blocks_block_hash_idx ON "bitcoin_blocks" USING HASH ("block_hash");
CREATE TABLE IF NOT EXISTS "bitcoin_transactions" (
"tx_hash" TEXT NOT NULL, -- can't use as primary key because block v1 has duplicate tx hashes (coinbase tx). See: https://github.com/bitcoin/bitcoin/commit/a206b0ea12eb4606b93323268fc81a4f1f952531
"tx_hash" TEXT NOT NULL PRIMARY KEY,
"version" INT NOT NULL,
"locktime" BIGINT NOT NULL,
"block_height" INT NOT NULL,
"block_hash" TEXT NOT NULL,
"idx" INT NOT NULL,
PRIMARY KEY ("block_height", "idx")
"idx" INT NOT NULL
);
CREATE INDEX IF NOT EXISTS bitcoin_transactions_tx_hash_idx ON "bitcoin_transactions" USING HASH ("tx_hash");
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_hash_idx ON "bitcoin_transactions" USING HASH ("block_hash");
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_height_idx ON "bitcoin_transactions" USING BTREE ("block_height");
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_hash_idx ON "bitcoin_transactions" USING BTREE ("block_hash");
CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txouts" (
"tx_hash" TEXT NOT NULL,
@@ -62,7 +61,7 @@ CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txins" (
"prevout_tx_idx" INT NOT NULL,
"prevout_pkscript" TEXT NULL, -- Hex String, Can be NULL if the prevout is a coinbase transaction
"scriptsig" TEXT NOT NULL, -- Hex String
"witness" TEXT NOT NULL DEFAULT '', -- Hex String
"witness" TEXT, -- Hex String
"sequence" BIGINT NOT NULL,
PRIMARY KEY ("tx_hash", "tx_idx")
);

View File

@@ -4,61 +4,21 @@ SELECT * FROM bitcoin_blocks ORDER BY block_height DESC LIMIT 1;
-- name: InsertBlock :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce") VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
-- name: BatchInsertBlocks :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce")
VALUES (
unnest(@block_height_arr::INT[]),
unnest(@block_hash_arr::TEXT[]),
unnest(@version_arr::INT[]),
unnest(@merkle_root_arr::TEXT[]),
unnest(@prev_block_hash_arr::TEXT[]),
unnest(@timestamp_arr::TIMESTAMP WITH TIME ZONE[]), -- or use TIMESTAMPTZ
unnest(@bits_arr::BIGINT[]),
unnest(@nonce_arr::BIGINT[])
);
-- name: InsertTransaction :exec
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx") VALUES ($1, $2, $3, $4, $5, $6);
-- name: BatchInsertTransactions :exec
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx")
VALUES (
unnest(@tx_hash_arr::TEXT[]),
unnest(@version_arr::INT[]),
unnest(@locktime_arr::BIGINT[]),
unnest(@block_height_arr::INT[]),
unnest(@block_hash_arr::TEXT[]),
unnest(@idx_arr::INT[])
);
-- name: InsertTransactionTxOut :exec
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value") VALUES ($1, $2, $3, $4);
-- name: BatchInsertTransactionTxIns :exec
-- name: InsertTransactionTxIn :exec
WITH update_txout AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = true
FROM (SELECT unnest(@prevout_tx_hash_arr::TEXT[]) as tx_hash, unnest(@prevout_tx_idx_arr::INT[]) as tx_idx) as txin
WHERE "bitcoin_transaction_txouts"."tx_hash" = txin.tx_hash AND "bitcoin_transaction_txouts"."tx_idx" = txin.tx_idx AND "is_spent" = false
RETURNING "bitcoin_transaction_txouts"."tx_hash", "bitcoin_transaction_txouts"."tx_idx", "pkscript"
), prepare_insert AS (
SELECT input.tx_hash, input.tx_idx, prevout_tx_hash, prevout_tx_idx, update_txout.pkscript as prevout_pkscript, scriptsig, witness, sequence
FROM (
SELECT
unnest(@tx_hash_arr::TEXT[]) as tx_hash,
unnest(@tx_idx_arr::INT[]) as tx_idx,
unnest(@prevout_tx_hash_arr::TEXT[]) as prevout_tx_hash,
unnest(@prevout_tx_idx_arr::INT[]) as prevout_tx_idx,
unnest(@scriptsig_arr::TEXT[]) as scriptsig,
unnest(@witness_arr::TEXT[]) as witness,
unnest(@sequence_arr::INT[]) as sequence
) input LEFT JOIN update_txout ON "update_txout"."tx_hash" = "input"."prevout_tx_hash" AND "update_txout"."tx_idx" = "input"."prevout_tx_idx"
WHERE "tx_hash" = $3 AND "tx_idx" = $4 AND "is_spent" = false -- TODO: should throw an error if already spent
RETURNING "pkscript"
)
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx", "prevout_pkscript","scriptsig","witness","sequence")
SELECT "tx_hash", "tx_idx", "prevout_tx_hash", "prevout_tx_idx", "prevout_pkscript", "scriptsig", "witness", "sequence" FROM prepare_insert;
-- name: BatchInsertTransactionTxOuts :exec
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value")
VALUES (
unnest(@tx_hash_arr::TEXT[]),
unnest(@tx_idx_arr::INT[]),
unnest(@pkscript_arr::TEXT[]),
unnest(@value_arr::BIGINT[])
);
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx","prevout_pkscript","scriptsig","witness","sequence")
VALUES ($1, $2, $3, $4, (SELECT "pkscript" FROM update_txout), $5, $6, $7);
-- name: RevertData :exec
WITH delete_tx AS (

View File

@@ -13,7 +13,7 @@ type BitcoinDataGateway interface {
}
type BitcoinWriterDataDataGateway interface {
InsertBlocks(ctx context.Context, blocks []*types.Block) error
InsertBlock(context.Context, *types.Block) error
RevertBlocks(context.Context, int64) error
}

View File

@@ -1,7 +1,10 @@
package bitcoin
import (
"cmp"
"context"
"log/slog"
"slices"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
@@ -9,6 +12,8 @@ import (
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)
// Make sure to implement the BitcoinProcessor interface
@@ -29,7 +34,7 @@ func NewProcessor(config config.Config, bitcoinDg datagateway.BitcoinDataGateway
}
func (p Processor) Name() string {
return "bitcoin"
return "Bitcoin"
}
func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
@@ -37,15 +42,36 @@ func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
return nil
}
// Process the given blocks before inserting to the database
inputs, err := p.process(ctx, inputs)
// Sort ASC by block height
slices.SortFunc(inputs, func(t1, t2 *types.Block) int {
return cmp.Compare(t1.Header.Height, t2.Header.Height)
})
latestBlock, err := p.CurrentBlock(ctx)
if err != nil {
return errors.WithStack(err)
return errors.Wrap(err, "failed to get latest indexed block header")
}
// check if the given blocks are continue from the latest indexed block
// return an error to prevent inserting out-of-order blocks or duplicate blocks
if inputs[0].Header.Height != latestBlock.Height+1 {
return errors.New("given blocks are not continue from the latest indexed block")
}
// check if the given blocks are in sequence and not missing any block
for i := 1; i < len(inputs); i++ {
if inputs[i].Header.Height != inputs[i-1].Header.Height+1 {
return errors.New("given blocks are not in sequence")
}
}
// Insert blocks
if err := p.bitcoinDg.InsertBlocks(ctx, inputs); err != nil {
return errors.Wrapf(err, "error during insert blocks, from: %d, to: %d", inputs[0].Header.Height, inputs[len(inputs)-1].Header.Height)
for _, b := range inputs {
err := p.bitcoinDg.InsertBlock(ctx, b)
if err != nil {
return errors.Wrapf(err, "failed to insert block, height: %d, hash: %s", b.Header.Height, b.Header.Hash)
}
logger.InfoContext(ctx, "Block inserted", slog.Int64("height", b.Header.Height), slogx.Stringer("hash", b.Header.Hash))
}
return nil
@@ -71,12 +97,6 @@ func (p *Processor) GetIndexedBlock(ctx context.Context, height int64) (types.Bl
}
func (p *Processor) RevertData(ctx context.Context, from int64) error {
// to prevent remove txin/txout of duplicated coinbase transaction in the blocks 91842 and 91880
// if you really want to revert the data before the block `227835`, you should reset the database and reindex the data instead.
if from <= lastV1Block.Height {
return errors.Wrapf(errs.InvalidArgument, "can't revert data before block version 2, height: %d", lastV1Block.Height)
}
if err := p.bitcoinDg.RevertBlocks(ctx, from); err != nil {
return errors.WithStack(err)
}

View File

@@ -1,91 +0,0 @@
package bitcoin
import (
"cmp"
"context"
"slices"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/core/types"
)
// process is a processing rules for the given blocks before inserting to the database
//
// this function will modify the given data directly.
func (p *Processor) process(ctx context.Context, blocks []*types.Block) ([]*types.Block, error) {
if len(blocks) == 0 {
return blocks, nil
}
// Sort ASC by block height
slices.SortFunc(blocks, func(t1, t2 *types.Block) int {
return cmp.Compare(t1.Header.Height, t2.Header.Height)
})
if !p.isContinueFromLatestIndexedBlock(ctx, blocks[0]) {
return nil, errors.New("given blocks are not continue from the latest indexed block")
}
if !p.isBlocksSequential(blocks) {
return nil, errors.New("given blocks are not in sequence")
}
p.removeDuplicateCoinbaseTxInputsOutputs(blocks)
return blocks, nil
}
// check if the given blocks are continue from the latest indexed block
// to prevent inserting out-of-order blocks or duplicate blocks
func (p *Processor) isBlocksSequential(blocks []*types.Block) bool {
if len(blocks) == 0 {
return true
}
for i, block := range blocks {
if i == 0 {
continue
}
if block.Header.Height != blocks[i-1].Header.Height+1 {
return false
}
}
return true
}
// check if the given blocks are continue from the latest indexed block
// to prevent inserting out-of-order blocks or duplicate blocks
func (p *Processor) isContinueFromLatestIndexedBlock(ctx context.Context, block *types.Block) bool {
latestBlock, err := p.CurrentBlock(ctx)
if err != nil {
return false
}
return block.Header.Height == latestBlock.Height+1
}
// there 2 coinbase transaction that are duplicated in the blocks 91842 and 91880.
// if the given block version is v1 and height is `91842` or `91880`,
// then remove transaction inputs/outputs to prevent duplicate txin/txout error when inserting to the database.
//
// Theses duplicated coinbase transactions are having the same transaction input/output and
// utxo from these 2 duplicated coinbase txs can redeem only once. so, it's safe to remove them and can
// use inputs/outputs from the previous block.
//
// Duplicate Coinbase Transactions:
// - `454279874213763724535987336644243549a273058910332236515429488599` in blocks 91812, 91842
// - `e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468` in blocks 91722, 91880
//
// This function will modify the given data directly.
func (p *Processor) removeDuplicateCoinbaseTxInputsOutputs(blocks []*types.Block) {
for _, block := range blocks {
header := block.Header
if header.Version == 1 && (header.Height == 91842 || header.Height == 91880) {
// remove transaction inputs/outputs from coinbase transaction (first transaction)
block.Transactions[0].TxIn = nil
block.Transactions[0].TxOut = nil
}
}
}

View File

@@ -1,144 +0,0 @@
package bitcoin
import (
"fmt"
"testing"
"github.com/gaze-network/indexer-network/core/types"
"github.com/stretchr/testify/assert"
)
func TestDuplicateCoinbaseTxHashHandling(t *testing.T) {
processor := Processor{}
generator := func() []*types.Block {
return []*types.Block{
{
Header: types.BlockHeader{Height: 91842, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
{
Header: types.BlockHeader{Height: 91880, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
}
}
t.Run("all_duplicated_txs", func(t *testing.T) {
blocks := generator()
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
assert.Len(t, blocks, 2, "should not remove any blocks")
for _, block := range blocks {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Len(t, block.Transactions[0].TxIn, 0, "should remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 0, "should remove tx outputs from coinbase transaction")
}
})
t.Run("not_duplicated_txs", func(t *testing.T) {
blocks := []*types.Block{
{
Header: types.BlockHeader{Height: 91812, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
{
Header: types.BlockHeader{Height: 91722, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
}
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
assert.Len(t, blocks, 2, "should not remove any blocks")
for _, block := range blocks {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Len(t, block.Transactions[0].TxIn, 4, "should not remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 4, "should not remove tx outputs from coinbase transaction")
}
})
t.Run("mixed", func(t *testing.T) {
blocks := []*types.Block{
{
Header: types.BlockHeader{Height: 91812, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
}
blocks = append(blocks, generator()...)
blocks = append(blocks, &types.Block{
Header: types.BlockHeader{Height: 91722, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
})
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
assert.Len(t, blocks, 4, "should not remove any blocks")
// only 2nd and 3rd blocks should be modified
for i, block := range blocks {
t.Run(fmt.Sprint(i), func(t *testing.T) {
if i == 1 || i == 2 {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Len(t, block.Transactions[0].TxIn, 0, "should remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 0, "should remove tx outputs from coinbase transaction")
} else {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Lenf(t, block.Transactions[0].TxIn, 4, "should not remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 4, "should not remove tx outputs from coinbase transaction")
}
})
}
})
}

View File

@@ -28,12 +28,8 @@ func (r *Repository) GetLatestBlockHeader(ctx context.Context) (types.BlockHeade
return data, nil
}
func (r *Repository) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
if len(blocks) == 0 {
return nil
}
blockParams, txParams, txoutParams, txinParams := mapBlocksTypeToParams(blocks)
func (r *Repository) InsertBlock(ctx context.Context, block *types.Block) error {
blockParams, txParams, txoutParams, txinParams := mapBlockTypeToParams(block)
tx, err := r.db.Begin(ctx)
if err != nil {
@@ -43,22 +39,28 @@ func (r *Repository) InsertBlocks(ctx context.Context, blocks []*types.Block) er
queries := r.queries.WithTx(tx)
if err := queries.BatchInsertBlocks(ctx, blockParams); err != nil {
return errors.Wrap(err, "failed to batch insert block headers")
if err := queries.InsertBlock(ctx, blockParams); err != nil {
return errors.Wrapf(err, "failed to insert block, height: %d, hash: %s", blockParams.BlockHeight, blockParams.BlockHash)
}
if err := queries.BatchInsertTransactions(ctx, txParams); err != nil {
return errors.Wrap(err, "failed to batch insert transactions")
for _, params := range txParams {
if err := queries.InsertTransaction(ctx, params); err != nil {
return errors.Wrapf(err, "failed to insert transaction, hash: %s", params.TxHash)
}
}
// Should insert txout first, then txin
// Because txin references txout
if err := queries.BatchInsertTransactionTxOuts(ctx, txoutParams); err != nil {
return errors.Wrap(err, "failed to batch insert transaction txins")
for _, params := range txoutParams {
if err := queries.InsertTransactionTxOut(ctx, params); err != nil {
return errors.Wrapf(err, "failed to insert transaction txout, %v:%v", params.TxHash, params.TxIdx)
}
}
if err := queries.BatchInsertTransactionTxIns(ctx, txinParams); err != nil {
return errors.Wrap(err, "failed to batch insert transaction txins")
for _, params := range txinParams {
if err := queries.InsertTransactionTxIn(ctx, params); err != nil {
return errors.Wrapf(err, "failed to insert transaction txin, %v:%v", params.TxHash, params.TxIdx)
}
}
if err := tx.Commit(ctx); err != nil {

View File

@@ -11,152 +11,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const batchInsertBlocks = `-- name: BatchInsertBlocks :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce")
VALUES (
unnest($1::INT[]),
unnest($2::TEXT[]),
unnest($3::INT[]),
unnest($4::TEXT[]),
unnest($5::TEXT[]),
unnest($6::TIMESTAMP WITH TIME ZONE[]), -- or use TIMESTAMPTZ
unnest($7::BIGINT[]),
unnest($8::BIGINT[])
)
`
type BatchInsertBlocksParams struct {
BlockHeightArr []int32
BlockHashArr []string
VersionArr []int32
MerkleRootArr []string
PrevBlockHashArr []string
TimestampArr []pgtype.Timestamptz
BitsArr []int64
NonceArr []int64
}
func (q *Queries) BatchInsertBlocks(ctx context.Context, arg BatchInsertBlocksParams) error {
_, err := q.db.Exec(ctx, batchInsertBlocks,
arg.BlockHeightArr,
arg.BlockHashArr,
arg.VersionArr,
arg.MerkleRootArr,
arg.PrevBlockHashArr,
arg.TimestampArr,
arg.BitsArr,
arg.NonceArr,
)
return err
}
const batchInsertTransactionTxIns = `-- name: BatchInsertTransactionTxIns :exec
WITH update_txout AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = true
FROM (SELECT unnest($1::TEXT[]) as tx_hash, unnest($2::INT[]) as tx_idx) as txin
WHERE "bitcoin_transaction_txouts"."tx_hash" = txin.tx_hash AND "bitcoin_transaction_txouts"."tx_idx" = txin.tx_idx AND "is_spent" = false
RETURNING "bitcoin_transaction_txouts"."tx_hash", "bitcoin_transaction_txouts"."tx_idx", "pkscript"
), prepare_insert AS (
SELECT input.tx_hash, input.tx_idx, prevout_tx_hash, prevout_tx_idx, update_txout.pkscript as prevout_pkscript, scriptsig, witness, sequence
FROM (
SELECT
unnest($3::TEXT[]) as tx_hash,
unnest($4::INT[]) as tx_idx,
unnest($1::TEXT[]) as prevout_tx_hash,
unnest($2::INT[]) as prevout_tx_idx,
unnest($5::TEXT[]) as scriptsig,
unnest($6::TEXT[]) as witness,
unnest($7::INT[]) as sequence
) input LEFT JOIN update_txout ON "update_txout"."tx_hash" = "input"."prevout_tx_hash" AND "update_txout"."tx_idx" = "input"."prevout_tx_idx"
)
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx", "prevout_pkscript","scriptsig","witness","sequence")
SELECT "tx_hash", "tx_idx", "prevout_tx_hash", "prevout_tx_idx", "prevout_pkscript", "scriptsig", "witness", "sequence" FROM prepare_insert
`
type BatchInsertTransactionTxInsParams struct {
PrevoutTxHashArr []string
PrevoutTxIdxArr []int32
TxHashArr []string
TxIdxArr []int32
ScriptsigArr []string
WitnessArr []string
SequenceArr []int32
}
func (q *Queries) BatchInsertTransactionTxIns(ctx context.Context, arg BatchInsertTransactionTxInsParams) error {
_, err := q.db.Exec(ctx, batchInsertTransactionTxIns,
arg.PrevoutTxHashArr,
arg.PrevoutTxIdxArr,
arg.TxHashArr,
arg.TxIdxArr,
arg.ScriptsigArr,
arg.WitnessArr,
arg.SequenceArr,
)
return err
}
const batchInsertTransactionTxOuts = `-- name: BatchInsertTransactionTxOuts :exec
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value")
VALUES (
unnest($1::TEXT[]),
unnest($2::INT[]),
unnest($3::TEXT[]),
unnest($4::BIGINT[])
)
`
type BatchInsertTransactionTxOutsParams struct {
TxHashArr []string
TxIdxArr []int32
PkscriptArr []string
ValueArr []int64
}
func (q *Queries) BatchInsertTransactionTxOuts(ctx context.Context, arg BatchInsertTransactionTxOutsParams) error {
_, err := q.db.Exec(ctx, batchInsertTransactionTxOuts,
arg.TxHashArr,
arg.TxIdxArr,
arg.PkscriptArr,
arg.ValueArr,
)
return err
}
const batchInsertTransactions = `-- name: BatchInsertTransactions :exec
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx")
VALUES (
unnest($1::TEXT[]),
unnest($2::INT[]),
unnest($3::BIGINT[]),
unnest($4::INT[]),
unnest($5::TEXT[]),
unnest($6::INT[])
)
`
type BatchInsertTransactionsParams struct {
TxHashArr []string
VersionArr []int32
LocktimeArr []int64
BlockHeightArr []int32
BlockHashArr []string
IdxArr []int32
}
func (q *Queries) BatchInsertTransactions(ctx context.Context, arg BatchInsertTransactionsParams) error {
_, err := q.db.Exec(ctx, batchInsertTransactions,
arg.TxHashArr,
arg.VersionArr,
arg.LocktimeArr,
arg.BlockHeightArr,
arg.BlockHashArr,
arg.IdxArr,
)
return err
}
const getBlockByHeight = `-- name: GetBlockByHeight :one
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks WHERE block_height = $1
`
@@ -381,6 +235,86 @@ func (q *Queries) InsertBlock(ctx context.Context, arg InsertBlockParams) error
return err
}
const insertTransaction = `-- name: InsertTransaction :exec
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx") VALUES ($1, $2, $3, $4, $5, $6)
`
type InsertTransactionParams struct {
TxHash string
Version int32
Locktime int64
BlockHeight int32
BlockHash string
Idx int32
}
func (q *Queries) InsertTransaction(ctx context.Context, arg InsertTransactionParams) error {
_, err := q.db.Exec(ctx, insertTransaction,
arg.TxHash,
arg.Version,
arg.Locktime,
arg.BlockHeight,
arg.BlockHash,
arg.Idx,
)
return err
}
const insertTransactionTxIn = `-- name: InsertTransactionTxIn :exec
WITH update_txout AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = true
WHERE "tx_hash" = $3 AND "tx_idx" = $4 AND "is_spent" = false -- TODO: should throw an error if already spent
RETURNING "pkscript"
)
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx","prevout_pkscript","scriptsig","witness","sequence")
VALUES ($1, $2, $3, $4, (SELECT "pkscript" FROM update_txout), $5, $6, $7)
`
type InsertTransactionTxInParams struct {
TxHash string
TxIdx int32
PrevoutTxHash string
PrevoutTxIdx int32
Scriptsig string
Witness pgtype.Text
Sequence int64
}
func (q *Queries) InsertTransactionTxIn(ctx context.Context, arg InsertTransactionTxInParams) error {
_, err := q.db.Exec(ctx, insertTransactionTxIn,
arg.TxHash,
arg.TxIdx,
arg.PrevoutTxHash,
arg.PrevoutTxIdx,
arg.Scriptsig,
arg.Witness,
arg.Sequence,
)
return err
}
const insertTransactionTxOut = `-- name: InsertTransactionTxOut :exec
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value") VALUES ($1, $2, $3, $4)
`
type InsertTransactionTxOutParams struct {
TxHash string
TxIdx int32
Pkscript string
Value int64
}
func (q *Queries) InsertTransactionTxOut(ctx context.Context, arg InsertTransactionTxOutParams) error {
_, err := q.db.Exec(ctx, insertTransactionTxOut,
arg.TxHash,
arg.TxIdx,
arg.Pkscript,
arg.Value,
)
return err
}
const revertData = `-- name: RevertData :exec
WITH delete_tx AS (
DELETE FROM "bitcoin_transactions" WHERE "block_height" >= $1

View File

@@ -48,7 +48,7 @@ type BitcoinTransactionTxin struct {
PrevoutTxIdx int32
PrevoutPkscript pgtype.Text
Scriptsig string
Witness string
Witness pgtype.Text
Sequence int64
}

View File

@@ -14,6 +14,22 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
func mapBlockHeaderTypeToModel(src types.BlockHeader) gen.BitcoinBlock {
return gen.BitcoinBlock{
BlockHeight: int32(src.Height),
BlockHash: src.Hash.String(),
Version: src.Version,
MerkleRoot: src.MerkleRoot.String(),
PrevBlockHash: src.PrevBlock.String(),
Timestamp: pgtype.Timestamptz{
Time: src.Timestamp,
Valid: true,
},
Bits: int64(src.Bits),
Nonce: int64(src.Nonce),
}
}
func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error) {
hash, err := chainhash.NewHashFromStr(src.BlockHash)
if err != nil {
@@ -39,93 +55,63 @@ func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error)
}, nil
}
func mapBlocksTypeToParams(src []*types.Block) (gen.BatchInsertBlocksParams, gen.BatchInsertTransactionsParams, gen.BatchInsertTransactionTxOutsParams, gen.BatchInsertTransactionTxInsParams) {
blocks := gen.BatchInsertBlocksParams{
BlockHeightArr: make([]int32, 0, len(src)),
BlockHashArr: make([]string, 0, len(src)),
VersionArr: make([]int32, 0, len(src)),
MerkleRootArr: make([]string, 0, len(src)),
PrevBlockHashArr: make([]string, 0, len(src)),
TimestampArr: make([]pgtype.Timestamptz, 0, len(src)),
BitsArr: make([]int64, 0, len(src)),
NonceArr: make([]int64, 0, len(src)),
}
txs := gen.BatchInsertTransactionsParams{
TxHashArr: []string{},
VersionArr: []int32{},
LocktimeArr: []int64{},
BlockHeightArr: []int32{},
BlockHashArr: []string{},
IdxArr: []int32{},
}
txouts := gen.BatchInsertTransactionTxOutsParams{
TxHashArr: []string{},
TxIdxArr: []int32{},
PkscriptArr: []string{},
ValueArr: []int64{},
}
txins := gen.BatchInsertTransactionTxInsParams{
PrevoutTxHashArr: []string{},
PrevoutTxIdxArr: []int32{},
TxHashArr: []string{},
TxIdxArr: []int32{},
ScriptsigArr: []string{},
WitnessArr: []string{},
SequenceArr: []int32{},
}
for _, block := range src {
blockHash := block.Header.Hash.String()
// Batch insert blocks
blocks.BlockHeightArr = append(blocks.BlockHeightArr, int32(block.Header.Height))
blocks.BlockHashArr = append(blocks.BlockHashArr, blockHash)
blocks.VersionArr = append(blocks.VersionArr, block.Header.Version)
blocks.MerkleRootArr = append(blocks.MerkleRootArr, block.Header.MerkleRoot.String())
blocks.PrevBlockHashArr = append(blocks.PrevBlockHashArr, block.Header.PrevBlock.String())
blocks.TimestampArr = append(blocks.TimestampArr, pgtype.Timestamptz{
Time: block.Header.Timestamp,
func mapBlockTypeToParams(src *types.Block) (gen.InsertBlockParams, []gen.InsertTransactionParams, []gen.InsertTransactionTxOutParams, []gen.InsertTransactionTxInParams) {
txs := make([]gen.InsertTransactionParams, 0, len(src.Transactions))
txouts := make([]gen.InsertTransactionTxOutParams, 0)
txins := make([]gen.InsertTransactionTxInParams, 0)
block := gen.InsertBlockParams{
BlockHeight: int32(src.Header.Height),
BlockHash: src.Header.Hash.String(),
Version: src.Header.Version,
MerkleRoot: src.Header.MerkleRoot.String(),
PrevBlockHash: src.Header.PrevBlock.String(),
Timestamp: pgtype.Timestamptz{
Time: src.Header.Timestamp,
Valid: true,
})
blocks.BitsArr = append(blocks.BitsArr, int64(block.Header.Bits))
blocks.NonceArr = append(blocks.NonceArr, int64(block.Header.Nonce))
},
Bits: int64(src.Header.Bits),
Nonce: int64(src.Header.Nonce),
}
for txIdx, srcTx := range src.Transactions {
tx := gen.InsertTransactionParams{
TxHash: srcTx.TxHash.String(),
Version: srcTx.Version,
Locktime: int64(srcTx.LockTime),
BlockHeight: int32(src.Header.Height),
BlockHash: src.Header.Hash.String(),
Idx: int32(txIdx),
}
txs = append(txs, tx)
for txIdx, srcTx := range block.Transactions {
txHash := srcTx.TxHash.String()
// Batch insert transactions
txs.TxHashArr = append(txs.TxHashArr, txHash)
txs.VersionArr = append(txs.VersionArr, srcTx.Version)
txs.LocktimeArr = append(txs.LocktimeArr, int64(srcTx.LockTime))
txs.BlockHeightArr = append(txs.BlockHeightArr, int32(block.Header.Height))
txs.BlockHashArr = append(txs.BlockHashArr, blockHash)
txs.IdxArr = append(txs.IdxArr, int32(txIdx))
// Batch insert txins
for idx, txin := range srcTx.TxIn {
var witness string
if len(txin.Witness) > 0 {
witness = btcutils.WitnessToString(txin.Witness)
for idx, txin := range srcTx.TxIn {
var witness pgtype.Text
if len(txin.Witness) > 0 {
witness = pgtype.Text{
String: btcutils.WitnessToString(txin.Witness),
Valid: true,
}
txins.TxHashArr = append(txins.TxHashArr, txHash)
txins.TxIdxArr = append(txins.TxIdxArr, int32(idx))
txins.PrevoutTxHashArr = append(txins.PrevoutTxHashArr, txin.PreviousOutTxHash.String())
txins.PrevoutTxIdxArr = append(txins.PrevoutTxIdxArr, int32(txin.PreviousOutIndex))
txins.ScriptsigArr = append(txins.ScriptsigArr, hex.EncodeToString(txin.SignatureScript))
txins.WitnessArr = append(txins.WitnessArr, witness)
txins.SequenceArr = append(txins.SequenceArr, int32(txin.Sequence))
}
txins = append(txins, gen.InsertTransactionTxInParams{
TxHash: tx.TxHash,
TxIdx: int32(idx),
PrevoutTxHash: txin.PreviousOutTxHash.String(),
PrevoutTxIdx: int32(txin.PreviousOutIndex),
Scriptsig: hex.EncodeToString(txin.SignatureScript),
Witness: witness,
Sequence: int64(txin.Sequence),
})
}
// Batch insert txouts
for idx, txout := range srcTx.TxOut {
txouts.TxHashArr = append(txouts.TxHashArr, txHash)
txouts.TxIdxArr = append(txouts.TxIdxArr, int32(idx))
txouts.PkscriptArr = append(txouts.PkscriptArr, hex.EncodeToString(txout.PkScript))
txouts.ValueArr = append(txouts.ValueArr, txout.Value)
}
for idx, txout := range srcTx.TxOut {
txouts = append(txouts, gen.InsertTransactionTxOutParams{
TxHash: tx.TxHash,
TxIdx: int32(idx),
Pkscript: hex.EncodeToString(txout.PkScript),
Value: txout.Value,
})
}
}
return blocks, txs, txouts, txins
return block, txs, txouts, txins
}
func mapTransactionModelToType(src gen.BitcoinTransaction, txInModel []gen.BitcoinTransactionTxin, txOutModels []gen.BitcoinTransactionTxout) (types.Transaction, error) {
@@ -160,9 +146,13 @@ func mapTransactionModelToType(src gen.BitcoinTransaction, txInModel []gen.Bitco
return types.Transaction{}, errors.Wrap(err, "failed to parse prevout tx hash")
}
witness, err := btcutils.WitnessFromString(txInModel.Witness)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to parse witness from hex string")
var witness [][]byte
if txInModel.Witness.Valid {
w, err := btcutils.WitnessFromString(txInModel.Witness.String)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to parse witness from hex string")
}
witness = w
}
txIns = append(txIns, &types.TxIn{

View File

@@ -1,12 +1,13 @@
package httphandler
import (
"bytes"
"encoding/hex"
"slices"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/uint128"
"github.com/gofiber/fiber/v2"
@@ -83,7 +84,6 @@ type amountWithDecimal struct {
type transaction struct {
TxHash chainhash.Hash `json:"txHash"`
BlockHeight uint64 `json:"blockHeight"`
Index uint32 `json:"index"`
Timestamp int64 `json:"timestamp"`
Inputs []txInputOutput `json:"inputs"`
Outputs []txInputOutput `json:"outputs"`
@@ -116,6 +116,15 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
}
}
blockHeight := req.BlockHeight
if blockHeight == 0 {
blockHeader, err := h.usecase.GetLatestBlock(ctx.UserContext())
if err != nil {
return errors.Wrap(err, "error during GetLatestBlock")
}
blockHeight = uint64(blockHeader.Height)
}
var runeId runes.RuneId
if req.Id != "" {
var ok bool
@@ -125,23 +134,68 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
}
}
blockHeight := req.BlockHeight
// set blockHeight to the latest block height blockHeight, pkScript, and runeId are not provided
if blockHeight == 0 && pkScript == nil && runeId == (runes.RuneId{}) {
blockHeader, err := h.usecase.GetLatestBlock(ctx.UserContext())
if err != nil {
return errors.Wrap(err, "error during GetLatestBlock")
}
blockHeight = uint64(blockHeader.Height)
}
txs, err := h.usecase.GetRuneTransactions(ctx.UserContext(), pkScript, runeId, blockHeight)
txs, err := h.usecase.GetTransactionsByHeight(ctx.UserContext(), blockHeight)
if err != nil {
return errors.Wrap(err, "error during GetRuneTransactions")
return errors.Wrap(err, "error during GetTransactionsByHeight")
}
var allRuneIds []runes.RuneId
filteredTxs := make([]*entity.RuneTransaction, 0)
isTxContainPkScript := func(tx *entity.RuneTransaction) bool {
for _, input := range tx.Inputs {
if bytes.Equal(input.PkScript, pkScript) {
return true
}
}
for _, output := range tx.Outputs {
if bytes.Equal(output.PkScript, pkScript) {
return true
}
}
return false
}
isTxContainRuneId := func(tx *entity.RuneTransaction) bool {
for _, input := range tx.Inputs {
if input.RuneId == runeId {
return true
}
}
for _, output := range tx.Outputs {
if output.RuneId == runeId {
return true
}
}
for mintedRuneId := range tx.Mints {
if mintedRuneId == runeId {
return true
}
}
for burnedRuneId := range tx.Burns {
if burnedRuneId == runeId {
return true
}
}
if tx.Runestone != nil {
if tx.Runestone.Mint != nil && *tx.Runestone.Mint == runeId {
return true
}
// returns true if this tx etched this runeId
if tx.RuneEtched && tx.BlockHeight == runeId.BlockHeight && tx.Index == runeId.TxIndex {
return true
}
}
return false
}
for _, tx := range txs {
if pkScript != nil && !isTxContainPkScript(tx) {
continue
}
if runeId != (runes.RuneId{}) && !isTxContainRuneId(tx) {
continue
}
filteredTxs = append(filteredTxs, tx)
}
var allRuneIds []runes.RuneId
for _, tx := range filteredTxs {
for id := range tx.Mints {
allRuneIds = append(allRuneIds, id)
}
@@ -161,12 +215,11 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
return errors.Wrap(err, "error during GetRuneEntryByRuneIdBatch")
}
txList := make([]transaction, 0, len(txs))
for _, tx := range txs {
txList := make([]transaction, 0, len(filteredTxs))
for _, tx := range filteredTxs {
respTx := transaction{
TxHash: tx.Hash,
BlockHeight: tx.BlockHeight,
Index: tx.Index,
Timestamp: tx.Timestamp.Unix(),
Inputs: make([]txInputOutput, 0, len(tx.Inputs)),
Outputs: make([]txInputOutput, 0, len(tx.Outputs)),
@@ -256,13 +309,6 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
}
txList = append(txList, respTx)
}
// sort by block height ASC, then index ASC
slices.SortFunc(txList, func(t1, t2 transaction) int {
if t1.BlockHeight != t2.BlockHeight {
return int(t1.BlockHeight - t2.BlockHeight)
}
return int(t1.Index - t2.Index)
})
resp := getTransactionsResponse{
Result: &getTransactionsResult{

View File

@@ -72,7 +72,6 @@ CREATE TABLE IF NOT EXISTS "runes_transactions" (
"rune_etched" BOOLEAN NOT NULL
);
CREATE INDEX IF NOT EXISTS runes_transactions_block_height_idx ON "runes_transactions" USING BTREE ("block_height");
CREATE INDEX IF NOT EXISTS runes_transactions_jsonb_idx ON "runes_transactions" USING GIN ("inputs", "outputs", "mints", "burns");
CREATE TABLE IF NOT EXISTS "runes_runestones" (
"tx_hash" TEXT NOT NULL PRIMARY KEY,

View File

@@ -40,23 +40,10 @@ SELECT * FROM runes_entries
-- name: GetRuneIdFromRune :one
SELECT rune_id FROM runes_entries WHERE rune = $1;
-- name: GetRuneTransactions :many
SELECT * FROM runes_transactions
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
WHERE (
@filter_pk_script::BOOLEAN = FALSE -- if @filter_pk_script is TRUE, apply pk_script filter
OR runes_transactions.outputs @> @pk_script_param::JSONB
OR runes_transactions.inputs @> @pk_script_param::JSONB
) AND (
@filter_rune_id::BOOLEAN = FALSE -- if @filter_rune_id is TRUE, apply rune_id filter
OR runes_transactions.outputs @> @rune_id_param::JSONB
OR runes_transactions.inputs @> @rune_id_param::JSONB
OR runes_transactions.mints ? @rune_id
OR runes_transactions.burns ? @rune_id
OR (runes_transactions.rune_etched = TRUE AND runes_transactions.block_height = @rune_id_block_height AND runes_transactions.index = @rune_id_tx_index)
) AND (
@block_height::INT = 0 OR runes_transactions.block_height = @block_height::INT -- if @block_height > 0, apply block_height filter
);
-- name: GetRuneTransactionsByHeight :many
SELECT * FROM runes_transactions
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
WHERE runes_transactions.block_height = $1;
-- name: CountRuneEntries :one
SELECT COUNT(*) FROM runes_entries;

View File

@@ -26,8 +26,7 @@ type RunesDataGatewayWithTx interface {
type RunesReaderDataGateway interface {
GetLatestBlock(ctx context.Context) (types.BlockHeader, error)
GetIndexedBlockByHeight(ctx context.Context, height int64) (*entity.IndexedBlock, error)
// GetRuneTransactions returns the runes transactions, filterable by pkScript, runeId and height. If pkScript, runeId or height is zero value, that filter is ignored.
GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error)
GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error)
GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error)
GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error)

View File

@@ -82,7 +82,6 @@ func (p *Processor) getHashPayload(header types.BlockHeader) ([]byte, error) {
func serializeNewRuneEntry(entry *runes.RuneEntry) []byte {
var sb strings.Builder
sb.WriteString("newRuneEntry:")
// nolint:goconst
sb.WriteString("runeId:" + entry.RuneId.String())
sb.WriteString("number:" + strconv.Itoa(int(entry.Number)))
sb.WriteString("divisibility:" + strconv.Itoa(int(entry.Divisibility)))
@@ -94,7 +93,6 @@ func serializeNewRuneEntry(entry *runes.RuneEntry) []byte {
sb.WriteString("terms:")
terms := entry.Terms
if terms.Amount != nil {
// nolint:goconst
sb.WriteString("amount:" + terms.Amount.String())
}
if terms.Cap != nil {

View File

@@ -16,8 +16,7 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
"github.com/gaze-network/uint128"
"github.com/samber/lo"
)
@@ -30,7 +29,7 @@ type Processor struct {
bitcoinClient btcclient.Contract
bitcoinDataSource indexers.BitcoinDatasource
network common.Network
reportingClient *reportingclient.ReportingClient
reportingClient *reportingclientv2.ReportingClient
newRuneEntries map[runes.RuneId]*runes.RuneEntry
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
@@ -40,7 +39,7 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network, reportingClient *reportingclientv2.ReportingClient) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
@@ -73,7 +72,7 @@ func (p *Processor) VerifyStates(ctx context.Context) error {
}
}
if p.reportingClient != nil {
if err := p.reportingClient.SubmitNodeReport(ctx, "runes", p.network); err != nil {
if err := p.reportingClient.SubmitNodeReport(ctx, "runes", p.network, Version); err != nil {
return errors.Wrap(err, "failed to submit node report")
}
}
@@ -159,7 +158,7 @@ func (p *Processor) ensureGenesisRune(ctx context.Context) error {
}
func (p *Processor) Name() string {
return "runes"
return "Runes"
}
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
@@ -193,10 +192,7 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
defer func() {
if err := runesDgTx.Rollback(ctx); err != nil {
logger.WarnContext(ctx, "failed to rollback transaction",
slogx.Error(err),
slogx.String("event", "rollback_runes_revert"),
)
logger.ErrorContext(ctx, "failed to rollback transaction", err)
}
}()

View File

@@ -18,27 +18,24 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
"github.com/gaze-network/uint128"
"github.com/samber/lo"
)
func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
for _, block := range blocks {
ctx := logger.WithContext(ctx, slog.Int64("height", block.Header.Height))
logger.DebugContext(ctx, "Processing new block", slog.Int("txs", len(block.Transactions)))
ctx := logger.WithContext(ctx, slog.Int("block_height", int(block.Header.Height)))
logger.DebugContext(ctx, "[RunesProcessor] Processing block", slog.Int("txs", len(block.Transactions)))
for _, tx := range block.Transactions {
if err := p.processTx(ctx, tx, block.Header); err != nil {
return errors.Wrap(err, "failed to process tx")
}
}
if err := p.flushBlock(ctx, block.Header); err != nil {
return errors.Wrap(err, "failed to flush block")
}
logger.DebugContext(ctx, "Inserted new block")
}
return nil
}
@@ -69,6 +66,13 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
for runeId, balance := range balances {
unallocated[runeId] = unallocated[runeId].Add(balance.Amount)
p.newSpendOutPoints = append(p.newSpendOutPoints, balance.OutPoint)
logger.DebugContext(ctx, "[RunesProcessor] Found runes in tx input",
slogx.Any("runeId", runeId),
slogx.Stringer("amount", balance.Amount),
slogx.Stringer("txHash", balance.OutPoint.Hash),
slog.Int("txOutIndex", int(balance.OutPoint.Index)),
slog.Int("blockHeight", int(tx.BlockHeight)),
)
}
}
@@ -88,6 +92,13 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
allocated[output][runeId] = allocated[output][runeId].Add(amount)
unallocated[runeId] = unallocated[runeId].Sub(amount)
logger.DebugContext(ctx, "[RunesProcessor] Allocated runes to tx output",
slogx.Any("runeId", runeId),
slogx.Stringer("amount", amount),
slog.Int("output", output),
slogx.Stringer("txHash", tx.TxHash),
slog.Int("blockHeight", int(tx.BlockHeight)),
)
}
mints := make(map[runes.RuneId]uint128.Uint128)
@@ -120,6 +131,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
if !premine.IsZero() {
unallocated[etchedRuneId] = unallocated[etchedRuneId].Add(premine)
mints[etchedRuneId] = mints[etchedRuneId].Add(premine)
logger.DebugContext(ctx, "[RunesProcessor] Minted premine", slogx.Any("runeId", etchedRuneId), slogx.Stringer("amount", premine))
}
}
@@ -194,6 +206,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// all input runes and minted runes in a tx with cenotaph are burned
for runeId, amount := range unallocated {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes in cenotaph", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
} else {
// assign all un-allocated runes to the default output (pointer), or the first non
@@ -223,6 +236,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// if pointer is still nil, then no output is available. Burn all unallocated runes.
for runeId, amount := range unallocated {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to no output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
}
}
@@ -233,6 +247,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// burn all allocated runes to OP_RETURN outputs
for runeId, amount := range balances {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to OP_RETURN output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
continue
}
@@ -301,6 +316,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
}
p.newRuneTxs = append(p.newRuneTxs, &runeTx)
logger.DebugContext(ctx, "[RunesProcessor] created RuneTransaction", slogx.Any("runeTx", runeTx))
return nil
}
@@ -397,6 +413,7 @@ func (p *Processor) mint(ctx context.Context, runeId runes.RuneId, blockHeader t
if err := p.incrementMintCount(ctx, runeId, blockHeader); err != nil {
return uint128.Zero, errors.Wrap(err, "failed to increment mint count")
}
logger.DebugContext(ctx, "[RunesProcessor] Minted rune", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount), slogx.Stringer("mintCount", runeEntry.Mints), slogx.Stringer("cap", lo.FromPtr(runeEntry.Terms.Cap)))
return amount, nil
}
@@ -408,9 +425,11 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
if rune != nil {
minimumRune := runes.MinimumRuneAtHeight(p.network, uint64(tx.BlockHeight))
if rune.Cmp(minimumRune) < 0 {
logger.DebugContext(ctx, "invalid etching: rune is lower than minimum rune at this height", slogx.Any("rune", rune), slogx.Any("minimumRune", minimumRune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
if rune.IsReserved() {
logger.DebugContext(ctx, "invalid etching: rune is reserved", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
@@ -419,6 +438,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check rune existence")
}
if ok {
logger.DebugContext(ctx, "invalid etching: rune already exists", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
@@ -428,6 +448,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check tx commits to rune")
}
if !commit {
logger.DebugContext(ctx, "invalid etching: tx does not commit to the rune", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
} else {
@@ -443,7 +464,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction, rune runes.Rune) (bool, error) {
commitment := rune.Commitment()
for i, txIn := range tx.TxIn {
for _, txIn := range tx.TxIn {
tapscript, ok := extractTapScript(txIn.Witness)
if !ok {
continue
@@ -471,7 +492,8 @@ func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction,
continue
}
if err != nil {
return false, errors.Wrapf(err, "can't get previous txout for txin `%v:%v`", tx.TxHash.String(), i)
logger.ErrorContext(ctx, "failed to get pk script at out point", err)
continue
}
pkScript := prevTx.TxOut[txIn.PreviousOutIndex].PkScript
// input utxo must be P2TR
@@ -554,6 +576,7 @@ func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runest
}
p.newRuneEntries[runeId] = runeEntry
p.newRuneEntryStates[runeId] = runeEntry
logger.DebugContext(ctx, "[RunesProcessor] created RuneEntry", slogx.Any("runeEntry", runeEntry))
return nil
}
@@ -607,6 +630,7 @@ func (p *Processor) incrementBurnedAmount(ctx context.Context, burned map[runes.
continue
}
runeEntry.BurnedAmount = runeEntry.BurnedAmount.Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] burned amount incremented", slogx.Any("runeId", runeId), slogx.Any("amount", amount))
p.newRuneEntryStates[runeId] = runeEntry
}
return nil
@@ -674,10 +698,7 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
}
defer func() {
if err := runesDgTx.Rollback(ctx); err != nil {
logger.WarnContext(ctx, "failed to rollback transaction",
slogx.Error(err),
slogx.String("event", "rollback_runes_insertion"),
)
logger.ErrorContext(ctx, "[RunesProcessor] failed to rollback runes tx", err)
}
}()
@@ -789,7 +810,7 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
// submit event to reporting system
if p.reportingClient != nil {
if err := p.reportingClient.SubmitBlockReport(ctx, reportingclient.SubmitBlockReportPayload{
if err := p.reportingClient.SubmitBlockReport(ctx, reportingclientv2.SubmitBlockReportPayloadData{
Type: "runes",
ClientVersion: Version,
DBVersion: DBVersion,
@@ -803,5 +824,6 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
return errors.Wrap(err, "failed to submit block report")
}
}
logger.InfoContext(ctx, "[RunesProcessor] block flushed")
return nil
}

View File

@@ -631,37 +631,13 @@ func (q *Queries) GetRuneIdFromRune(ctx context.Context, rune string) (string, e
return rune_id, err
}
const getRuneTransactions = `-- name: GetRuneTransactions :many
SELECT hash, runes_transactions.block_height, index, timestamp, inputs, outputs, mints, burns, rune_etched, tx_hash, runes_runestones.block_height, etching, etching_divisibility, etching_premine, etching_rune, etching_spacers, etching_symbol, etching_terms, etching_terms_amount, etching_terms_cap, etching_terms_height_start, etching_terms_height_end, etching_terms_offset_start, etching_terms_offset_end, etching_turbo, edicts, mint, pointer, cenotaph, flaws FROM runes_transactions
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
WHERE (
$1::BOOLEAN = FALSE -- if @filter_pk_script is TRUE, apply pk_script filter
OR runes_transactions.outputs @> $2::JSONB
OR runes_transactions.inputs @> $2::JSONB
) AND (
$3::BOOLEAN = FALSE -- if @filter_rune_id is TRUE, apply rune_id filter
OR runes_transactions.outputs @> $4::JSONB
OR runes_transactions.inputs @> $4::JSONB
OR runes_transactions.mints ? $5
OR runes_transactions.burns ? $5
OR (runes_transactions.rune_etched = TRUE AND runes_transactions.block_height = $6 AND runes_transactions.index = $7)
) AND (
$8::INT = 0 OR runes_transactions.block_height = $8::INT -- if @block_height > 0, apply block_height filter
)
const getRuneTransactionsByHeight = `-- name: GetRuneTransactionsByHeight :many
SELECT hash, runes_transactions.block_height, index, timestamp, inputs, outputs, mints, burns, rune_etched, tx_hash, runes_runestones.block_height, etching, etching_divisibility, etching_premine, etching_rune, etching_spacers, etching_symbol, etching_terms, etching_terms_amount, etching_terms_cap, etching_terms_height_start, etching_terms_height_end, etching_terms_offset_start, etching_terms_offset_end, etching_turbo, edicts, mint, pointer, cenotaph, flaws FROM runes_transactions
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
WHERE runes_transactions.block_height = $1
`
type GetRuneTransactionsParams struct {
FilterPkScript bool
PkScriptParam []byte
FilterRuneID bool
RuneIDParam []byte
RuneID []byte
RuneIDBlockHeight int32
RuneIDTxIndex int32
BlockHeight int32
}
type GetRuneTransactionsRow struct {
type GetRuneTransactionsByHeightRow struct {
Hash string
BlockHeight int32
Index int32
@@ -694,24 +670,15 @@ type GetRuneTransactionsRow struct {
Flaws pgtype.Int4
}
func (q *Queries) GetRuneTransactions(ctx context.Context, arg GetRuneTransactionsParams) ([]GetRuneTransactionsRow, error) {
rows, err := q.db.Query(ctx, getRuneTransactions,
arg.FilterPkScript,
arg.PkScriptParam,
arg.FilterRuneID,
arg.RuneIDParam,
arg.RuneID,
arg.RuneIDBlockHeight,
arg.RuneIDTxIndex,
arg.BlockHeight,
)
func (q *Queries) GetRuneTransactionsByHeight(ctx context.Context, blockHeight int32) ([]GetRuneTransactionsByHeightRow, error) {
rows, err := q.db.Query(ctx, getRuneTransactionsByHeight, blockHeight)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetRuneTransactionsRow
var items []GetRuneTransactionsByHeightRow
for rows.Next() {
var i GetRuneTransactionsRow
var i GetRuneTransactionsByHeightRow
if err := rows.Scan(
&i.Hash,
&i.BlockHeight,

View File

@@ -306,7 +306,7 @@ func mapRuneTransactionTypeToParams(src entity.RuneTransaction) (gen.CreateRuneT
}, runestoneParams, nil
}
func extractModelRuneTxAndRunestone(src gen.GetRuneTransactionsRow) (gen.RunesTransaction, *gen.RunesRunestone, error) {
func extractModelRuneTxAndRunestone(src gen.GetRuneTransactionsByHeightRow) (gen.RunesTransaction, *gen.RunesRunestone, error) {
var runestone *gen.RunesRunestone
if src.TxHash.Valid {
// these fields should never be null

View File

@@ -3,7 +3,6 @@ package postgres
import (
"context"
"encoding/hex"
"fmt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@@ -62,21 +61,8 @@ func (r *Repository) GetIndexedBlockByHeight(ctx context.Context, height int64)
return indexedBlock, nil
}
func (r *Repository) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error) {
pkScriptParam := []byte(fmt.Sprintf(`[{"pkScript":"%s"}]`, hex.EncodeToString(pkScript)))
runeIdParam := []byte(fmt.Sprintf(`[{"runeId":"%s"}]`, runeId.String()))
rows, err := r.queries.GetRuneTransactions(ctx, gen.GetRuneTransactionsParams{
FilterPkScript: pkScript != nil,
PkScriptParam: pkScriptParam,
FilterRuneID: runeId != runes.RuneId{},
RuneIDParam: runeIdParam,
RuneID: []byte(runeId.String()),
RuneIDBlockHeight: int32(runeId.BlockHeight),
RuneIDTxIndex: int32(runeId.TxIndex),
BlockHeight: int32(height),
})
func (r *Repository) GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error) {
rows, err := r.queries.GetRuneTransactionsByHeight(ctx, int32(height))
if err != nil {
return nil, errors.Wrap(err, "error during query")
}

View File

@@ -55,7 +55,7 @@ func (r *Repository) Rollback(ctx context.Context) error {
return errors.Wrap(err, "failed to rollback transaction")
}
if err == nil {
logger.DebugContext(ctx, "rolled back transaction")
logger.InfoContext(ctx, "rolled back transaction")
}
r.tx = nil
return nil

View File

@@ -5,11 +5,10 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
)
func (u *Usecase) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error) {
txs, err := u.runesDg.GetRuneTransactions(ctx, pkScript, runeId, height)
func (u *Usecase) GetTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error) {
txs, err := u.runesDg.GetRuneTransactionsByHeight(ctx, height)
if err != nil {
return nil, errors.Wrap(err, "error during GetTransactionsByHeight")
}

105
pkg/crypto/crypto.go Normal file
View File

@@ -0,0 +1,105 @@
package crypto
import (
"encoding/base64"
"encoding/hex"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/ecdsa"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
ecies "github.com/ecies/go/v2"
)
// Client wraps a secp256k1 private key for signing and an ECIES private key
// (derived from the same key bytes) for decryption.
// A Client constructed via New("") has both fields nil and can only perform
// the public-key operations Verify and Encrypt.
type Client struct {
privateKey *btcec.PrivateKey
eciesPrivateKey *ecies.PrivateKey
}
// New creates a crypto Client from a hex-encoded secp256k1 private key.
// An empty privateKeyStr returns a key-less Client that supports only the
// public-key operations (Verify, Encrypt); the key-dependent methods
// (PublicKey, WIF, Sign, Decrypt) must not be called on such a client.
func New(privateKeyStr string) (*Client, error) {
	if privateKeyStr == "" {
		return &Client{}, nil
	}
	privateKeyBytes, err := hex.DecodeString(privateKeyStr)
	if err != nil {
		return nil, errors.Wrap(err, "decode private key")
	}
	// Reject keys of the wrong length instead of silently deriving a key
	// from malformed input (secp256k1 private keys are exactly 32 bytes).
	if len(privateKeyBytes) != 32 {
		return nil, errors.Newf("invalid private key length: expected 32 bytes, got %d", len(privateKeyBytes))
	}
	privateKey, _ := btcec.PrivKeyFromBytes(privateKeyBytes)
	eciesPrivateKey := ecies.NewPrivateKeyFromBytes(privateKeyBytes)
	return &Client{
		privateKey:      privateKey,
		eciesPrivateKey: eciesPrivateKey,
	}, nil
}
// PublicKey returns the hex-encoded compressed secp256k1 public key that
// corresponds to the client's private key.
// NOTE(review): panics if the Client was constructed with an empty key string
// (privateKey is nil) — callers must ensure a private key was provided.
func (c *Client) PublicKey() string {
return hex.EncodeToString(c.privateKey.PubKey().SerializeCompressed())
}
// WIF returns the private key encoded in Wallet Import Format for the given
// network parameters, using the compressed-pubkey variant.
func (c *Client) WIF(params *chaincfg.Params) (string, error) {
	// Guard against a key-less Client (constructed via New("")) instead of
	// letting btcutil.NewWIF dereference a nil private key.
	if c.privateKey == nil {
		return "", errors.New("private key is not set")
	}
	wif, err := btcutil.NewWIF(c.privateKey, params, true)
	if err != nil {
		return "", errors.Wrap(err, "wif convert failed")
	}
	return wif.String(), nil
}
// Sign signs the double-SHA256 hash of message with the client's private key
// and returns the DER-serialized ECDSA signature as a hex string.
// NOTE(review): panics if the Client has no private key (constructed via
// New("")) — confirm all callers hold a signing client.
func (c *Client) Sign(message string) string {
messageHash := chainhash.DoubleHashB([]byte(message))
signature := ecdsa.Sign(c.privateKey, messageHash)
return hex.EncodeToString(signature.Serialize())
}
// Verify reports whether sigStr is a valid ECDSA signature over the
// double-SHA256 hash of message, produced by the key matching pubKeyStr.
// Both the signature and the public key are given as hex strings; it does
// not use the client's own key, so it works on a key-less Client.
func (c *Client) Verify(message, sigStr, pubKeyStr string) (bool, error) {
	rawSig, err := hex.DecodeString(sigStr)
	if err != nil {
		return false, errors.Wrap(err, "signature decode")
	}
	rawPub, err := hex.DecodeString(pubKeyStr)
	if err != nil {
		return false, errors.Wrap(err, "pubkey decode")
	}
	parsedPub, err := btcec.ParsePubKey(rawPub)
	if err != nil {
		return false, errors.Wrap(err, "pubkey parse")
	}
	parsedSig, err := ecdsa.ParseSignature(rawSig)
	if err != nil {
		return false, errors.Wrap(err, "signature parse")
	}
	digest := chainhash.DoubleHashB([]byte(message))
	return parsedSig.Verify(digest, parsedPub), nil
}
// Encrypt ECIES-encrypts message for the holder of the given hex-encoded
// public key and returns the ciphertext as a base64 (standard encoding)
// string. It does not use the client's own key.
func (c *Client) Encrypt(message, pubKeyStr string) (string, error) {
	recipient, err := ecies.NewPublicKeyFromHex(pubKeyStr)
	if err != nil {
		return "", errors.Wrap(err, "parse pubkey")
	}
	encrypted, err := ecies.Encrypt(recipient, []byte(message))
	if err != nil {
		return "", errors.Wrap(err, "encrypt message")
	}
	return base64.StdEncoding.EncodeToString(encrypted), nil
}
// Decrypt base64-decodes ciphertextStr and ECIES-decrypts it with the
// client's private key, returning the plaintext.
func (c *Client) Decrypt(ciphertextStr string) (string, error) {
	// Guard against a key-less Client (constructed via New("")) instead of
	// panicking inside the ECIES library on a nil private key.
	if c.eciesPrivateKey == nil {
		return "", errors.New("private key is not set")
	}
	ciphertext, err := base64.StdEncoding.DecodeString(ciphertextStr)
	if err != nil {
		return "", errors.Wrap(err, "decode ciphertext")
	}
	plaintext, err := ecies.Decrypt(c.eciesPrivateKey, ciphertext)
	if err != nil {
		return "", errors.Wrap(err, "decrypt")
	}
	return string(plaintext), nil
}

65
pkg/crypto/crypto_test.go Normal file
View File

@@ -0,0 +1,65 @@
package crypto
import (
"testing"
"github.com/btcsuite/btcd/chaincfg"
"github.com/stretchr/testify/assert"
)
// Key for test only. DO NOT USE IN PRODUCTION
const (
privateKeyStr = "ce9c2fd75623e82a83ed743518ec7749f6f355f7301dd432400b087717fed2f2"
mainnetKey = "L49LKamtrPZxty5TG7jaFPHMRZbrvAr4Dvn5BHGdvmvbcTDNAbZj"
pubKeyStr = "0251e2dfcdeea17cc9726e4be0855cd0bae19e64f3e247b10760cd76851e7df47e"
)
// TestEncryptDecrypt round-trips a message: a key-less client encrypts with
// the public key, and the private-key client decrypts back to the original.
func TestEncryptDecrypt(t *testing.T) {
	const message = "hello world"

	signer, err := New(privateKeyStr)
	assert.NoError(t, err)
	verifier, err := New("")
	assert.NoError(t, err)

	encrypted, err := verifier.Encrypt(message, pubKeyStr)
	assert.NoError(t, err)

	roundTripped, err := signer.Decrypt(encrypted)
	assert.NoError(t, err)
	assert.Equal(t, message, roundTripped)
}
// TestSignVerify checks that a signature produced by the private-key client
// verifies against the public key, and that an unrelated signature does not.
func TestSignVerify(t *testing.T) {
	plaintext := "hello world"
	// A well-formed DER signature that was not produced over this message.
	invalidSignature := "3044022066504a82e2bc23167214e05497a1ca957add9cacc078aa69f5417079a4d56f0002206b215920b046c779d4a58d4029c26dbadcaf6d3c884b3463f44e70ef9146c1cd"

	privClient, err := New(privateKeyStr)
	assert.NoError(t, err)
	pubClient, err := New("")
	assert.NoError(t, err)

	// The leftover debug `println(signature)` has been removed; tests should
	// not write to stderr on success.
	signature := privClient.Sign(plaintext)

	verified, err := pubClient.Verify(plaintext, signature, pubKeyStr)
	assert.NoError(t, err)
	assert.True(t, verified)

	verified, err = pubClient.Verify(plaintext, invalidSignature, pubKeyStr)
	assert.NoError(t, err)
	assert.False(t, verified)
}
// TestWIF checks the mainnet WIF encoding of the test private key.
func TestWIF(t *testing.T) {
	privClient, err := New(privateKeyStr)
	assert.NoError(t, err)

	wifPrivKey, err := privClient.WIF(&chaincfg.MainNetParams)
	assert.NoError(t, err)
	// assert.Equal takes the expected value first, then the actual value;
	// the original call had them swapped, which garbles failure messages.
	assert.Equal(t, mainnetKey, wifPrivKey)
}

View File

@@ -21,11 +21,7 @@ func NewHTTPErrorHandler() func(ctx *fiber.Ctx, err error) error {
return errors.WithStack(ctx.Status(e.Code).SendString(e.Error()))
}
logger.ErrorContext(ctx.UserContext(), "Something went wrong, unhandled api error",
slogx.String("event", "api_unhandled_error"),
slogx.Error(err),
)
logger.ErrorContext(ctx.UserContext(), "unhandled error", slogx.Error(err))
return errors.WithStack(ctx.Status(http.StatusInternalServerError).JSON(map[string]any{
"error": "Internal Server Error",
}))

View File

@@ -10,7 +10,6 @@ import (
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/valyala/fasthttp"
)
@@ -30,7 +29,7 @@ type Client struct {
func New(baseURL string, config ...Config) (*Client, error) {
if _, err := url.Parse(baseURL); err != nil {
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "can't parse base url"))
return nil, errors.Wrap(err, "can't parse base url")
}
var cf Config
if len(config) > 0 {
@@ -115,7 +114,7 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
)
}
logger.InfoContext(ctx, "Finished make request", slog.String("package", "httpclient"))
logger.Info("Finished make request")
}
fasthttp.ReleaseResponse(resp)

View File

@@ -1,15 +0,0 @@
package logger
import (
"log/slog"
)
func durationToMsAttrReplacer(groups []string, attr slog.Attr) slog.Attr {
if attr.Value.Kind() == slog.KindDuration {
return slog.Attr{
Key: attr.Key,
Value: slog.Int64Value(attr.Value.Duration().Milliseconds()),
}
}
return attr
}

View File

@@ -7,18 +7,12 @@ import (
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)
// NewGCPHandler returns a new GCP handler.
// The handler writes logs to the os.Stdout and
// replaces the default attribute keys/values with the GCP logging attribute keys/values
//
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
func NewGCPHandler(opts *slog.HandlerOptions) slog.Handler {
return slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: opts.Level,
ReplaceAttr: attrReplacerChain(
GCPAttrReplacer,
durationToMsAttrReplacer,
opts.ReplaceAttr,
),
})

View File

@@ -34,9 +34,6 @@ func New(config Config) (*ReportingClient, error) {
if err != nil {
return nil, errors.Wrap(err, "can't create http client")
}
if config.Name == "" {
return nil, errors.New("reporting.name config is required if reporting is enabled")
}
return &ReportingClient{
httpClient: httpClient,
config: config,
@@ -56,8 +53,6 @@ type SubmitBlockReportPayload struct {
}
func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayload) error {
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
@@ -69,11 +64,9 @@ func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitB
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
// TODO: unmashal response body and log it
logger.WarnContext(ctx, "Reporting block event failed", slog.Any("resp_body", resp.Body()))
return nil
logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
}
logger.DebugContext(ctx, "Reported block event")
logger.DebugContext(ctx, "block report submitted", slog.Any("payload", payload))
return nil
}
@@ -93,9 +86,6 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
WebsiteURL: r.config.WebsiteURL,
IndexerAPIURL: r.config.IndexerAPIURL,
}
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
@@ -107,8 +97,8 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "Reporting node info failed", slog.Any("resp_body", resp.Body()))
logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
}
logger.DebugContext(ctx, "Reported node info")
logger.InfoContext(ctx, "node report submitted", slog.Any("payload", payload))
return nil
}

View File

@@ -0,0 +1,168 @@
package reportingclientv2
import (
"context"
"encoding/json"
"log/slog"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/pkg/crypto"
"github.com/gaze-network/indexer-network/pkg/httpclient"
"github.com/gaze-network/indexer-network/pkg/logger"
)
// Config is the configuration for the v2 reporting client, typically
// populated via mapstructure from the indexer configuration file.
type Config struct {
	// Disabled turns reporting off. It is not read in this file; presumably
	// callers check it before constructing the client — TODO confirm.
	Disabled bool `mapstructure:"disabled"`
	// ReportCenter identifies the report-center endpoint and its public key.
	ReportCenter ReportCenter `mapstructure:"report_center"`
	// NodeInfo describes this indexer node; it is embedded in node reports.
	NodeInfo NodeInfo `mapstructure:"node_info"`
}

// NodeInfo is metadata about this indexer node, included in node reports.
type NodeInfo struct {
	Name       string `mapstructure:"name"`        // human-readable node name
	WebsiteURL string `mapstructure:"website_url"` // optional website URL (omitted from reports when empty)
	APIURL     string `mapstructure:"api_url"`     // optional public indexer API URL (omitted from reports when empty)
}

// ReportCenter identifies the report-center service reports are sent to.
type ReportCenter struct {
	// BaseURL is the report-center base URL; New falls back to
	// defaultBaseURL when empty.
	BaseURL string `mapstructure:"base_url"`
	// PublicKey is the report center's public key, used to encrypt block
	// report data; New falls back to defaultPublicKey when empty.
	PublicKey string `mapstructure:"public_key"`
}
// ReportingClient submits block and node reports to the Gaze report center.
// Construct it with New.
type ReportingClient struct {
	httpClient   *httpclient.Client // HTTP client bound to the report-center base URL
	cryptoClient *crypto.Client     // indexer key pair; used to sign and encrypt outgoing reports
	config       Config
}
const (
	// defaultBaseURL is used when Config.ReportCenter.BaseURL is empty.
	defaultBaseURL = "https://indexer.api.gaze.network"
	// defaultPublicKey is the production report center's public key
	// (compressed, hex). Block report data is encrypted to this key when
	// Config.ReportCenter.PublicKey is empty.
	defaultPublicKey = "039298683d53a1cbdb6f318d5ad4b12bc0d752f3a6cd62c19b2c22b1ae1e12fe05"
)
// New builds a ReportingClient for the report center named in config.
// Empty report-center settings fall back to the production defaults.
// indexerPrivateKey is loaded into a crypto client that signs node reports
// and encrypts block reports.
func New(config Config, indexerPrivateKey string) (*ReportingClient, error) {
	// Substitute production defaults for any unset report-center fields.
	config.ReportCenter.BaseURL = utils.Default(config.ReportCenter.BaseURL, defaultBaseURL)
	config.ReportCenter.PublicKey = utils.Default(config.ReportCenter.PublicKey, defaultPublicKey)

	hc, err := httpclient.New(config.ReportCenter.BaseURL)
	if err != nil {
		return nil, errors.Wrap(err, "can't create http client")
	}
	cc, err := crypto.New(indexerPrivateKey)
	if err != nil {
		return nil, errors.Wrap(err, "can't create crypto client")
	}

	client := &ReportingClient{
		httpClient:   hc,
		cryptoClient: cc,
		config:       config,
	}
	return client, nil
}
// SubmitBlockReportPayload is the request body for POST /v2/report/block.
// The report itself travels encrypted so only the report center can read it.
type SubmitBlockReportPayload struct {
	// EncryptedData is a SubmitBlockReportPayloadData, JSON-encoded and then
	// encrypted to the report center's public key.
	EncryptedData string `json:"encryptedData"`
	// IndexerPublicKey is this indexer's public key (hex).
	IndexerPublicKey string `json:"indexerPublicKey"`
}

// SubmitBlockReportPayloadData is the plaintext block report that is
// encrypted into SubmitBlockReportPayload.EncryptedData.
type SubmitBlockReportPayloadData struct {
	Type                string         `json:"type"` // module name, e.g. "runes"
	ClientVersion       string         `json:"clientVersion"`
	DBVersion           int            `json:"dbVersion"`
	EventHashVersion    int            `json:"eventHashVersion"`
	Network             common.Network `json:"network"`
	BlockHeight         uint64         `json:"blockHeight"`
	BlockHash           chainhash.Hash `json:"blockHash"`
	EventHash           chainhash.Hash `json:"eventHash"`
	CumulativeEventHash chainhash.Hash `json:"cumulativeEventHash"`
	// IndexerPublicKey is overwritten by SubmitBlockReport with the client's
	// own public key; callers need not set it.
	IndexerPublicKey string `json:"indexerPublicKey"`
}
// SubmitBlockReport encrypts payload and posts it to the report center's
// /v2/report/block endpoint. payload.IndexerPublicKey is populated
// automatically from the client's key pair.
//
// HTTP error statuses (>= 400) are logged but not returned as errors:
// reporting is best-effort and must not interrupt indexing. Only local
// failures (marshal, encrypt, transport) produce a non-nil error.
func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayloadData) error {
	payload.IndexerPublicKey = r.cryptoClient.PublicKey()
	data, err := json.Marshal(payload)
	if err != nil {
		// Distinct message from the body marshal below so failures are
		// distinguishable in logs (both previously said "can't marshal payload").
		return errors.Wrap(err, "can't marshal payload data")
	}
	// Encrypt the report so only the report center can read it.
	encryptedData, err := r.cryptoClient.Encrypt(string(data), r.config.ReportCenter.PublicKey)
	if err != nil {
		return errors.Wrap(err, "can't encrypt data")
	}
	bodyStruct := SubmitBlockReportPayload{
		EncryptedData:    encryptedData,
		IndexerPublicKey: r.cryptoClient.PublicKey(),
	}
	body, err := json.Marshal(bodyStruct)
	if err != nil {
		return errors.Wrap(err, "can't marshal request body")
	}
	resp, err := r.httpClient.Post(ctx, "/v2/report/block", httpclient.RequestOptions{
		Body: body,
	})
	if err != nil {
		return errors.Wrap(err, "can't send request")
	}
	if resp.StatusCode() >= 400 {
		logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", bodyStruct), slog.Any("responseBody", resp.Body()))
	} else {
		logger.DebugContext(ctx, "block report submitted", slog.Any("payload", bodyStruct))
	}
	return nil
}
// SubmitNodeReportPayload is the request body for POST /v2/report/node.
type SubmitNodeReportPayload struct {
	// Data is the signed section of the report.
	Data SubmitNodeReportPayloadData `json:"data"`
	// IndexerPublicKey is this indexer's public key (hex).
	IndexerPublicKey string `json:"indexerPublicKey"`
	// Signature is the indexer's signature over the JSON-encoded Data field.
	// BUG FIX: the tag was `json:"string"`, which serialized the signature
	// under the key "string". NOTE(review): confirm the report center accepts
	// the corrected "signature" key before deploying this change.
	Signature string `json:"signature"`
}

// SubmitNodeReportPayloadData is the signed portion of a node report,
// describing this indexer node.
type SubmitNodeReportPayloadData struct {
	Name             string         `json:"name"` // node name from Config.NodeInfo
	Type             string         `json:"type"` // module name, e.g. "runes"
	Network          common.Network `json:"network"`
	WebsiteURL       string         `json:"websiteUrl,omitempty"`
	APIURL           string         `json:"apiUrl,omitempty"`
	IndexerPublicKey string         `json:"indexerPublicKey"`
	ClientVersion    string         `json:"clientVersion"`
}
// SubmitNodeReport sends this node's metadata (name, module, network, URLs,
// public key and client version) to the report center's /v2/report/node
// endpoint. The data section is signed with the indexer's private key so the
// report center can verify the sender. HTTP error statuses (>= 400) are
// logged but do not fail the call; only local failures return an error.
func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, network common.Network, clientVersion string) error {
	pubKey := r.cryptoClient.PublicKey()
	report := SubmitNodeReportPayload{
		Data: SubmitNodeReportPayloadData{
			Name:             r.config.NodeInfo.Name,
			Type:             module,
			Network:          network,
			WebsiteURL:       r.config.NodeInfo.WebsiteURL,
			APIURL:           r.config.NodeInfo.APIURL,
			IndexerPublicKey: pubKey,
			ClientVersion:    clientVersion,
		},
		IndexerPublicKey: pubKey,
	}

	// Sign the JSON-encoded data section so the report center can
	// authenticate this report.
	signedBytes, err := json.Marshal(report.Data)
	if err != nil {
		return errors.Wrap(err, "can't marshal payload data")
	}
	report.Signature = r.cryptoClient.Sign(string(signedBytes))

	body, err := json.Marshal(report)
	if err != nil {
		return errors.Wrap(err, "can't marshal payload")
	}
	resp, err := r.httpClient.Post(ctx, "/v2/report/node", httpclient.RequestOptions{Body: body})
	if err != nil {
		return errors.Wrap(err, "can't send request")
	}

	if resp.StatusCode() >= 400 {
		logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", report), slog.Any("responseBody", resp.Body()))
		return nil
	}
	logger.InfoContext(ctx, "node report submitted", slog.Any("payload", report))
	return nil
}

View File

@@ -0,0 +1,96 @@
package reportingclientv2
import (
"context"
"fmt"
"testing"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/common"
"github.com/stretchr/testify/assert"
)
// Keys for test only. DO NOT USE IN PRODUCTION.
// Each group holds one key pair in up to three encodings: raw private key
// (hex), the mainnet WIF form of the same key, and the compressed public key
// (hex). The "official" pair plays the report-center-trusted node; pairs 1
// and 2 play ordinary third-party nodes.
const (
	privateKeyStrOfficial = "ce9c2fd75623e82a83ed743518ec7749f6f355f7301dd432400b087717fed2f2"
	mainnetKeyOfficial    = "L2hqSEYLjRQHHSiSgfNSFwaZ1dYpwn4PUTt2nQT8AefzYGQCwsGY" // WIF form; unused in tests, kept for reference
	pubKeyStrOfficial     = "0251e2dfcdeea17cc9726e4be0855cd0bae19e64f3e247b10760cd76851e7df47e"
)

const (
	privateKeyStr1 = "a3a7d2c40c8bb7a4b4afa9a6b3ed613da1de233913ed07a017e6dd44ef542d80"
	mainnetKey1    = "L49LKamtrPZxty5TG7jaFPHMRZbrvAr4Dvn5BHGdvmvbcTDNAbZj" // WIF form; unused in tests, kept for reference
	pubKeyStr1     = "02596e11b5a2104533d2732a3df35eadaeb61b189b9069715106e72e27c1de7775"
)

const (
	privateKeyStr2 = "a3a7d2c40c8bb7a4b4afa9a6b3ed613da1de233913ed07a017e6dd44ef542d81"
	// NOTE(review): the entries below are commented out because they reuse the
	// key-1 names (and key-1 values); the WIF/public key for key 2 were never
	// derived. Derive and name them mainnetKey2/pubKeyStr2 if needed.
	// mainnetKey1 = "L49LKamtrPZxty5TG7jaFPHMRZbrvAr4Dvn5BHGdvmvbcTDNAbZj"
	// pubKeyStr1 = "02596e11b5a2104533d2732a3df35eadaeb61b189b9069715106e72e27c1de7775"
)
// TestReport1 is an integration test: it submits node reports and block
// reports from three distinct key pairs to the dev report center. It is not
// hermetic — it requires network access to indexer-dev.api.gaze.network.
func TestReport1(t *testing.T) {
	block := 18
	hash, err := chainhash.NewHashFromStr(fmt.Sprintf("%d", block))
	assert.NoError(t, err)

	// newConfig builds a client config pointed at the dev report center for
	// the given node name (the three originals differed only in Name).
	newConfig := func(name string) Config {
		return Config{
			ReportCenter: ReportCenter{
				BaseURL:   "https://indexer-dev.api.gaze.network",
				PublicKey: defaultPublicKey,
			},
			NodeInfo: NodeInfo{
				Name:   name,
				APIURL: "http://localhost:2000",
			},
		}
	}

	clientOfficial, err := New(newConfig("Official Node"), privateKeyStrOfficial)
	assert.NoError(t, err)
	client1, err := New(newConfig("Node 1"), privateKeyStr1)
	assert.NoError(t, err)
	client2, err := New(newConfig("Node 2"), privateKeyStr2)
	assert.NoError(t, err)

	clients := []*ReportingClient{clientOfficial, client1, client2}

	for _, client := range clients {
		err := client.SubmitNodeReport(context.Background(), "runes", "mainnet", "v0.0.1")
		assert.NoError(t, err)
	}

	blockReport := SubmitBlockReportPayloadData{
		Type:                "runes",
		ClientVersion:       "v0.0.1",
		DBVersion:           1,
		EventHashVersion:    1,
		Network:             common.NetworkMainnet,
		BlockHeight:         uint64(block),
		BlockHash:           *hash,
		EventHash:           *hash,
		CumulativeEventHash: *hash,
	}
	for _, client := range clients {
		// Fix: the original assigned these errors to err and never checked
		// them (staticcheck SA4006), so block-report failures went unnoticed.
		err := client.SubmitBlockReport(context.Background(), blockReport)
		assert.NoError(t, err)
	}
}