mirror of
https://github.com/alexgo-io/gaze-indexer.git
synced 2026-03-28 23:48:31 +08:00
Compare commits
7 Commits
feature/do
...
feature/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b2da298ba | ||
|
|
bd9acb8841 | ||
|
|
58761b1ef5 | ||
|
|
85fdf79a00 | ||
|
|
e0e2934160 | ||
|
|
d56c58334a | ||
|
|
be316442ea |
1
.github/CODE_OF_CONDUCT.md
vendored
1
.github/CODE_OF_CONDUCT.md
vendored
@@ -1 +0,0 @@
|
||||
# Contributor Covenant Code of Conduct
|
||||
34
.github/CONTRIBUTING.md
vendored
34
.github/CONTRIBUTING.md
vendored
@@ -1,34 +0,0 @@
|
||||
# Contributing
|
||||
|
||||
Please note: we have a [code of conduct](https://github.com/gaze-network/gaze-indexer/blob/main/.github/CODE_OF_CONDUCT.md), please follow it in all your interactions with the Gaze Network project.
|
||||
|
||||
## Pull Requests or Commits
|
||||
|
||||
#### Message structured
|
||||
|
||||
```plaintext
|
||||
<type>(optional scope):<description>
|
||||
```
|
||||
|
||||
The `<type>` must be one of the following:
|
||||
|
||||
> feat:, refactor:, fix:, doc:, style:, perf:, test:, chore:, ci:, build:
|
||||
|
||||
- feat(runes): add Runes module to the project
|
||||
- refactor: change project structure
|
||||
- fix(btc): fix chain reorganization issue
|
||||
- doc: update \`run\` command documentation
|
||||
- style: fix linting issues
|
||||
- perf: improve performance of the bitcoin node datasource
|
||||
- test(runes): add unit tests for etching logic
|
||||
- chore: bump dependencies versions
|
||||
- ci: update CI configuration
|
||||
- build: update Dockerfile to use alpine
|
||||
|
||||
# 👍 Contribute
|
||||
|
||||
If you want to say **thank you** and/or support the active development of `Fiber`:
|
||||
|
||||
1. Add a [GitHub Star](https://github.com/gaze-network/gaze-indexer/stargazers) to the project.
|
||||
2. Follow and mention our [Twitter (𝕏)](https://twitter.com/Gaze_Network).
|
||||
3. Write a review or tutorial on [Medium](https://medium.com/), [Dev.to](https://dev.to/) or personal blog.
|
||||
22
.github/PULL_REQUEST_TEMPLATE.md
vendored
22
.github/PULL_REQUEST_TEMPLATE.md
vendored
@@ -1,22 +0,0 @@
|
||||
## Description
|
||||
|
||||
Please provide a clear and concise description of the changes you've made and the problem they address. Include the purpose of the change, any relevant issues it solves, and the benefits it brings to the project. If this change introduces new features or adjustments, highlight them here.
|
||||
|
||||
Fixes # (issue)
|
||||
|
||||
## Type of change
|
||||
|
||||
What types of changes does your code introduce to Appium?
|
||||
_Put an `x` in the boxes that apply_
|
||||
|
||||
- [ ] Bugfix (non-breaking change which fixes an issue)
|
||||
- [ ] New feature (non-breaking change which adds functionality)
|
||||
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
|
||||
- [ ] Enhancement (improvement to existing features and functionality)
|
||||
- [ ] Documentation update (changes to documentation)
|
||||
- [ ] Performance improvement (non-breaking change which improves efficiency)
|
||||
- [ ] Code consistency (non-breaking change which improves code reliability and robustness)
|
||||
|
||||
## Commit formatting
|
||||
|
||||
Please follow the commit message conventions for an easy way to identify the purpose or intention of a commit. Check out our commit message conventions in the [CONTRIBUTING.md](https://github.com/gaze-network/gaze-indexer/blob/main/.github/CONTRIBUTING.md#pull-requests-or-commits)
|
||||
77
.github/workflows/code-analysis.yml
vendored
77
.github/workflows/code-analysis.yml
vendored
@@ -1,77 +0,0 @@
|
||||
name: Code Analysis & Test
|
||||
on:
|
||||
workflow_dispatch:
|
||||
pull_request:
|
||||
branches:
|
||||
- develop
|
||||
- main
|
||||
paths:
|
||||
- "go.mod"
|
||||
- "go.sum"
|
||||
- "**.go"
|
||||
- ".golangci.yaml"
|
||||
- ".github/workflows/code-analysis.yml"
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
strategy:
|
||||
matrix:
|
||||
os: ["ubuntu-latest"]
|
||||
name: Lint (${{ matrix.os }})
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: "0"
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
cache-dependency-path: "**/*.sum"
|
||||
cache: true # caching and restoring go modules and build outputs.
|
||||
|
||||
- name: Lint
|
||||
uses: reviewdog/action-golangci-lint@v2
|
||||
with: # https://github.com/reviewdog/action-golangci-lint#inputs
|
||||
go_version_file: "go.mod"
|
||||
workdir: ./
|
||||
golangci_lint_flags: "--config=./.golangci.yaml --verbose --new-from-rev=${{ github.event.pull_request.base.sha }}"
|
||||
fail_on_error: true
|
||||
test:
|
||||
strategy:
|
||||
matrix:
|
||||
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
|
||||
go-version: ["1.22.x", "1.x"] # minimum version and latest version
|
||||
name: Test (${{ matrix.os }}/${{ matrix.go-version }})
|
||||
runs-on: ${{ matrix.os }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: "0"
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: ${{ matrix.go-version }}
|
||||
cache: true # caching and restoring go modules and build outputs.
|
||||
- run: echo "GOVERSION=$(go version)" >> $GITHUB_ENV
|
||||
|
||||
- name: Build
|
||||
run: go build -v ./...
|
||||
|
||||
- name: Test
|
||||
run: go test -json ./... > test_output.json
|
||||
|
||||
- name: Summary Test Results
|
||||
if: always()
|
||||
uses: robherley/go-test-action@v0
|
||||
with:
|
||||
fromJSONFile: test_output.json
|
||||
|
||||
- name: Annotate Test Results
|
||||
if: always()
|
||||
uses: guyarb/golang-test-annotations@v0.5.1
|
||||
with:
|
||||
test-results: test_output.json
|
||||
2
.github/workflows/release-server.yml
vendored
2
.github/workflows/release-server.yml
vendored
@@ -32,6 +32,6 @@ jobs:
|
||||
uses: ./.github/workflows/reusable-build-and-push-ghcr.yml
|
||||
with:
|
||||
context: .
|
||||
dockerfile: ./docker/Dockerfile
|
||||
dockerfile: Dockerfile
|
||||
image-repo: "ghcr.io/gaze-network/gaze-indexer"
|
||||
image-tag: ${{ needs.prepare.outputs.tag }}
|
||||
|
||||
28
.github/workflows/sqlc-verify.yml
vendored
28
.github/workflows/sqlc-verify.yml
vendored
@@ -1,28 +0,0 @@
|
||||
name: Sqlc ORM Framework Verify
|
||||
on:
|
||||
workflow_dispatch:
|
||||
pull_request:
|
||||
branches:
|
||||
- develop
|
||||
- main
|
||||
paths:
|
||||
- "sqlc.yaml"
|
||||
- "**.sql"
|
||||
- ".github/workflows/sqlc-verify.yml"
|
||||
|
||||
jobs:
|
||||
sqlc-diff:
|
||||
name: Sqlc Diff Checker
|
||||
runs-on: "ubuntu-latest" # "self-hosted", "ubuntu-latest", "macos-latest", "windows-latest"
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: "0"
|
||||
|
||||
- name: Setup Sqlc
|
||||
uses: sqlc-dev/setup-sqlc@v4
|
||||
with:
|
||||
sqlc-version: "1.26.0"
|
||||
|
||||
- name: Check Diff
|
||||
run: sqlc diff
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -3,6 +3,8 @@
|
||||
# Eg. ignore.foo_test.go, ignore.credentials.json, ignore.config.yml
|
||||
ignore.*
|
||||
|
||||
/key
|
||||
|
||||
**/cmd.local/**
|
||||
**/cmd.local.**/**
|
||||
|
||||
|
||||
140
README.md
140
README.md
@@ -1,139 +1 @@
|
||||
<!-- omit from toc -->
|
||||
# Gaze Indexer
|
||||
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols. It has support for Bitcoin and Runes out of the box, with **Unified Consistent APIs** across fungible token protocols.
|
||||
|
||||
Gaze Indexer is built with **modularity** in mind, allowing users to run all modules in one monolithic instance with a single command, or as a distributed cluster of micro-services.
|
||||
|
||||
Gaze Indexer serves as a foundation for building ANY meta-protocol indexers, with efficient data fetching, reorg detection, and database migration tool.
|
||||
This allows developers to focus on what **truly** matters: Meta-protocol indexing logic. New meta-protocols can be easily added by implementing new modules.
|
||||
|
||||
Gaze Indexer also comes with a block reporting system for verifying data integrity of indexers. Visit the [Gaze Network dashboard](https://dash.gaze.network) to see the status of other indexers.
|
||||
|
||||
- [Modules](#modules)
|
||||
- [1. Bitcoin](#1-bitcoin)
|
||||
- [2. Runes](#2-runes)
|
||||
- [Installation](#installation)
|
||||
- [Prerequisites](#prerequisites)
|
||||
- [1. Hardware Requirements](#1-hardware-requirements)
|
||||
- [2. Prepare Bitcoin Core RPC server.](#2-prepare-bitcoin-core-rpc-server)
|
||||
- [3. Prepare database.](#3-prepare-database)
|
||||
- [4. Prepare `config.yaml` file.](#4-prepare-configyaml-file)
|
||||
- [Install with Docker (recommended)](#install-with-docker-recommended)
|
||||
- [Install from source](#install-from-source)
|
||||
|
||||
## Modules
|
||||
### 1. Bitcoin
|
||||
The Bitcoin Indexer, the heart of every meta-protocol, is responsible for indexing **Bitcoin transactions, blocks, and UTXOs**. It requires a Bitcoin Core RPC as source of Bitcoin transactions,
|
||||
and stores the indexed data in database to be used by other modules.
|
||||
|
||||
### 2. Runes
|
||||
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
|
||||
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
|
||||
|
||||
## Installation
|
||||
### Prerequisites
|
||||
#### 1. Hardware Requirements
|
||||
Each module requires different hardware requirements.
|
||||
| Module | CPU | RAM |
|
||||
| ------- | ---------- | ------ |
|
||||
| Bitcoin | 0.25 cores | 256 MB |
|
||||
| Runes | 0.5 cores | 1 GB |
|
||||
|
||||
#### 2. Prepare Bitcoin Core RPC server.
|
||||
Gaze Indexer needs to fetch transaction data from a Bitcoin Core RPC, either self-hosted or using managed providers like QuickNode.
|
||||
To self host a Bitcoin Core, see https://bitcoin.org/en/full-node.
|
||||
|
||||
#### 3. Prepare database.
|
||||
Gaze Indexer has first-class support for PostgreSQL. If you wish to use other databases, you can implement your own database repository that satisfies each module's Data Gateway interface.
|
||||
Here is our minimum database disk space requirement for each module.
|
||||
| Module | Database Storage |
|
||||
| ------- | ---------------- |
|
||||
| Bitcoin | 240 GB |
|
||||
| Runes | 150 GB |
|
||||
|
||||
#### 4. Prepare `config.yaml` file.
|
||||
```yaml
|
||||
# config.yaml
|
||||
logger:
|
||||
output: text # Output format for logs. current supported formats: "text" | "json" | "gcp"
|
||||
debug: false
|
||||
|
||||
bitcoin_node:
|
||||
host: "" # [Required] Host of Bitcoin Core RPC (without https://)
|
||||
user: "" # Username to authenticate with Bitcoin Core RPC
|
||||
pass: "" # Password to authenticate with Bitcoin Core RPC
|
||||
disable_tls: false # Set to true to disable tls
|
||||
|
||||
network: mainnet # Network to run the indexer on. Current supported networks: "mainnet" | "testnet"
|
||||
|
||||
reporting: # Block reporting configuration options. See Block Reporting section for more details.
|
||||
disabled: false # Set to true to disable block reporting to Gaze Network. Default is false.
|
||||
base_url: "https://indexer.api.gaze.network" # Defaults to "https://indexer.api.gaze.network" if left empty
|
||||
name: "" # [Required if not disabled] Name of this indexer to show on the Gaze Network dashboard
|
||||
website_url: "" # Public website URL to show on the dashboard. Can be left empty.
|
||||
indexer_api_url: "" # Public url to access this indexer's API. Can be left empty if you want to keep your indexer private.
|
||||
|
||||
http_server:
|
||||
port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.
|
||||
|
||||
modules:
|
||||
bitcoin: # Configuration options for Bitcoin module. Can be removed if not used.
|
||||
database: "postgres" # Database to store bitcoin data. current supported databases: "postgres"
|
||||
postgres:
|
||||
host: "localhost"
|
||||
port: 5432
|
||||
user: "postgres"
|
||||
password: "password"
|
||||
db_name: "postgres"
|
||||
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
|
||||
runes: # Configuration options for Runes module. Can be removed if not used.
|
||||
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
|
||||
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node" | "database". If "database" is used, it will use the database config in bitcoin module as datasource.
|
||||
api_handlers: # API handlers to enable. current supported handlers: "http"
|
||||
- http
|
||||
postgres:
|
||||
host: "localhost"
|
||||
port: 5432
|
||||
user: "postgres"
|
||||
password: "password"
|
||||
db_name: "postgres"
|
||||
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
|
||||
```
|
||||
|
||||
### Install with Docker (recommended)
|
||||
We will be using `docker-compose` for our installation guide. Make sure the `docker-compose.yaml` file is in the same directory as the `config.yaml` file.
|
||||
```yaml
|
||||
# docker-compose.yaml
|
||||
services:
|
||||
gaze-indexer:
|
||||
image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
|
||||
container_name: gaze-indexer
|
||||
restart: unless-stopped
|
||||
volumes:
|
||||
- './config.yaml:/app/config.yaml' # mount config.yaml file to the container as "/app/config.yaml"
|
||||
command: ["/app/main", "run", "--bitcoin", "--runes"] # Put module flags after "run" commands to select which modules to run.
|
||||
```
|
||||
|
||||
### Install from source
|
||||
1. Install `go` version 1.22 or higher. See Go installation guide [here](https://go.dev/doc/install).
|
||||
2. Clone this repository.
|
||||
```bash
|
||||
git clone https://github.com/gaze-network/gaze-indexer.git
|
||||
cd gaze-indexer
|
||||
```
|
||||
3. Build the main binary.
|
||||
```bash
|
||||
# Get dependencies
|
||||
go mod download
|
||||
|
||||
# Build the main binary
|
||||
go build -o gaze main.go
|
||||
```
|
||||
4. Run the main binary with the `run` command and module flags.
|
||||
```bash
|
||||
./gaze run --bitcoin --runes
|
||||
```
|
||||
If `config.yaml` is not located at `./app/config.yaml`, use the `--config` flag to specify the path to the `config.yaml` file.
|
||||
```bash
|
||||
./gaze run --bitcoin --runes --config /path/to/config.yaml
|
||||
```
|
||||
# Gaze Indexer Network
|
||||
|
||||
12
cmd/cmd.go
12
cmd/cmd.go
@@ -14,14 +14,14 @@ var (
|
||||
// root command
|
||||
cmd = &cobra.Command{
|
||||
Use: "gaze",
|
||||
Long: `Description of gaze indexer`,
|
||||
Long: `Gaze in a Bitcoin meta-protocol indexer`,
|
||||
}
|
||||
|
||||
// sub-commands
|
||||
// sub-commandsf
|
||||
cmds = []*cobra.Command{
|
||||
NewVersionCommand(),
|
||||
NewRunCommand(),
|
||||
NewMigrateCommand(),
|
||||
NewGenerateKeypairCommand(),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -44,7 +44,7 @@ func Execute(ctx context.Context) {
|
||||
|
||||
// Initialize logger
|
||||
if err := logger.Init(config.Logger); err != nil {
|
||||
logger.PanicContext(ctx, "Something went wrong, can't init logger", slogx.Error(err), slog.Any("config", config.Logger))
|
||||
logger.Panic("Failed to initialize logger: %v", slogx.Error(err), slog.Any("config", config.Logger))
|
||||
}
|
||||
})
|
||||
|
||||
@@ -53,7 +53,7 @@ func Execute(ctx context.Context) {
|
||||
|
||||
// Execute command
|
||||
if err := cmd.ExecuteContext(ctx); err != nil {
|
||||
// Cobra will print the error message by default
|
||||
logger.DebugContext(ctx, "Error executing command", slogx.Error(err))
|
||||
// use cobra to log error message by default
|
||||
logger.Debug("Failed to execute root command", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
100
cmd/cmd_generate_keypair.go
Normal file
100
cmd/cmd_generate_keypair.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec/v2"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
"github.com/gaze-network/indexer-network/pkg/crypto"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
type generateKeypairCmdOptions struct {
|
||||
Path string
|
||||
}
|
||||
|
||||
func NewGenerateKeypairCommand() *cobra.Command {
|
||||
opts := &generateKeypairCmdOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "generate-keypair",
|
||||
Short: "Generate new public/private keypair for encryption and signature generation",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return generateKeypairHandler(opts, cmd, args)
|
||||
},
|
||||
}
|
||||
|
||||
flags := cmd.Flags()
|
||||
flags.StringVar(&opts.Path, "path", "/data/keys", `Path to save to key pair file`)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func generateKeypairHandler(opts *generateKeypairCmdOptions, _ *cobra.Command, _ []string) error {
|
||||
fmt.Printf("Generating key pair\n")
|
||||
privKeyBytes := make([]byte, 32)
|
||||
|
||||
_, err := rand.Read(privKeyBytes)
|
||||
if err != nil {
|
||||
return errors.Wrap(errs.SomethingWentWrong, "random bytes")
|
||||
}
|
||||
_, pubKey := btcec.PrivKeyFromBytes(privKeyBytes)
|
||||
serializedPubKey := pubKey.SerializeCompressed()
|
||||
|
||||
// fmt.Println(hex.EncodeToString(privKeyBytes))
|
||||
|
||||
fmt.Printf("Public key: %s\n", hex.EncodeToString(serializedPubKey))
|
||||
err = os.MkdirAll(opts.Path, 0o755)
|
||||
if err != nil {
|
||||
return errors.Wrap(errs.SomethingWentWrong, "create directory")
|
||||
}
|
||||
|
||||
privateKeyPath := path.Join(opts.Path, "priv.key")
|
||||
|
||||
_, err = os.Stat(privateKeyPath)
|
||||
if err == nil {
|
||||
fmt.Printf("Existing private key found at %s\n[WARNING] THE EXISTING PRIVATE KEY WILL BE LOST\nType [replace] to replace existing private key: ", privateKeyPath)
|
||||
var ans string
|
||||
fmt.Scanln(&ans)
|
||||
if ans != "replace" {
|
||||
fmt.Printf("Keypair generation aborted\n")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
err = os.WriteFile(privateKeyPath, []byte(hex.EncodeToString(privKeyBytes)), 0o644)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "write private key file")
|
||||
}
|
||||
fmt.Printf("Private key saved at %s\n", privateKeyPath)
|
||||
|
||||
wifKeyPath := path.Join(opts.Path, "priv_wif_mainnet.key")
|
||||
client, err := crypto.New(hex.EncodeToString(privKeyBytes))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "new crypto client")
|
||||
}
|
||||
wifKey, err := client.WIF(&chaincfg.MainNetParams)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get WIF key")
|
||||
}
|
||||
|
||||
err = os.WriteFile(wifKeyPath, []byte(wifKey), 0o644)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "write WIF private key file")
|
||||
}
|
||||
fmt.Printf("WIF private key saved at %s\n", wifKeyPath)
|
||||
|
||||
publicKeyPath := path.Join(opts.Path, "pub.key")
|
||||
err = os.WriteFile(publicKeyPath, []byte(hex.EncodeToString(serializedPubKey)), 0o644)
|
||||
if err != nil {
|
||||
return errors.Wrap(errs.SomethingWentWrong, "write public key file")
|
||||
}
|
||||
fmt.Printf("Public key saved at %s\n", publicKeyPath)
|
||||
return nil
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/gaze-network/indexer-network/cmd/migrate"
|
||||
_ "github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func NewMigrateCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "migrate",
|
||||
Short: "Migrate database schema",
|
||||
}
|
||||
cmd.AddCommand(
|
||||
migrate.NewMigrateUpCommand(),
|
||||
migrate.NewMigrateDownCommand(),
|
||||
)
|
||||
return cmd
|
||||
}
|
||||
218
cmd/cmd_run.go
218
cmd/cmd_run.go
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
@@ -31,16 +30,13 @@ import (
|
||||
"github.com/gaze-network/indexer-network/pkg/errorhandler"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclient"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/compress"
|
||||
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
|
||||
"github.com/samber/lo"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
shutdownTimeout = 60 * time.Second
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type runCmdOptions struct {
|
||||
@@ -87,23 +83,22 @@ type HttpHandler interface {
|
||||
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
conf := config.Load()
|
||||
|
||||
// Validate inputs
|
||||
{
|
||||
if !conf.Network.IsSupported() {
|
||||
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize application process context
|
||||
// Initialize context
|
||||
ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
// Initialize worker context to separate worker's lifecycle from main process
|
||||
ctxWorker, stopWorker := context.WithCancel(context.Background())
|
||||
defer stopWorker()
|
||||
|
||||
// Add logger context
|
||||
ctxWorker = logger.WithContext(ctxWorker, slogx.Stringer("network", conf.Network))
|
||||
ctx = logger.WithContext(ctx, slogx.Stringer("network", conf.Network))
|
||||
|
||||
// Load private key
|
||||
privKeyPath := conf.NodeKey.Path
|
||||
if privKeyPath == "" {
|
||||
privKeyPath = "/data/keys/priv.key"
|
||||
}
|
||||
privKeyByte, err := os.ReadFile(privKeyPath)
|
||||
if err != nil {
|
||||
logger.PanicContext(ctx, "Failed to read private key file", slogx.Error(err))
|
||||
}
|
||||
|
||||
// Initialize Bitcoin Core RPC Client
|
||||
client, err := rpcclient.New(&rpcclient.ConnConfig{
|
||||
@@ -114,18 +109,19 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
HTTPPostMode: true,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
logger.PanicContext(ctx, "Invalid Bitcoin node configuration", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "Failed to create Bitcoin Core RPC Client", slogx.Error(err))
|
||||
}
|
||||
defer client.Shutdown()
|
||||
|
||||
// Check Bitcoin RPC connection
|
||||
{
|
||||
start := time.Now()
|
||||
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
|
||||
if err := client.Ping(); err != nil {
|
||||
logger.PanicContext(ctx, "Can't connect to Bitcoin Core RPC Server", slogx.String("host", conf.BitcoinNode.Host), slogx.Error(err))
|
||||
}
|
||||
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slog.Duration("latency", time.Since(start)))
|
||||
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
|
||||
if err := client.Ping(); err != nil {
|
||||
logger.PanicContext(ctx, "Failed to ping Bitcoin Core RPC Server", slogx.Error(err))
|
||||
}
|
||||
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server")
|
||||
|
||||
// Validate network
|
||||
if !conf.Network.IsSupported() {
|
||||
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
|
||||
}
|
||||
|
||||
// TODO: create module command package.
|
||||
@@ -134,20 +130,19 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
// TODO: refactor module name to specific type instead of string?
|
||||
httpHandlers := make(map[string]HttpHandler, 0)
|
||||
|
||||
var reportingClient *reportingclient.ReportingClient
|
||||
// use gracefulEG to coordinate graceful shutdown after context is done. (e.g. shut down http server, shutdown logic of each module, etc.)
|
||||
gracefulEG, gctx := errgroup.WithContext(context.Background())
|
||||
|
||||
var reportingClient *reportingclientv2.ReportingClient
|
||||
if !conf.Reporting.Disabled {
|
||||
reportingClient, err = reportingclient.New(conf.Reporting)
|
||||
reportingClient, err = reportingclientv2.New(conf.Reporting, string(privKeyByte)) // TODO: read private key from file
|
||||
if err != nil {
|
||||
if errors.Is(err, errs.InvalidArgument) {
|
||||
logger.PanicContext(ctx, "Invalid reporting configuration", slogx.Error(err))
|
||||
}
|
||||
logger.PanicContext(ctx, "Something went wrong, can't create reporting client", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "Failed to create reporting client", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize Bitcoin Indexer
|
||||
if opts.Bitcoin {
|
||||
ctx := logger.WithContext(ctx, slogx.String("module", "bitcoin"))
|
||||
var (
|
||||
btcDB btcdatagateway.BitcoinDataGateway
|
||||
indexerInfoDB btcdatagateway.IndexerInformationDataGateway
|
||||
@@ -156,70 +151,55 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
case "postgresql", "postgres", "pg":
|
||||
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
|
||||
if err != nil {
|
||||
if errors.Is(err, errs.InvalidArgument) {
|
||||
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
|
||||
}
|
||||
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
|
||||
}
|
||||
defer pg.Close()
|
||||
repo := btcpostgres.NewRepository(pg)
|
||||
btcDB = repo
|
||||
indexerInfoDB = repo
|
||||
default:
|
||||
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Bitcoin.Database)
|
||||
return errors.Wrapf(errs.Unsupported, "%q database is not supported", conf.Modules.Bitcoin.Database)
|
||||
}
|
||||
if !opts.APIOnly {
|
||||
processor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
|
||||
datasource := datasources.NewBitcoinNode(client)
|
||||
indexer := indexers.NewBitcoinIndexer(processor, datasource)
|
||||
defer func() {
|
||||
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
|
||||
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
|
||||
return
|
||||
}
|
||||
logger.InfoContext(ctx, "Indexer stopped gracefully")
|
||||
}()
|
||||
bitcoinProcessor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
|
||||
bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
|
||||
bitcoinIndexer := indexers.NewBitcoinIndexer(bitcoinProcessor, bitcoinNodeDatasource)
|
||||
|
||||
// Verify states before running Indexer
|
||||
if err := processor.VerifyStates(ctx); err != nil {
|
||||
if err := bitcoinProcessor.VerifyStates(ctx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Run Indexer
|
||||
go func() {
|
||||
// stop main process if indexer stopped
|
||||
defer stop()
|
||||
|
||||
logger.InfoContext(ctx, "Starting Gaze Indexer")
|
||||
if err := indexer.Run(ctxWorker); err != nil {
|
||||
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
|
||||
logger.InfoContext(ctx, "Starting Bitcoin Indexer")
|
||||
if err := bitcoinIndexer.Run(ctx); err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to run Bitcoin Indexer", slogx.Error(err))
|
||||
}
|
||||
|
||||
// stop main process if Bitcoin Indexer failed
|
||||
logger.InfoContext(ctx, "Bitcoin Indexer stopped. Stopping main process...")
|
||||
stop()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize Runes Indexer
|
||||
if opts.Runes {
|
||||
ctx := logger.WithContext(ctx, slogx.String("module", "runes"))
|
||||
var (
|
||||
runesDg runesdatagateway.RunesDataGateway
|
||||
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
|
||||
)
|
||||
var runesDg runesdatagateway.RunesDataGateway
|
||||
var indexerInfoDg runesdatagateway.IndexerInfoDataGateway
|
||||
switch strings.ToLower(conf.Modules.Runes.Database) {
|
||||
case "postgresql", "postgres", "pg":
|
||||
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
|
||||
if err != nil {
|
||||
if errors.Is(err, errs.InvalidArgument) {
|
||||
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
|
||||
}
|
||||
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
|
||||
}
|
||||
defer pg.Close()
|
||||
runesRepo := runespostgres.NewRepository(pg)
|
||||
runesDg = runesRepo
|
||||
indexerInfoDg = runesRepo
|
||||
default:
|
||||
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Runes.Database)
|
||||
logger.PanicContext(ctx, "Unsupported database", slogx.String("database", conf.Modules.Runes.Database))
|
||||
}
|
||||
var bitcoinDatasource indexers.BitcoinDatasource
|
||||
var bitcoinClient btcclient.Contract
|
||||
@@ -229,12 +209,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
bitcoinDatasource = bitcoinNodeDatasource
|
||||
bitcoinClient = bitcoinNodeDatasource
|
||||
case "database":
|
||||
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
|
||||
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
|
||||
if err != nil {
|
||||
if errors.Is(err, errs.InvalidArgument) {
|
||||
logger.PanicContext(ctx, "Invalid Postgres configuration for datasource", slogx.Error(err))
|
||||
}
|
||||
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
|
||||
}
|
||||
defer pg.Close()
|
||||
btcRepo := btcpostgres.NewRepository(pg)
|
||||
@@ -244,31 +221,24 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
default:
|
||||
return errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
|
||||
}
|
||||
|
||||
if !opts.APIOnly {
|
||||
processor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
|
||||
indexer := indexers.NewBitcoinIndexer(processor, bitcoinDatasource)
|
||||
defer func() {
|
||||
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
|
||||
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
|
||||
return
|
||||
}
|
||||
logger.InfoContext(ctx, "Indexer stopped gracefully")
|
||||
}()
|
||||
runesProcessor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
|
||||
runesIndexer := indexers.NewBitcoinIndexer(runesProcessor, bitcoinDatasource)
|
||||
|
||||
if err := processor.VerifyStates(ctx); err != nil {
|
||||
if err := runesProcessor.VerifyStates(ctx); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Run Indexer
|
||||
go func() {
|
||||
// stop main process if indexer stopped
|
||||
defer stop()
|
||||
|
||||
logger.InfoContext(ctx, "Starting Gaze Indexer")
|
||||
if err := indexer.Run(ctxWorker); err != nil {
|
||||
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
|
||||
logger.InfoContext(ctx, "Started Runes Indexer")
|
||||
if err := runesIndexer.Run(ctx); err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to run Runes Indexer", slogx.Error(err))
|
||||
}
|
||||
|
||||
// stop main process if Runes Indexer failed
|
||||
logger.InfoContext(ctx, "Runes Indexer stopped. Stopping main process...")
|
||||
stop()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -281,7 +251,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
runesHTTPHandler := runesapi.NewHTTPHandler(conf.Network, runesUsecase)
|
||||
httpHandlers["runes"] = runesHTTPHandler
|
||||
default:
|
||||
logger.PanicContext(ctx, "Something went wrong, unsupported API handler", slogx.String("handler", handler))
|
||||
logger.PanicContext(ctx, "Unsupported API handler", slogx.String("handler", handler))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -290,7 +260,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
// Setup HTTP server if there are any HTTP handlers
|
||||
if len(httpHandlers) > 0 {
|
||||
app := fiber.New(fiber.Config{
|
||||
AppName: "Gaze Indexer",
|
||||
AppName: "gaze",
|
||||
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
|
||||
})
|
||||
app.
|
||||
@@ -299,72 +269,44 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
|
||||
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {
|
||||
buf := make([]byte, 1024) // bufLen = 1024
|
||||
buf = buf[:runtime.Stack(buf, false)]
|
||||
logger.ErrorContext(c.UserContext(), "Something went wrong, panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
|
||||
logger.ErrorContext(c.UserContext(), "panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
|
||||
},
|
||||
})).
|
||||
Use(compress.New(compress.Config{
|
||||
Level: compress.LevelDefault,
|
||||
}))
|
||||
|
||||
defer func() {
|
||||
if err := app.ShutdownWithTimeout(shutdownTimeout); err != nil {
|
||||
logger.ErrorContext(ctx, "Error during shutdown HTTP server", slogx.Error(err))
|
||||
return
|
||||
}
|
||||
logger.InfoContext(ctx, "HTTP server stopped gracefully")
|
||||
}()
|
||||
|
||||
// Health check
|
||||
app.Get("/", func(c *fiber.Ctx) error {
|
||||
return errors.WithStack(c.SendStatus(http.StatusOK))
|
||||
})
|
||||
|
||||
// mount http handlers from each http-enabled module
|
||||
for module, handler := range httpHandlers {
|
||||
if err := handler.Mount(app); err != nil {
|
||||
logger.PanicContext(ctx, "Something went wrong, can't mount HTTP handler", slogx.Error(err), slogx.String("module", module))
|
||||
logger.PanicContext(ctx, "Failed to mount HTTP handler", slogx.Error(err), slogx.String("module", module))
|
||||
}
|
||||
logger.InfoContext(ctx, "Mounted HTTP handler", slogx.String("module", module))
|
||||
}
|
||||
|
||||
go func() {
|
||||
// stop main process if API stopped
|
||||
defer stop()
|
||||
|
||||
logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))
|
||||
if err := app.Listen(fmt.Sprintf(":%d", conf.HTTPServer.Port)); err != nil {
|
||||
logger.PanicContext(ctx, "Something went wrong, error during running HTTP server", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "Failed to start HTTP server", slogx.Error(err))
|
||||
}
|
||||
}()
|
||||
// handle graceful shutdown
|
||||
gracefulEG.Go(func() error {
|
||||
<-ctx.Done()
|
||||
logger.InfoContext(gctx, "Stopping HTTP server...")
|
||||
if err := app.ShutdownWithTimeout(60 * time.Second); err != nil {
|
||||
logger.ErrorContext(gctx, "Error during shutdown HTTP server", slogx.Error(err))
|
||||
}
|
||||
logger.InfoContext(gctx, "HTTP server stopped gracefully")
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Stop application if worker context is done
|
||||
go func() {
|
||||
<-ctxWorker.Done()
|
||||
defer stop()
|
||||
|
||||
logger.InfoContext(ctx, "Gaze Indexer Worker is stopped. Stopping application...")
|
||||
}()
|
||||
|
||||
logger.InfoContext(ctxWorker, "Gaze Indexer started")
|
||||
|
||||
// Wait for interrupt signal to gracefully stop the server
|
||||
<-ctx.Done()
|
||||
|
||||
// Force shutdown if timeout exceeded or got signal again
|
||||
go func() {
|
||||
defer os.Exit(1)
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.FatalContext(ctx, "Received exit signal again. Force shutdown...")
|
||||
case <-time.After(shutdownTimeout + 15*time.Second):
|
||||
logger.FatalContext(ctx, "Shutdown timeout exceeded. Force shutdown...")
|
||||
}
|
||||
}()
|
||||
|
||||
// wait until all graceful shutdown goroutines are done before returning
|
||||
if err := gracefulEG.Wait(); err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to shutdown gracefully", slogx.Error(err))
|
||||
} else {
|
||||
logger.InfoContext(ctx, "Successfully shut down gracefully")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,128 +0,0 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
_ "github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||
"github.com/samber/lo"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
type migrateDownCmdOptions struct {
|
||||
DatabaseURL string
|
||||
Bitcoin bool
|
||||
Runes bool
|
||||
All bool
|
||||
}
|
||||
|
||||
type migrateDownCmdArgs struct {
|
||||
N int
|
||||
}
|
||||
|
||||
func (a *migrateDownCmdArgs) ParseArgs(args []string) error {
|
||||
if len(args) > 0 {
|
||||
// assume args already validated by cobra to be len(args) <= 1
|
||||
n, err := strconv.Atoi(args[0])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to parse N")
|
||||
}
|
||||
if n < 0 {
|
||||
return errors.New("N must be a positive integer")
|
||||
}
|
||||
a.N = n
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewMigrateDownCommand() *cobra.Command {
|
||||
opts := &migrateDownCmdOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "down [N]",
|
||||
Short: "Apply all or N down migrations",
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
Example: `gaze migrate down --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// args already validated by cobra
|
||||
var downArgs migrateDownCmdArgs
|
||||
if err := downArgs.ParseArgs(args); err != nil {
|
||||
return errors.Wrap(err, "failed to parse args")
|
||||
}
|
||||
return migrateDownHandler(opts, cmd, downArgs)
|
||||
},
|
||||
}
|
||||
|
||||
flags := cmd.Flags()
|
||||
flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin down migrations")
|
||||
flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes down migrations")
|
||||
flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")
|
||||
flags.BoolVar(&opts.All, "all", false, "Confirm apply ALL down migrations without prompt")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func migrateDownHandler(opts *migrateDownCmdOptions, _ *cobra.Command, args migrateDownCmdArgs) error {
|
||||
if opts.DatabaseURL == "" {
|
||||
return errors.New("--database is required")
|
||||
}
|
||||
databaseURL, err := url.Parse(opts.DatabaseURL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to parse database URL")
|
||||
}
|
||||
if _, ok := supportedDrivers[databaseURL.Scheme]; !ok {
|
||||
return errors.Errorf("unsupported database driver: %s", databaseURL.Scheme)
|
||||
}
|
||||
// prevent accidental down all migrations
|
||||
if args.N == 0 && !opts.All {
|
||||
input := ""
|
||||
fmt.Print("Are you sure you want to apply all down migrations? (y/N):")
|
||||
fmt.Scanln(&input)
|
||||
if !lo.Contains([]string{"y", "yes"}, strings.ToLower(input)) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
applyDownMigrations := func(module string, sourcePath string, migrationTable string) error {
|
||||
newDatabaseURL := cloneURLWithQuery(databaseURL, url.Values{"x-migrations-table": {migrationTable}})
|
||||
sourceURL := "file://" + sourcePath
|
||||
m, err := migrate.New(sourceURL, newDatabaseURL.String())
|
||||
m.Log = &consoleLogger{
|
||||
prefix: fmt.Sprintf("[%s] ", module),
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create Migrate instance")
|
||||
}
|
||||
if args.N == 0 {
|
||||
m.Log.Printf("Applying down migrations...\n")
|
||||
err = m.Down()
|
||||
} else {
|
||||
m.Log.Printf("Applying %d down migrations...\n", args.N)
|
||||
err = m.Steps(-args.N)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, migrate.ErrNoChange) {
|
||||
return errors.Wrapf(err, "failed to apply %s down migrations", module)
|
||||
}
|
||||
m.Log.Printf("No more down migrations to apply\n")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if opts.Bitcoin {
|
||||
if err := applyDownMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
if opts.Runes {
|
||||
if err := applyDownMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,112 +0,0 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
_ "github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
type migrateUpCmdOptions struct {
|
||||
DatabaseURL string
|
||||
Bitcoin bool
|
||||
Runes bool
|
||||
}
|
||||
|
||||
type migrateUpCmdArgs struct {
|
||||
N int
|
||||
}
|
||||
|
||||
func (a *migrateUpCmdArgs) ParseArgs(args []string) error {
|
||||
if len(args) > 0 {
|
||||
// assume args already validated by cobra to be len(args) <= 1
|
||||
n, err := strconv.Atoi(args[0])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to parse N")
|
||||
}
|
||||
a.N = n
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewMigrateUpCommand() *cobra.Command {
|
||||
opts := &migrateUpCmdOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "up [N]",
|
||||
Short: "Apply all or N up migrations",
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
Example: `gaze migrate up --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// args already validated by cobra
|
||||
var upArgs migrateUpCmdArgs
|
||||
if err := upArgs.ParseArgs(args); err != nil {
|
||||
return errors.Wrap(err, "failed to parse args")
|
||||
}
|
||||
return migrateUpHandler(opts, cmd, upArgs)
|
||||
},
|
||||
}
|
||||
|
||||
flags := cmd.Flags()
|
||||
flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin up migrations")
|
||||
flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes up migrations")
|
||||
flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func migrateUpHandler(opts *migrateUpCmdOptions, _ *cobra.Command, args migrateUpCmdArgs) error {
|
||||
if opts.DatabaseURL == "" {
|
||||
return errors.New("--database is required")
|
||||
}
|
||||
databaseURL, err := url.Parse(opts.DatabaseURL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to parse database URL")
|
||||
}
|
||||
if _, ok := supportedDrivers[databaseURL.Scheme]; !ok {
|
||||
return errors.Errorf("unsupported database driver: %s", databaseURL.Scheme)
|
||||
}
|
||||
|
||||
applyUpMigrations := func(module string, sourcePath string, migrationTable string) error {
|
||||
newDatabaseURL := cloneURLWithQuery(databaseURL, url.Values{"x-migrations-table": {migrationTable}})
|
||||
sourceURL := "file://" + sourcePath
|
||||
m, err := migrate.New(sourceURL, newDatabaseURL.String())
|
||||
m.Log = &consoleLogger{
|
||||
prefix: fmt.Sprintf("[%s] ", module),
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create Migrate instance")
|
||||
}
|
||||
if args.N == 0 {
|
||||
m.Log.Printf("Applying up migrations...\n")
|
||||
err = m.Up()
|
||||
} else {
|
||||
m.Log.Printf("Applying %d up migrations...\n", args.N)
|
||||
err = m.Steps(args.N)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, migrate.ErrNoChange) {
|
||||
return errors.Wrapf(err, "failed to apply %s up migrations", module)
|
||||
}
|
||||
m.Log.Printf("Migrations already up-to-date\n")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if opts.Bitcoin {
|
||||
if err := applyUpMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
if opts.Runes {
|
||||
if err := applyUpMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
)
|
||||
|
||||
var _ migrate.Logger = (*consoleLogger)(nil)
|
||||
|
||||
type consoleLogger struct {
|
||||
prefix string
|
||||
verbose bool
|
||||
}
|
||||
|
||||
func (l *consoleLogger) Printf(format string, v ...interface{}) {
|
||||
fmt.Printf(l.prefix+format, v...)
|
||||
}
|
||||
|
||||
func (l *consoleLogger) Verbose() bool {
|
||||
return l.verbose
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package migrate
|
||||
|
||||
import "net/url"
|
||||
|
||||
const (
|
||||
bitcoinMigrationSource = "modules/bitcoin/database/postgresql/migrations"
|
||||
runesMigrationSource = "modules/runes/database/postgresql/migrations"
|
||||
)
|
||||
|
||||
func cloneURLWithQuery(u *url.URL, newQuery url.Values) *url.URL {
|
||||
clone := *u
|
||||
query := clone.Query()
|
||||
for key, values := range newQuery {
|
||||
for _, value := range values {
|
||||
query.Add(key, value)
|
||||
}
|
||||
}
|
||||
clone.RawQuery = query.Encode()
|
||||
return &clone
|
||||
}
|
||||
|
||||
var supportedDrivers = map[string]struct{}{
|
||||
"postgres": {},
|
||||
"postgresql": {},
|
||||
}
|
||||
@@ -32,7 +32,7 @@ modules:
|
||||
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database connection configurations
|
||||
runes:
|
||||
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
|
||||
datasource: "postgres" # Data source to be used (to fetch bitcoin blocks). current supported data sources: "bitcoin-node" | "postgres"
|
||||
datasource: "bitcoin-node" # Data source to be used. current supported data sources: "bitcoin-node" | "postgres"
|
||||
api_handlers: # API handlers to be used. current supported handlers: "http"
|
||||
- http
|
||||
postgres:
|
||||
|
||||
@@ -39,7 +39,7 @@ func NewBitcoinNode(btcclient *rpcclient.Client) *BitcoinNodeDatasource {
|
||||
}
|
||||
|
||||
func (p BitcoinNodeDatasource) Name() string {
|
||||
return "bitcoin_node"
|
||||
return "BitcoinNode"
|
||||
}
|
||||
|
||||
// Fetch polling blocks from Bitcoin node
|
||||
@@ -83,11 +83,6 @@ func (d *BitcoinNodeDatasource) Fetch(ctx context.Context, from, to int64) ([]*t
|
||||
// - from: block height to start fetching, if -1, it will start from genesis block
|
||||
// - to: block height to stop fetching, if -1, it will fetch until the latest block
|
||||
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
|
||||
ctx = logger.WithContext(ctx,
|
||||
slogx.String("package", "datasources"),
|
||||
slogx.String("datasource", d.Name()),
|
||||
)
|
||||
|
||||
from, to, skip, err := d.prepareRange(from, to)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to prepare fetch range")
|
||||
@@ -143,10 +138,10 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
if errors.Is(err, errs.Closed) {
|
||||
return
|
||||
}
|
||||
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
|
||||
logger.WarnContext(ctx, "failed while dispatch block",
|
||||
slogx.Error(err),
|
||||
slogx.Int64("start", data[0].Header.Height),
|
||||
slogx.Int64("end", data[len(data)-1].Header.Height),
|
||||
slogx.Error(err),
|
||||
)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
@@ -162,7 +157,6 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
done := subscription.Done()
|
||||
chunks := lo.Chunk(blockHeights, blockStreamChunkSize)
|
||||
for _, chunk := range chunks {
|
||||
// TODO: Implement throttling logic to control the rate of fetching blocks (block/sec)
|
||||
chunk := chunk
|
||||
select {
|
||||
case <-done:
|
||||
@@ -173,11 +167,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
stream.Go(func() []*types.Block {
|
||||
startAt := time.Now()
|
||||
defer func() {
|
||||
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
|
||||
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched blocks",
|
||||
slogx.Int("total_blocks", len(chunk)),
|
||||
slogx.Int64("from", chunk[0]),
|
||||
slogx.Int64("to", chunk[len(chunk)-1]),
|
||||
slogx.Duration("duration", time.Since(startAt)),
|
||||
slogx.Stringer("duration", time.Since(startAt)),
|
||||
slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
|
||||
)
|
||||
}()
|
||||
// TODO: should concurrent fetch block or not ?
|
||||
@@ -185,21 +180,22 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
for _, height := range chunk {
|
||||
hash, err := d.btcclient.GetBlockHash(height)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Can't get block hash from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
|
||||
logger.ErrorContext(ctx, "failed to get block hash", slogx.Error(err), slogx.Int64("height", height))
|
||||
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
block, err := d.btcclient.GetBlock(hash)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Can't get block data from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
|
||||
logger.ErrorContext(ctx, "failed to get block", slogx.Error(err), slogx.Int64("height", height))
|
||||
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))
|
||||
|
||||
blocks = append(blocks, types.ParseMsgBlock(block, height))
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package indexers
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@@ -31,10 +30,6 @@ type BitcoinIndexer struct {
|
||||
Processor BitcoinProcessor
|
||||
Datasource BitcoinDatasource
|
||||
currentBlock types.BlockHeader
|
||||
|
||||
quitOnce sync.Once
|
||||
quit chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewBitcoinIndexer create new BitcoinIndexer
|
||||
@@ -42,46 +37,12 @@ func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource)
|
||||
return &BitcoinIndexer{
|
||||
Processor: processor,
|
||||
Datasource: datasource,
|
||||
|
||||
quit: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (*BitcoinIndexer) Type() string {
|
||||
return "bitcoin"
|
||||
}
|
||||
|
||||
func (i *BitcoinIndexer) Shutdown() error {
|
||||
return i.ShutdownWithContext(context.Background())
|
||||
}
|
||||
|
||||
func (i *BitcoinIndexer) ShutdownWithTimeout(timeout time.Duration) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
return i.ShutdownWithContext(ctx)
|
||||
}
|
||||
|
||||
func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
|
||||
i.quitOnce.Do(func() {
|
||||
close(i.quit)
|
||||
select {
|
||||
case <-i.done:
|
||||
case <-time.After(180 * time.Second):
|
||||
err = errors.Wrap(errs.Timeout, "indexer shutdown timeout")
|
||||
case <-ctx.Done():
|
||||
err = errors.Wrap(ctx.Err(), "indexer shutdown context canceled")
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
|
||||
defer close(i.done)
|
||||
|
||||
ctx = logger.WithContext(ctx,
|
||||
slog.String("package", "indexers"),
|
||||
slog.String("indexer", i.Type()),
|
||||
slog.String("indexer", "bitcoin"),
|
||||
slog.String("processor", i.Processor.Name()),
|
||||
slog.String("datasource", i.Datasource.Name()),
|
||||
)
|
||||
@@ -95,41 +56,38 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
|
||||
i.currentBlock.Height = -1
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(pollingInterval)
|
||||
// TODO:
|
||||
// - compare db version in constants and database
|
||||
// - compare current network and local indexed network
|
||||
// - update indexer stats
|
||||
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-i.quit:
|
||||
logger.InfoContext(ctx, "Got quit signal, stopping indexer")
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := i.process(ctx); err != nil {
|
||||
logger.ErrorContext(ctx, "Indexer failed while processing", slogx.Error(err))
|
||||
return errors.Wrap(err, "process failed")
|
||||
logger.ErrorContext(ctx, "failed to process", slogx.Error(err))
|
||||
return errors.Wrap(err, "failed to process")
|
||||
}
|
||||
logger.DebugContext(ctx, "Waiting for next polling interval")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
// height range to fetch data
|
||||
from, to := i.currentBlock.Height+1, int64(-1)
|
||||
|
||||
logger.InfoContext(ctx, "Start fetching bitcoin blocks", slog.Int64("from", from))
|
||||
ch := make(chan []*types.Block)
|
||||
from, to := i.currentBlock.Height+1, int64(-1)
|
||||
logger.InfoContext(ctx, "Fetching blocks", slog.Int64("from", from), slog.Int64("to", to))
|
||||
subscription, err := i.Datasource.FetchAsync(ctx, from, to, ch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to fetch data")
|
||||
return errors.Wrap(err, "failed to call fetch async")
|
||||
}
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-i.quit:
|
||||
return nil
|
||||
case blocks := <-ch:
|
||||
// empty blocks
|
||||
if len(blocks) == 0 {
|
||||
@@ -138,6 +96,7 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
|
||||
startAt := time.Now()
|
||||
ctx := logger.WithContext(ctx,
|
||||
slog.Int("total_blocks", len(blocks)),
|
||||
slogx.Int64("from", blocks[0].Header.Height),
|
||||
slogx.Int64("to", blocks[len(blocks)-1].Header.Height),
|
||||
)
|
||||
@@ -146,8 +105,7 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
{
|
||||
remoteBlockHeader := blocks[0].Header
|
||||
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
|
||||
logger.WarnContext(ctx, "Detected chain reorganization. Searching for fork point...",
|
||||
slogx.String("event", "reorg_detected"),
|
||||
logger.WarnContext(ctx, "Reorg detected",
|
||||
slogx.Stringer("current_hash", i.currentBlock.Hash),
|
||||
slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
|
||||
)
|
||||
@@ -186,15 +144,12 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
return errors.Wrap(errs.SomethingWentWrong, "reorg look back limit reached")
|
||||
}
|
||||
|
||||
logger.InfoContext(ctx, "Found reorg fork point, starting to revert data...",
|
||||
slogx.String("event", "reorg_forkpoint"),
|
||||
slogx.Int64("since", beforeReorgBlockHeader.Height+1),
|
||||
slogx.Int64("total_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
|
||||
slogx.Duration("search_duration", time.Since(start)),
|
||||
)
|
||||
|
||||
// Revert all data since the reorg block
|
||||
start = time.Now()
|
||||
logger.WarnContext(ctx, "reverting reorg data",
|
||||
slogx.Int64("reorg_from", beforeReorgBlockHeader.Height+1),
|
||||
slogx.Int64("total_reorg_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
|
||||
slogx.Stringer("detect_duration", time.Since(start)),
|
||||
)
|
||||
if err := i.Processor.RevertData(ctx, beforeReorgBlockHeader.Height+1); err != nil {
|
||||
return errors.Wrap(err, "failed to revert data")
|
||||
}
|
||||
@@ -202,9 +157,10 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
// Set current block to before reorg block and
|
||||
// end current round to fetch again
|
||||
i.currentBlock = beforeReorgBlockHeader
|
||||
logger.Info("Fixing chain reorganization completed",
|
||||
slogx.Int64("current_block", i.currentBlock.Height),
|
||||
slogx.Duration("duration", time.Since(start)),
|
||||
logger.Info("Reverted data successfully",
|
||||
slogx.Any("current_block", i.currentBlock),
|
||||
slogx.Stringer("duration", time.Since(start)),
|
||||
slogx.Int64("duration_ms", time.Since(start).Milliseconds()),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
@@ -217,15 +173,12 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
|
||||
logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching blocks, need to try to fetch again")
|
||||
|
||||
logger.WarnContext(ctx, "reorg occurred while batch fetching blocks, need to try to fetch again")
|
||||
// end current round
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
ctx = logger.WithContext(ctx, slog.Int("total_blocks", len(blocks)))
|
||||
|
||||
// Start processing blocks
|
||||
logger.InfoContext(ctx, "Processing blocks")
|
||||
if err := i.Processor.Process(ctx, blocks); err != nil {
|
||||
@@ -236,9 +189,8 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
i.currentBlock = blocks[len(blocks)-1].Header
|
||||
|
||||
logger.InfoContext(ctx, "Processed blocks successfully",
|
||||
slogx.String("event", "processed_blocks"),
|
||||
slogx.Int64("current_block", i.currentBlock.Height),
|
||||
slogx.Duration("duration", time.Since(startAt)),
|
||||
slogx.Stringer("duration", time.Since(startAt)),
|
||||
slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
|
||||
)
|
||||
case <-subscription.Done():
|
||||
// end current round
|
||||
|
||||
@@ -2,22 +2,12 @@ package indexers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
)
|
||||
|
||||
const (
|
||||
// pollingInterval is the default polling interval for the indexer polling worker
|
||||
pollingInterval = 15 * time.Second
|
||||
)
|
||||
|
||||
type IndexerWorker interface {
|
||||
Type() string
|
||||
Run(ctx context.Context) error
|
||||
Shutdown() error
|
||||
ShutdownWithTimeout(timeout time.Duration) error
|
||||
ShutdownWithContext(ctx context.Context) error
|
||||
}
|
||||
|
||||
type Processor[T any] interface {
|
||||
|
||||
1
docker/.gitignore
vendored
1
docker/.gitignore
vendored
@@ -1 +0,0 @@
|
||||
volumes
|
||||
@@ -1,43 +0,0 @@
|
||||
logger:
|
||||
output: text
|
||||
debug: false
|
||||
|
||||
bitcoin_node:
|
||||
host: "bitcoin-mainnet-archive.allthatnode.com"
|
||||
user: "user"
|
||||
pass: "pass"
|
||||
disable_tls: false
|
||||
|
||||
network: mainnet
|
||||
|
||||
reporting:
|
||||
disabled: false
|
||||
base_url: "https://staging.indexer.api.gaze.network" # defaults to "https://indexer.api.gaze.network" if empty
|
||||
name: "Local Indexer"
|
||||
website_url: "" # public website URL to show on the dashboard. Can be left empty.
|
||||
indexer_api_url: "" # public url to access this api. Can be left empty.
|
||||
|
||||
้http_server:
|
||||
port: 8080
|
||||
|
||||
modules:
|
||||
bitcoin:
|
||||
database: "postgres" # Store bitcoin data in postgres
|
||||
postgres:
|
||||
host: "db"
|
||||
port: 5432
|
||||
user: "postgres"
|
||||
password: "password"
|
||||
db_name: "postgres"
|
||||
|
||||
runes:
|
||||
database: "postgres" # Store Runes data in postgres
|
||||
datasource: "postgres" # Fetch bitcoin blocks from postgres
|
||||
api_handlers:
|
||||
- http
|
||||
postgres:
|
||||
host: "db"
|
||||
port: 5432
|
||||
user: "postgres"
|
||||
password: "password"
|
||||
db_name: "postgres"
|
||||
@@ -1,47 +0,0 @@
|
||||
services:
|
||||
# TODO: need to mount migrations folder
|
||||
gaze-migrator:
|
||||
# image: ghcr.io/gaze-network/gaze-indexer:v0.1.0
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: ./docker/Dockerfile
|
||||
restart: on-failure
|
||||
depends_on:
|
||||
- db
|
||||
networks:
|
||||
- indexer
|
||||
command:
|
||||
["/app/main", "migrate", "up", "--bitcoin", "--runes", "--database", "postgres://postgres:password@db:5432/postgres?sslmode=disable"]
|
||||
|
||||
gaze-indexer:
|
||||
# image: ghcr.io/gaze-network/gaze-indexer:v0.1.0
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: ./docker/Dockerfile
|
||||
restart: unless-stopped
|
||||
depends_on:
|
||||
- db
|
||||
- gaze-migrator:
|
||||
condition: service_completed_successfully
|
||||
networks:
|
||||
- indexer
|
||||
ports:
|
||||
- 8080:8080
|
||||
volumes:
|
||||
- "./config.example.yaml:/app/config.yaml"
|
||||
command: ["/app/main", "run", "--bitcoin", "--runes"]
|
||||
|
||||
db:
|
||||
image: postgres:16-alpine
|
||||
volumes:
|
||||
- "./volumes/postgresql/data:/var/lib/postgresql/data"
|
||||
networks:
|
||||
- indexer
|
||||
environment:
|
||||
- POSTGRES_USER=postgres
|
||||
- POSTGRES_PASSWORD=password
|
||||
- POSTGRES_DB=postgres
|
||||
|
||||
networks:
|
||||
indexer:
|
||||
driver: bridge
|
||||
18
go.mod
18
go.mod
@@ -10,7 +10,6 @@ require (
|
||||
github.com/cockroachdb/errors v1.11.1
|
||||
github.com/gaze-network/uint128 v1.3.0
|
||||
github.com/gofiber/fiber/v2 v2.52.4
|
||||
github.com/golang-migrate/migrate/v4 v4.17.1
|
||||
github.com/jackc/pgx v3.6.2+incompatible
|
||||
github.com/jackc/pgx/v5 v5.5.5
|
||||
github.com/mcosta74/pgx-slog v0.3.0
|
||||
@@ -21,28 +20,27 @@ require (
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.18.2
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/valyala/fasthttp v1.51.0
|
||||
go.uber.org/automaxprocs v1.5.3
|
||||
golang.org/x/sync v0.5.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/andybalholm/brotli v1.0.5 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
||||
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
|
||||
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
|
||||
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
|
||||
github.com/cockroachdb/redact v1.1.5 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
|
||||
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/ecies/go/v2 v2.0.9 // indirect
|
||||
github.com/ethereum/go-ethereum v1.13.5 // indirect
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.18.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/google/uuid v1.5.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
@@ -51,7 +49,6 @@ require (
|
||||
github.com/klauspost/compress v1.17.0 // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/lib/pq v1.10.9 // indirect
|
||||
github.com/magiconair/properties v1.8.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
@@ -69,12 +66,13 @@ require (
|
||||
github.com/spf13/cast v1.6.0 // indirect
|
||||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.51.0 // indirect
|
||||
github.com/valyala/tcplisten v1.0.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.9.0 // indirect
|
||||
golang.org/x/crypto v0.20.0 // indirect
|
||||
golang.org/x/crypto v0.17.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
|
||||
golang.org/x/sys v0.17.0 // indirect
|
||||
golang.org/x/sys v0.15.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
|
||||
@@ -12,15 +12,14 @@ import (
|
||||
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclient"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var (
|
||||
isInit bool
|
||||
mu sync.Mutex
|
||||
config = &Config{
|
||||
configOnce sync.Once
|
||||
config = &Config{
|
||||
Logger: logger.Config{
|
||||
Output: "TEXT",
|
||||
},
|
||||
@@ -33,12 +32,17 @@ var (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Logger logger.Config `mapstructure:"logger"`
|
||||
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
|
||||
Network common.Network `mapstructure:"network"`
|
||||
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
|
||||
Modules Modules `mapstructure:"modules"`
|
||||
Reporting reportingclient.Config `mapstructure:"reporting"`
|
||||
Logger logger.Config `mapstructure:"logger"`
|
||||
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
|
||||
Network common.Network `mapstructure:"network"`
|
||||
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
|
||||
Modules Modules `mapstructure:"modules"`
|
||||
Reporting reportingclientv2.Config `mapstructure:"reporting"`
|
||||
NodeKey NodeKey `mapstructure:"node_key"`
|
||||
}
|
||||
|
||||
type NodeKey struct {
|
||||
Path string `mapstructure:"path"`
|
||||
}
|
||||
|
||||
type BitcoinNodeClient struct {
|
||||
@@ -59,38 +63,6 @@ type HTTPServerConfig struct {
|
||||
|
||||
// Parse parse the configuration from environment variables
|
||||
func Parse(configFile ...string) Config {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return parse(configFile...)
|
||||
}
|
||||
|
||||
// Load returns the loaded configuration
|
||||
func Load() Config {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if isInit {
|
||||
return *config
|
||||
}
|
||||
return parse()
|
||||
}
|
||||
|
||||
// BindPFlag binds a specific key to a pflag (as used by cobra).
|
||||
// Example (where serverCmd is a Cobra instance):
|
||||
//
|
||||
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
|
||||
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
|
||||
func BindPFlag(key string, flag *pflag.Flag) {
|
||||
if err := viper.BindPFlag(key, flag); err != nil {
|
||||
logger.Panic("Something went wrong, failed to bind flag for config", slog.String("package", "config"), slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// SetDefault sets the default value for this key.
|
||||
// SetDefault is case-insensitive for a key.
|
||||
// Default only used when no value is provided by the user via flag, config or ENV.
|
||||
func SetDefault(key string, value any) { viper.SetDefault(key, value) }
|
||||
|
||||
func parse(configFile ...string) Config {
|
||||
ctx := logger.WithContext(context.Background(), slog.String("package", "config"))
|
||||
|
||||
if len(configFile) > 0 && configFile[0] != "" {
|
||||
@@ -105,16 +77,39 @@ func parse(configFile ...string) Config {
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
var errNotfound viper.ConfigFileNotFoundError
|
||||
if errors.As(err, &errNotfound) {
|
||||
logger.WarnContext(ctx, "Config file not found, use default config value", slogx.Error(err))
|
||||
logger.WarnContext(ctx, "config file not found, use default value", slogx.Error(err))
|
||||
} else {
|
||||
logger.PanicContext(ctx, "Invalid config file", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "invalid config file", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := viper.Unmarshal(&config); err != nil {
|
||||
logger.PanicContext(ctx, "Something went wrong, failed to unmarshal config", slogx.Error(err))
|
||||
logger.PanicContext(ctx, "failed to unmarshal config", slogx.Error(err))
|
||||
}
|
||||
|
||||
isInit = true
|
||||
return *config
|
||||
}
|
||||
|
||||
// Load returns the loaded configuration
|
||||
func Load() Config {
|
||||
configOnce.Do(func() {
|
||||
_ = Parse()
|
||||
})
|
||||
return *config
|
||||
}
|
||||
|
||||
// BindPFlag binds a specific key to a pflag (as used by cobra).
|
||||
// Example (where serverCmd is a Cobra instance):
|
||||
//
|
||||
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
|
||||
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
|
||||
func BindPFlag(key string, flag *pflag.Flag) {
|
||||
if err := viper.BindPFlag(key, flag); err != nil {
|
||||
logger.Panic("Failed to bind pflag for config", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// SetDefault sets the default value for this key.
|
||||
// SetDefault is case-insensitive for a key.
|
||||
// Default only used when no value is provided by the user via flag, config or ENV.
|
||||
func SetDefault(key string, value any) { viper.SetDefault(key, value) }
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
|
||||
"github.com/Cleverse/go-utilities/utils"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
@@ -63,7 +62,7 @@ func NewPool(ctx context.Context, conf Config) (*pgxpool.Pool, error) {
|
||||
// Prepare connection pool configuration
|
||||
connConfig, err := pgxpool.ParseConfig(conf.String())
|
||||
if err != nil {
|
||||
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "failed while parse config"))
|
||||
return nil, errors.Wrap(err, "failed to parse config to create a new connection pool")
|
||||
}
|
||||
connConfig.MaxConns = utils.Default(conf.MaxConns, DefaultMaxConns)
|
||||
connConfig.MinConns = utils.Default(conf.MinConns, DefaultMinConns)
|
||||
|
||||
@@ -37,13 +37,13 @@ func NewClientDatabase(bitcoinDg datagateway.BitcoinDataGateway) *ClientDatabase
|
||||
}
|
||||
}
|
||||
|
||||
func (d ClientDatabase) Name() string {
|
||||
return "bitcoin_database"
|
||||
func (c ClientDatabase) Name() string {
|
||||
return "BitcoinDatabase"
|
||||
}
|
||||
|
||||
func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
|
||||
func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
|
||||
ch := make(chan []*types.Block)
|
||||
subscription, err := d.FetchAsync(ctx, from, to, ch)
|
||||
subscription, err := c.FetchAsync(ctx, from, to, ch)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
@@ -73,13 +73,8 @@ func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Bl
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
|
||||
ctx = logger.WithContext(ctx,
|
||||
slogx.String("package", "datasources"),
|
||||
slogx.String("datasource", d.Name()),
|
||||
)
|
||||
|
||||
from, to, skip, err := d.prepareRange(ctx, from, to)
|
||||
func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
|
||||
from, to, skip, err := c.prepareRange(ctx, from, to)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to prepare fetch range")
|
||||
}
|
||||
@@ -134,10 +129,10 @@ func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
|
||||
if errors.Is(err, errs.Closed) {
|
||||
return
|
||||
}
|
||||
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
|
||||
logger.WarnContext(ctx, "failed while dispatch block",
|
||||
slogx.Error(err),
|
||||
slogx.Int64("start", data[0].Header.Height),
|
||||
slogx.Int64("end", data[len(data)-1].Header.Height),
|
||||
slogx.Error(err),
|
||||
)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
@@ -164,26 +159,16 @@ func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
|
||||
continue
|
||||
}
|
||||
stream.Go(func() []*types.Block {
|
||||
startAt := time.Now()
|
||||
defer func() {
|
||||
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
|
||||
slogx.Int("total_blocks", len(chunk)),
|
||||
slogx.Int64("from", chunk[0]),
|
||||
slogx.Int64("to", chunk[len(chunk)-1]),
|
||||
slogx.Duration("duration", time.Since(startAt)),
|
||||
)
|
||||
}()
|
||||
|
||||
fromHeight, toHeight := chunk[0], chunk[len(chunk)-1]
|
||||
blocks, err := d.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
|
||||
blocks, err := c.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Can't get block data from Bitcoin database",
|
||||
logger.ErrorContext(ctx, "failed to get blocks",
|
||||
slogx.Error(err),
|
||||
slogx.Int64("from", fromHeight),
|
||||
slogx.Int64("to", toHeight),
|
||||
slogx.Int64("from_height", fromHeight),
|
||||
slogx.Int64("to_height", toHeight),
|
||||
)
|
||||
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get blocks: from_height: %d, to_height: %d", fromHeight, toHeight)); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package bitcoin
|
||||
|
||||
import (
|
||||
"github.com/Cleverse/go-utilities/utils"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/gaze-network/indexer-network/common"
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
)
|
||||
@@ -12,15 +10,8 @@ const (
|
||||
DBVersion = 1
|
||||
)
|
||||
|
||||
var (
|
||||
// defaultCurrentBlockHeight is the default value for the current block height for first time indexing
|
||||
defaultCurrentBlock = types.BlockHeader{
|
||||
Hash: common.ZeroHash,
|
||||
Height: -1,
|
||||
}
|
||||
|
||||
lastV1Block = types.BlockHeader{
|
||||
Hash: *utils.Must(chainhash.NewHashFromStr("00000000000001aa077d7aa84c532a4d69bdbff519609d1da0835261b7a74eb6")),
|
||||
Height: 227835,
|
||||
}
|
||||
)
|
||||
// DefaultCurrentBlockHeight is the default value for the current block height for first time indexing
|
||||
var defaultCurrentBlock = types.BlockHeader{
|
||||
Hash: common.ZeroHash,
|
||||
Height: -1,
|
||||
}
|
||||
|
||||
@@ -2,8 +2,8 @@ BEGIN;
|
||||
|
||||
-- DROP INDEX
|
||||
DROP INDEX IF EXISTS bitcoin_blocks_block_hash_idx;
|
||||
DROP INDEX IF EXISTS bitcoin_transactions_tx_hash_idx;
|
||||
DROP INDEX IF EXISTS bitcoin_transactions_block_hash_idx;
|
||||
DROP INDEX IF EXISTS bitcoin_transactions_block_height_idx;
|
||||
DROP INDEX IF EXISTS bitcoin_transaction_txouts_pkscript_idx;
|
||||
DROP INDEX IF EXISTS bitcoin_transaction_txins_prevout_idx;
|
||||
|
||||
|
||||
@@ -32,17 +32,16 @@ CREATE TABLE IF NOT EXISTS "bitcoin_blocks" (
|
||||
CREATE INDEX IF NOT EXISTS bitcoin_blocks_block_hash_idx ON "bitcoin_blocks" USING HASH ("block_hash");
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "bitcoin_transactions" (
|
||||
"tx_hash" TEXT NOT NULL, -- can't use as primary key because block v1 has duplicate tx hashes (coinbase tx). See: https://github.com/bitcoin/bitcoin/commit/a206b0ea12eb4606b93323268fc81a4f1f952531
|
||||
"tx_hash" TEXT NOT NULL PRIMARY KEY,
|
||||
"version" INT NOT NULL,
|
||||
"locktime" BIGINT NOT NULL,
|
||||
"block_height" INT NOT NULL,
|
||||
"block_hash" TEXT NOT NULL,
|
||||
"idx" INT NOT NULL,
|
||||
PRIMARY KEY ("block_height", "idx")
|
||||
"idx" INT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS bitcoin_transactions_tx_hash_idx ON "bitcoin_transactions" USING HASH ("tx_hash");
|
||||
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_hash_idx ON "bitcoin_transactions" USING HASH ("block_hash");
|
||||
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_height_idx ON "bitcoin_transactions" USING BTREE ("block_height");
|
||||
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_hash_idx ON "bitcoin_transactions" USING BTREE ("block_hash");
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txouts" (
|
||||
"tx_hash" TEXT NOT NULL,
|
||||
@@ -62,7 +61,7 @@ CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txins" (
|
||||
"prevout_tx_idx" INT NOT NULL,
|
||||
"prevout_pkscript" TEXT NULL, -- Hex String, Can be NULL if the prevout is a coinbase transaction
|
||||
"scriptsig" TEXT NOT NULL, -- Hex String
|
||||
"witness" TEXT NOT NULL DEFAULT '', -- Hex String
|
||||
"witness" TEXT, -- Hex String
|
||||
"sequence" BIGINT NOT NULL,
|
||||
PRIMARY KEY ("tx_hash", "tx_idx")
|
||||
);
|
||||
|
||||
@@ -4,61 +4,21 @@ SELECT * FROM bitcoin_blocks ORDER BY block_height DESC LIMIT 1;
|
||||
-- name: InsertBlock :exec
|
||||
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce") VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
|
||||
|
||||
-- name: BatchInsertBlocks :exec
|
||||
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce")
|
||||
VALUES (
|
||||
unnest(@block_height_arr::INT[]),
|
||||
unnest(@block_hash_arr::TEXT[]),
|
||||
unnest(@version_arr::INT[]),
|
||||
unnest(@merkle_root_arr::TEXT[]),
|
||||
unnest(@prev_block_hash_arr::TEXT[]),
|
||||
unnest(@timestamp_arr::TIMESTAMP WITH TIME ZONE[]), -- or use TIMESTAMPTZ
|
||||
unnest(@bits_arr::BIGINT[]),
|
||||
unnest(@nonce_arr::BIGINT[])
|
||||
);
|
||||
-- name: InsertTransaction :exec
|
||||
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx") VALUES ($1, $2, $3, $4, $5, $6);
|
||||
|
||||
-- name: BatchInsertTransactions :exec
|
||||
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx")
|
||||
VALUES (
|
||||
unnest(@tx_hash_arr::TEXT[]),
|
||||
unnest(@version_arr::INT[]),
|
||||
unnest(@locktime_arr::BIGINT[]),
|
||||
unnest(@block_height_arr::INT[]),
|
||||
unnest(@block_hash_arr::TEXT[]),
|
||||
unnest(@idx_arr::INT[])
|
||||
);
|
||||
-- name: InsertTransactionTxOut :exec
|
||||
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value") VALUES ($1, $2, $3, $4);
|
||||
|
||||
-- name: BatchInsertTransactionTxIns :exec
|
||||
-- name: InsertTransactionTxIn :exec
|
||||
WITH update_txout AS (
|
||||
UPDATE "bitcoin_transaction_txouts"
|
||||
SET "is_spent" = true
|
||||
FROM (SELECT unnest(@prevout_tx_hash_arr::TEXT[]) as tx_hash, unnest(@prevout_tx_idx_arr::INT[]) as tx_idx) as txin
|
||||
WHERE "bitcoin_transaction_txouts"."tx_hash" = txin.tx_hash AND "bitcoin_transaction_txouts"."tx_idx" = txin.tx_idx AND "is_spent" = false
|
||||
RETURNING "bitcoin_transaction_txouts"."tx_hash", "bitcoin_transaction_txouts"."tx_idx", "pkscript"
|
||||
), prepare_insert AS (
|
||||
SELECT input.tx_hash, input.tx_idx, prevout_tx_hash, prevout_tx_idx, update_txout.pkscript as prevout_pkscript, scriptsig, witness, sequence
|
||||
FROM (
|
||||
SELECT
|
||||
unnest(@tx_hash_arr::TEXT[]) as tx_hash,
|
||||
unnest(@tx_idx_arr::INT[]) as tx_idx,
|
||||
unnest(@prevout_tx_hash_arr::TEXT[]) as prevout_tx_hash,
|
||||
unnest(@prevout_tx_idx_arr::INT[]) as prevout_tx_idx,
|
||||
unnest(@scriptsig_arr::TEXT[]) as scriptsig,
|
||||
unnest(@witness_arr::TEXT[]) as witness,
|
||||
unnest(@sequence_arr::INT[]) as sequence
|
||||
) input LEFT JOIN update_txout ON "update_txout"."tx_hash" = "input"."prevout_tx_hash" AND "update_txout"."tx_idx" = "input"."prevout_tx_idx"
|
||||
WHERE "tx_hash" = $3 AND "tx_idx" = $4 AND "is_spent" = false -- TODO: should throw an error if already spent
|
||||
RETURNING "pkscript"
|
||||
)
|
||||
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx", "prevout_pkscript","scriptsig","witness","sequence")
|
||||
SELECT "tx_hash", "tx_idx", "prevout_tx_hash", "prevout_tx_idx", "prevout_pkscript", "scriptsig", "witness", "sequence" FROM prepare_insert;
|
||||
|
||||
-- name: BatchInsertTransactionTxOuts :exec
|
||||
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value")
|
||||
VALUES (
|
||||
unnest(@tx_hash_arr::TEXT[]),
|
||||
unnest(@tx_idx_arr::INT[]),
|
||||
unnest(@pkscript_arr::TEXT[]),
|
||||
unnest(@value_arr::BIGINT[])
|
||||
);
|
||||
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx","prevout_pkscript","scriptsig","witness","sequence")
|
||||
VALUES ($1, $2, $3, $4, (SELECT "pkscript" FROM update_txout), $5, $6, $7);
|
||||
|
||||
-- name: RevertData :exec
|
||||
WITH delete_tx AS (
|
||||
|
||||
@@ -13,7 +13,7 @@ type BitcoinDataGateway interface {
|
||||
}
|
||||
|
||||
type BitcoinWriterDataDataGateway interface {
|
||||
InsertBlocks(ctx context.Context, blocks []*types.Block) error
|
||||
InsertBlock(context.Context, *types.Block) error
|
||||
RevertBlocks(context.Context, int64) error
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package bitcoin
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"log/slog"
|
||||
"slices"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
@@ -9,6 +12,8 @@ import (
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/gaze-network/indexer-network/internal/config"
|
||||
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
)
|
||||
|
||||
// Make sure to implement the BitcoinProcessor interface
|
||||
@@ -29,7 +34,7 @@ func NewProcessor(config config.Config, bitcoinDg datagateway.BitcoinDataGateway
|
||||
}
|
||||
|
||||
func (p Processor) Name() string {
|
||||
return "bitcoin"
|
||||
return "Bitcoin"
|
||||
}
|
||||
|
||||
func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
|
||||
@@ -37,15 +42,36 @@ func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Process the given blocks before inserting to the database
|
||||
inputs, err := p.process(ctx, inputs)
|
||||
// Sort ASC by block height
|
||||
slices.SortFunc(inputs, func(t1, t2 *types.Block) int {
|
||||
return cmp.Compare(t1.Header.Height, t2.Header.Height)
|
||||
})
|
||||
|
||||
latestBlock, err := p.CurrentBlock(ctx)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
return errors.Wrap(err, "failed to get latest indexed block header")
|
||||
}
|
||||
|
||||
// check if the given blocks are continue from the latest indexed block
|
||||
// return an error to prevent inserting out-of-order blocks or duplicate blocks
|
||||
if inputs[0].Header.Height != latestBlock.Height+1 {
|
||||
return errors.New("given blocks are not continue from the latest indexed block")
|
||||
}
|
||||
|
||||
// check if the given blocks are in sequence and not missing any block
|
||||
for i := 1; i < len(inputs); i++ {
|
||||
if inputs[i].Header.Height != inputs[i-1].Header.Height+1 {
|
||||
return errors.New("given blocks are not in sequence")
|
||||
}
|
||||
}
|
||||
|
||||
// Insert blocks
|
||||
if err := p.bitcoinDg.InsertBlocks(ctx, inputs); err != nil {
|
||||
return errors.Wrapf(err, "error during insert blocks, from: %d, to: %d", inputs[0].Header.Height, inputs[len(inputs)-1].Header.Height)
|
||||
for _, b := range inputs {
|
||||
err := p.bitcoinDg.InsertBlock(ctx, b)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to insert block, height: %d, hash: %s", b.Header.Height, b.Header.Hash)
|
||||
}
|
||||
logger.InfoContext(ctx, "Block inserted", slog.Int64("height", b.Header.Height), slogx.Stringer("hash", b.Header.Hash))
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -71,12 +97,6 @@ func (p *Processor) GetIndexedBlock(ctx context.Context, height int64) (types.Bl
|
||||
}
|
||||
|
||||
func (p *Processor) RevertData(ctx context.Context, from int64) error {
|
||||
// to prevent remove txin/txout of duplicated coinbase transaction in the blocks 91842 and 91880
|
||||
// if you really want to revert the data before the block `227835`, you should reset the database and reindex the data instead.
|
||||
if from <= lastV1Block.Height {
|
||||
return errors.Wrapf(errs.InvalidArgument, "can't revert data before block version 2, height: %d", lastV1Block.Height)
|
||||
}
|
||||
|
||||
if err := p.bitcoinDg.RevertBlocks(ctx, from); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
package bitcoin
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"slices"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
)
|
||||
|
||||
// process is a processing rules for the given blocks before inserting to the database
|
||||
//
|
||||
// this function will modify the given data directly.
|
||||
func (p *Processor) process(ctx context.Context, blocks []*types.Block) ([]*types.Block, error) {
|
||||
if len(blocks) == 0 {
|
||||
return blocks, nil
|
||||
}
|
||||
|
||||
// Sort ASC by block height
|
||||
slices.SortFunc(blocks, func(t1, t2 *types.Block) int {
|
||||
return cmp.Compare(t1.Header.Height, t2.Header.Height)
|
||||
})
|
||||
|
||||
if !p.isContinueFromLatestIndexedBlock(ctx, blocks[0]) {
|
||||
return nil, errors.New("given blocks are not continue from the latest indexed block")
|
||||
}
|
||||
|
||||
if !p.isBlocksSequential(blocks) {
|
||||
return nil, errors.New("given blocks are not in sequence")
|
||||
}
|
||||
|
||||
p.removeDuplicateCoinbaseTxInputsOutputs(blocks)
|
||||
|
||||
return blocks, nil
|
||||
}
|
||||
|
||||
// check if the given blocks are continue from the latest indexed block
|
||||
// to prevent inserting out-of-order blocks or duplicate blocks
|
||||
func (p *Processor) isBlocksSequential(blocks []*types.Block) bool {
|
||||
if len(blocks) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for i, block := range blocks {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if block.Header.Height != blocks[i-1].Header.Height+1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// check if the given blocks are continue from the latest indexed block
|
||||
// to prevent inserting out-of-order blocks or duplicate blocks
|
||||
func (p *Processor) isContinueFromLatestIndexedBlock(ctx context.Context, block *types.Block) bool {
|
||||
latestBlock, err := p.CurrentBlock(ctx)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return block.Header.Height == latestBlock.Height+1
|
||||
}
|
||||
|
||||
// there 2 coinbase transaction that are duplicated in the blocks 91842 and 91880.
|
||||
// if the given block version is v1 and height is `91842` or `91880`,
|
||||
// then remove transaction inputs/outputs to prevent duplicate txin/txout error when inserting to the database.
|
||||
//
|
||||
// Theses duplicated coinbase transactions are having the same transaction input/output and
|
||||
// utxo from these 2 duplicated coinbase txs can redeem only once. so, it's safe to remove them and can
|
||||
// use inputs/outputs from the previous block.
|
||||
//
|
||||
// Duplicate Coinbase Transactions:
|
||||
// - `454279874213763724535987336644243549a273058910332236515429488599` in blocks 91812, 91842
|
||||
// - `e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468` in blocks 91722, 91880
|
||||
//
|
||||
// This function will modify the given data directly.
|
||||
func (p *Processor) removeDuplicateCoinbaseTxInputsOutputs(blocks []*types.Block) {
|
||||
for _, block := range blocks {
|
||||
header := block.Header
|
||||
if header.Version == 1 && (header.Height == 91842 || header.Height == 91880) {
|
||||
// remove transaction inputs/outputs from coinbase transaction (first transaction)
|
||||
block.Transactions[0].TxIn = nil
|
||||
block.Transactions[0].TxOut = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,144 +0,0 @@
|
||||
package bitcoin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestDuplicateCoinbaseTxHashHandling(t *testing.T) {
|
||||
processor := Processor{}
|
||||
generator := func() []*types.Block {
|
||||
return []*types.Block{
|
||||
{
|
||||
Header: types.BlockHeader{Height: 91842, Version: 1},
|
||||
Transactions: []*types.Transaction{
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Header: types.BlockHeader{Height: 91880, Version: 1},
|
||||
Transactions: []*types.Transaction{
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("all_duplicated_txs", func(t *testing.T) {
|
||||
blocks := generator()
|
||||
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
|
||||
|
||||
assert.Len(t, blocks, 2, "should not remove any blocks")
|
||||
for _, block := range blocks {
|
||||
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
|
||||
assert.Len(t, block.Transactions[0].TxIn, 0, "should remove tx inputs from coinbase transaction")
|
||||
assert.Len(t, block.Transactions[0].TxOut, 0, "should remove tx outputs from coinbase transaction")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("not_duplicated_txs", func(t *testing.T) {
|
||||
blocks := []*types.Block{
|
||||
{
|
||||
Header: types.BlockHeader{Height: 91812, Version: 1},
|
||||
Transactions: []*types.Transaction{
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Header: types.BlockHeader{Height: 91722, Version: 1},
|
||||
Transactions: []*types.Transaction{
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
|
||||
|
||||
assert.Len(t, blocks, 2, "should not remove any blocks")
|
||||
for _, block := range blocks {
|
||||
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
|
||||
assert.Len(t, block.Transactions[0].TxIn, 4, "should not remove tx inputs from coinbase transaction")
|
||||
assert.Len(t, block.Transactions[0].TxOut, 4, "should not remove tx outputs from coinbase transaction")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("mixed", func(t *testing.T) {
|
||||
blocks := []*types.Block{
|
||||
{
|
||||
Header: types.BlockHeader{Height: 91812, Version: 1},
|
||||
Transactions: []*types.Transaction{
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
blocks = append(blocks, generator()...)
|
||||
blocks = append(blocks, &types.Block{
|
||||
Header: types.BlockHeader{Height: 91722, Version: 1},
|
||||
Transactions: []*types.Transaction{
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
{
|
||||
TxIn: []*types.TxIn{{}, {}, {}, {}},
|
||||
TxOut: []*types.TxOut{{}, {}, {}, {}},
|
||||
},
|
||||
},
|
||||
})
|
||||
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
|
||||
|
||||
assert.Len(t, blocks, 4, "should not remove any blocks")
|
||||
|
||||
// only 2nd and 3rd blocks should be modified
|
||||
for i, block := range blocks {
|
||||
t.Run(fmt.Sprint(i), func(t *testing.T) {
|
||||
if i == 1 || i == 2 {
|
||||
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
|
||||
assert.Len(t, block.Transactions[0].TxIn, 0, "should remove tx inputs from coinbase transaction")
|
||||
assert.Len(t, block.Transactions[0].TxOut, 0, "should remove tx outputs from coinbase transaction")
|
||||
} else {
|
||||
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
|
||||
assert.Lenf(t, block.Transactions[0].TxIn, 4, "should not remove tx inputs from coinbase transaction")
|
||||
assert.Len(t, block.Transactions[0].TxOut, 4, "should not remove tx outputs from coinbase transaction")
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -28,12 +28,8 @@ func (r *Repository) GetLatestBlockHeader(ctx context.Context) (types.BlockHeade
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (r *Repository) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
|
||||
if len(blocks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
blockParams, txParams, txoutParams, txinParams := mapBlocksTypeToParams(blocks)
|
||||
func (r *Repository) InsertBlock(ctx context.Context, block *types.Block) error {
|
||||
blockParams, txParams, txoutParams, txinParams := mapBlockTypeToParams(block)
|
||||
|
||||
tx, err := r.db.Begin(ctx)
|
||||
if err != nil {
|
||||
@@ -43,22 +39,28 @@ func (r *Repository) InsertBlocks(ctx context.Context, blocks []*types.Block) er
|
||||
|
||||
queries := r.queries.WithTx(tx)
|
||||
|
||||
if err := queries.BatchInsertBlocks(ctx, blockParams); err != nil {
|
||||
return errors.Wrap(err, "failed to batch insert block headers")
|
||||
if err := queries.InsertBlock(ctx, blockParams); err != nil {
|
||||
return errors.Wrapf(err, "failed to insert block, height: %d, hash: %s", blockParams.BlockHeight, blockParams.BlockHash)
|
||||
}
|
||||
|
||||
if err := queries.BatchInsertTransactions(ctx, txParams); err != nil {
|
||||
return errors.Wrap(err, "failed to batch insert transactions")
|
||||
for _, params := range txParams {
|
||||
if err := queries.InsertTransaction(ctx, params); err != nil {
|
||||
return errors.Wrapf(err, "failed to insert transaction, hash: %s", params.TxHash)
|
||||
}
|
||||
}
|
||||
|
||||
// Should insert txout first, then txin
|
||||
// Because txin references txout
|
||||
if err := queries.BatchInsertTransactionTxOuts(ctx, txoutParams); err != nil {
|
||||
return errors.Wrap(err, "failed to batch insert transaction txins")
|
||||
for _, params := range txoutParams {
|
||||
if err := queries.InsertTransactionTxOut(ctx, params); err != nil {
|
||||
return errors.Wrapf(err, "failed to insert transaction txout, %v:%v", params.TxHash, params.TxIdx)
|
||||
}
|
||||
}
|
||||
|
||||
if err := queries.BatchInsertTransactionTxIns(ctx, txinParams); err != nil {
|
||||
return errors.Wrap(err, "failed to batch insert transaction txins")
|
||||
for _, params := range txinParams {
|
||||
if err := queries.InsertTransactionTxIn(ctx, params); err != nil {
|
||||
return errors.Wrapf(err, "failed to insert transaction txin, %v:%v", params.TxHash, params.TxIdx)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(ctx); err != nil {
|
||||
|
||||
@@ -11,152 +11,6 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const batchInsertBlocks = `-- name: BatchInsertBlocks :exec
|
||||
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce")
|
||||
VALUES (
|
||||
unnest($1::INT[]),
|
||||
unnest($2::TEXT[]),
|
||||
unnest($3::INT[]),
|
||||
unnest($4::TEXT[]),
|
||||
unnest($5::TEXT[]),
|
||||
unnest($6::TIMESTAMP WITH TIME ZONE[]), -- or use TIMESTAMPTZ
|
||||
unnest($7::BIGINT[]),
|
||||
unnest($8::BIGINT[])
|
||||
)
|
||||
`
|
||||
|
||||
type BatchInsertBlocksParams struct {
|
||||
BlockHeightArr []int32
|
||||
BlockHashArr []string
|
||||
VersionArr []int32
|
||||
MerkleRootArr []string
|
||||
PrevBlockHashArr []string
|
||||
TimestampArr []pgtype.Timestamptz
|
||||
BitsArr []int64
|
||||
NonceArr []int64
|
||||
}
|
||||
|
||||
func (q *Queries) BatchInsertBlocks(ctx context.Context, arg BatchInsertBlocksParams) error {
|
||||
_, err := q.db.Exec(ctx, batchInsertBlocks,
|
||||
arg.BlockHeightArr,
|
||||
arg.BlockHashArr,
|
||||
arg.VersionArr,
|
||||
arg.MerkleRootArr,
|
||||
arg.PrevBlockHashArr,
|
||||
arg.TimestampArr,
|
||||
arg.BitsArr,
|
||||
arg.NonceArr,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const batchInsertTransactionTxIns = `-- name: BatchInsertTransactionTxIns :exec
|
||||
WITH update_txout AS (
|
||||
UPDATE "bitcoin_transaction_txouts"
|
||||
SET "is_spent" = true
|
||||
FROM (SELECT unnest($1::TEXT[]) as tx_hash, unnest($2::INT[]) as tx_idx) as txin
|
||||
WHERE "bitcoin_transaction_txouts"."tx_hash" = txin.tx_hash AND "bitcoin_transaction_txouts"."tx_idx" = txin.tx_idx AND "is_spent" = false
|
||||
RETURNING "bitcoin_transaction_txouts"."tx_hash", "bitcoin_transaction_txouts"."tx_idx", "pkscript"
|
||||
), prepare_insert AS (
|
||||
SELECT input.tx_hash, input.tx_idx, prevout_tx_hash, prevout_tx_idx, update_txout.pkscript as prevout_pkscript, scriptsig, witness, sequence
|
||||
FROM (
|
||||
SELECT
|
||||
unnest($3::TEXT[]) as tx_hash,
|
||||
unnest($4::INT[]) as tx_idx,
|
||||
unnest($1::TEXT[]) as prevout_tx_hash,
|
||||
unnest($2::INT[]) as prevout_tx_idx,
|
||||
unnest($5::TEXT[]) as scriptsig,
|
||||
unnest($6::TEXT[]) as witness,
|
||||
unnest($7::INT[]) as sequence
|
||||
) input LEFT JOIN update_txout ON "update_txout"."tx_hash" = "input"."prevout_tx_hash" AND "update_txout"."tx_idx" = "input"."prevout_tx_idx"
|
||||
)
|
||||
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx", "prevout_pkscript","scriptsig","witness","sequence")
|
||||
SELECT "tx_hash", "tx_idx", "prevout_tx_hash", "prevout_tx_idx", "prevout_pkscript", "scriptsig", "witness", "sequence" FROM prepare_insert
|
||||
`
|
||||
|
||||
type BatchInsertTransactionTxInsParams struct {
|
||||
PrevoutTxHashArr []string
|
||||
PrevoutTxIdxArr []int32
|
||||
TxHashArr []string
|
||||
TxIdxArr []int32
|
||||
ScriptsigArr []string
|
||||
WitnessArr []string
|
||||
SequenceArr []int32
|
||||
}
|
||||
|
||||
func (q *Queries) BatchInsertTransactionTxIns(ctx context.Context, arg BatchInsertTransactionTxInsParams) error {
|
||||
_, err := q.db.Exec(ctx, batchInsertTransactionTxIns,
|
||||
arg.PrevoutTxHashArr,
|
||||
arg.PrevoutTxIdxArr,
|
||||
arg.TxHashArr,
|
||||
arg.TxIdxArr,
|
||||
arg.ScriptsigArr,
|
||||
arg.WitnessArr,
|
||||
arg.SequenceArr,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const batchInsertTransactionTxOuts = `-- name: BatchInsertTransactionTxOuts :exec
|
||||
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value")
|
||||
VALUES (
|
||||
unnest($1::TEXT[]),
|
||||
unnest($2::INT[]),
|
||||
unnest($3::TEXT[]),
|
||||
unnest($4::BIGINT[])
|
||||
)
|
||||
`
|
||||
|
||||
type BatchInsertTransactionTxOutsParams struct {
|
||||
TxHashArr []string
|
||||
TxIdxArr []int32
|
||||
PkscriptArr []string
|
||||
ValueArr []int64
|
||||
}
|
||||
|
||||
func (q *Queries) BatchInsertTransactionTxOuts(ctx context.Context, arg BatchInsertTransactionTxOutsParams) error {
|
||||
_, err := q.db.Exec(ctx, batchInsertTransactionTxOuts,
|
||||
arg.TxHashArr,
|
||||
arg.TxIdxArr,
|
||||
arg.PkscriptArr,
|
||||
arg.ValueArr,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const batchInsertTransactions = `-- name: BatchInsertTransactions :exec
|
||||
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx")
|
||||
VALUES (
|
||||
unnest($1::TEXT[]),
|
||||
unnest($2::INT[]),
|
||||
unnest($3::BIGINT[]),
|
||||
unnest($4::INT[]),
|
||||
unnest($5::TEXT[]),
|
||||
unnest($6::INT[])
|
||||
)
|
||||
`
|
||||
|
||||
type BatchInsertTransactionsParams struct {
|
||||
TxHashArr []string
|
||||
VersionArr []int32
|
||||
LocktimeArr []int64
|
||||
BlockHeightArr []int32
|
||||
BlockHashArr []string
|
||||
IdxArr []int32
|
||||
}
|
||||
|
||||
func (q *Queries) BatchInsertTransactions(ctx context.Context, arg BatchInsertTransactionsParams) error {
|
||||
_, err := q.db.Exec(ctx, batchInsertTransactions,
|
||||
arg.TxHashArr,
|
||||
arg.VersionArr,
|
||||
arg.LocktimeArr,
|
||||
arg.BlockHeightArr,
|
||||
arg.BlockHashArr,
|
||||
arg.IdxArr,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const getBlockByHeight = `-- name: GetBlockByHeight :one
|
||||
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks WHERE block_height = $1
|
||||
`
|
||||
@@ -381,6 +235,86 @@ func (q *Queries) InsertBlock(ctx context.Context, arg InsertBlockParams) error
|
||||
return err
|
||||
}
|
||||
|
||||
const insertTransaction = `-- name: InsertTransaction :exec
|
||||
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx") VALUES ($1, $2, $3, $4, $5, $6)
|
||||
`
|
||||
|
||||
type InsertTransactionParams struct {
|
||||
TxHash string
|
||||
Version int32
|
||||
Locktime int64
|
||||
BlockHeight int32
|
||||
BlockHash string
|
||||
Idx int32
|
||||
}
|
||||
|
||||
func (q *Queries) InsertTransaction(ctx context.Context, arg InsertTransactionParams) error {
|
||||
_, err := q.db.Exec(ctx, insertTransaction,
|
||||
arg.TxHash,
|
||||
arg.Version,
|
||||
arg.Locktime,
|
||||
arg.BlockHeight,
|
||||
arg.BlockHash,
|
||||
arg.Idx,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const insertTransactionTxIn = `-- name: InsertTransactionTxIn :exec
|
||||
WITH update_txout AS (
|
||||
UPDATE "bitcoin_transaction_txouts"
|
||||
SET "is_spent" = true
|
||||
WHERE "tx_hash" = $3 AND "tx_idx" = $4 AND "is_spent" = false -- TODO: should throw an error if already spent
|
||||
RETURNING "pkscript"
|
||||
)
|
||||
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx","prevout_pkscript","scriptsig","witness","sequence")
|
||||
VALUES ($1, $2, $3, $4, (SELECT "pkscript" FROM update_txout), $5, $6, $7)
|
||||
`
|
||||
|
||||
type InsertTransactionTxInParams struct {
|
||||
TxHash string
|
||||
TxIdx int32
|
||||
PrevoutTxHash string
|
||||
PrevoutTxIdx int32
|
||||
Scriptsig string
|
||||
Witness pgtype.Text
|
||||
Sequence int64
|
||||
}
|
||||
|
||||
func (q *Queries) InsertTransactionTxIn(ctx context.Context, arg InsertTransactionTxInParams) error {
|
||||
_, err := q.db.Exec(ctx, insertTransactionTxIn,
|
||||
arg.TxHash,
|
||||
arg.TxIdx,
|
||||
arg.PrevoutTxHash,
|
||||
arg.PrevoutTxIdx,
|
||||
arg.Scriptsig,
|
||||
arg.Witness,
|
||||
arg.Sequence,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const insertTransactionTxOut = `-- name: InsertTransactionTxOut :exec
|
||||
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value") VALUES ($1, $2, $3, $4)
|
||||
`
|
||||
|
||||
type InsertTransactionTxOutParams struct {
|
||||
TxHash string
|
||||
TxIdx int32
|
||||
Pkscript string
|
||||
Value int64
|
||||
}
|
||||
|
||||
func (q *Queries) InsertTransactionTxOut(ctx context.Context, arg InsertTransactionTxOutParams) error {
|
||||
_, err := q.db.Exec(ctx, insertTransactionTxOut,
|
||||
arg.TxHash,
|
||||
arg.TxIdx,
|
||||
arg.Pkscript,
|
||||
arg.Value,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const revertData = `-- name: RevertData :exec
|
||||
WITH delete_tx AS (
|
||||
DELETE FROM "bitcoin_transactions" WHERE "block_height" >= $1
|
||||
|
||||
@@ -48,7 +48,7 @@ type BitcoinTransactionTxin struct {
|
||||
PrevoutTxIdx int32
|
||||
PrevoutPkscript pgtype.Text
|
||||
Scriptsig string
|
||||
Witness string
|
||||
Witness pgtype.Text
|
||||
Sequence int64
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,22 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
func mapBlockHeaderTypeToModel(src types.BlockHeader) gen.BitcoinBlock {
|
||||
return gen.BitcoinBlock{
|
||||
BlockHeight: int32(src.Height),
|
||||
BlockHash: src.Hash.String(),
|
||||
Version: src.Version,
|
||||
MerkleRoot: src.MerkleRoot.String(),
|
||||
PrevBlockHash: src.PrevBlock.String(),
|
||||
Timestamp: pgtype.Timestamptz{
|
||||
Time: src.Timestamp,
|
||||
Valid: true,
|
||||
},
|
||||
Bits: int64(src.Bits),
|
||||
Nonce: int64(src.Nonce),
|
||||
}
|
||||
}
|
||||
|
||||
func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error) {
|
||||
hash, err := chainhash.NewHashFromStr(src.BlockHash)
|
||||
if err != nil {
|
||||
@@ -39,93 +55,63 @@ func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func mapBlocksTypeToParams(src []*types.Block) (gen.BatchInsertBlocksParams, gen.BatchInsertTransactionsParams, gen.BatchInsertTransactionTxOutsParams, gen.BatchInsertTransactionTxInsParams) {
|
||||
blocks := gen.BatchInsertBlocksParams{
|
||||
BlockHeightArr: make([]int32, 0, len(src)),
|
||||
BlockHashArr: make([]string, 0, len(src)),
|
||||
VersionArr: make([]int32, 0, len(src)),
|
||||
MerkleRootArr: make([]string, 0, len(src)),
|
||||
PrevBlockHashArr: make([]string, 0, len(src)),
|
||||
TimestampArr: make([]pgtype.Timestamptz, 0, len(src)),
|
||||
BitsArr: make([]int64, 0, len(src)),
|
||||
NonceArr: make([]int64, 0, len(src)),
|
||||
}
|
||||
txs := gen.BatchInsertTransactionsParams{
|
||||
TxHashArr: []string{},
|
||||
VersionArr: []int32{},
|
||||
LocktimeArr: []int64{},
|
||||
BlockHeightArr: []int32{},
|
||||
BlockHashArr: []string{},
|
||||
IdxArr: []int32{},
|
||||
}
|
||||
txouts := gen.BatchInsertTransactionTxOutsParams{
|
||||
TxHashArr: []string{},
|
||||
TxIdxArr: []int32{},
|
||||
PkscriptArr: []string{},
|
||||
ValueArr: []int64{},
|
||||
}
|
||||
txins := gen.BatchInsertTransactionTxInsParams{
|
||||
PrevoutTxHashArr: []string{},
|
||||
PrevoutTxIdxArr: []int32{},
|
||||
TxHashArr: []string{},
|
||||
TxIdxArr: []int32{},
|
||||
ScriptsigArr: []string{},
|
||||
WitnessArr: []string{},
|
||||
SequenceArr: []int32{},
|
||||
}
|
||||
|
||||
for _, block := range src {
|
||||
blockHash := block.Header.Hash.String()
|
||||
|
||||
// Batch insert blocks
|
||||
blocks.BlockHeightArr = append(blocks.BlockHeightArr, int32(block.Header.Height))
|
||||
blocks.BlockHashArr = append(blocks.BlockHashArr, blockHash)
|
||||
blocks.VersionArr = append(blocks.VersionArr, block.Header.Version)
|
||||
blocks.MerkleRootArr = append(blocks.MerkleRootArr, block.Header.MerkleRoot.String())
|
||||
blocks.PrevBlockHashArr = append(blocks.PrevBlockHashArr, block.Header.PrevBlock.String())
|
||||
blocks.TimestampArr = append(blocks.TimestampArr, pgtype.Timestamptz{
|
||||
Time: block.Header.Timestamp,
|
||||
func mapBlockTypeToParams(src *types.Block) (gen.InsertBlockParams, []gen.InsertTransactionParams, []gen.InsertTransactionTxOutParams, []gen.InsertTransactionTxInParams) {
|
||||
txs := make([]gen.InsertTransactionParams, 0, len(src.Transactions))
|
||||
txouts := make([]gen.InsertTransactionTxOutParams, 0)
|
||||
txins := make([]gen.InsertTransactionTxInParams, 0)
|
||||
block := gen.InsertBlockParams{
|
||||
BlockHeight: int32(src.Header.Height),
|
||||
BlockHash: src.Header.Hash.String(),
|
||||
Version: src.Header.Version,
|
||||
MerkleRoot: src.Header.MerkleRoot.String(),
|
||||
PrevBlockHash: src.Header.PrevBlock.String(),
|
||||
Timestamp: pgtype.Timestamptz{
|
||||
Time: src.Header.Timestamp,
|
||||
Valid: true,
|
||||
})
|
||||
blocks.BitsArr = append(blocks.BitsArr, int64(block.Header.Bits))
|
||||
blocks.NonceArr = append(blocks.NonceArr, int64(block.Header.Nonce))
|
||||
},
|
||||
Bits: int64(src.Header.Bits),
|
||||
Nonce: int64(src.Header.Nonce),
|
||||
}
|
||||
for txIdx, srcTx := range src.Transactions {
|
||||
tx := gen.InsertTransactionParams{
|
||||
TxHash: srcTx.TxHash.String(),
|
||||
Version: srcTx.Version,
|
||||
Locktime: int64(srcTx.LockTime),
|
||||
BlockHeight: int32(src.Header.Height),
|
||||
BlockHash: src.Header.Hash.String(),
|
||||
Idx: int32(txIdx),
|
||||
}
|
||||
txs = append(txs, tx)
|
||||
|
||||
for txIdx, srcTx := range block.Transactions {
|
||||
txHash := srcTx.TxHash.String()
|
||||
|
||||
// Batch insert transactions
|
||||
txs.TxHashArr = append(txs.TxHashArr, txHash)
|
||||
txs.VersionArr = append(txs.VersionArr, srcTx.Version)
|
||||
txs.LocktimeArr = append(txs.LocktimeArr, int64(srcTx.LockTime))
|
||||
txs.BlockHeightArr = append(txs.BlockHeightArr, int32(block.Header.Height))
|
||||
txs.BlockHashArr = append(txs.BlockHashArr, blockHash)
|
||||
txs.IdxArr = append(txs.IdxArr, int32(txIdx))
|
||||
|
||||
// Batch insert txins
|
||||
for idx, txin := range srcTx.TxIn {
|
||||
var witness string
|
||||
if len(txin.Witness) > 0 {
|
||||
witness = btcutils.WitnessToString(txin.Witness)
|
||||
for idx, txin := range srcTx.TxIn {
|
||||
var witness pgtype.Text
|
||||
if len(txin.Witness) > 0 {
|
||||
witness = pgtype.Text{
|
||||
String: btcutils.WitnessToString(txin.Witness),
|
||||
Valid: true,
|
||||
}
|
||||
txins.TxHashArr = append(txins.TxHashArr, txHash)
|
||||
txins.TxIdxArr = append(txins.TxIdxArr, int32(idx))
|
||||
txins.PrevoutTxHashArr = append(txins.PrevoutTxHashArr, txin.PreviousOutTxHash.String())
|
||||
txins.PrevoutTxIdxArr = append(txins.PrevoutTxIdxArr, int32(txin.PreviousOutIndex))
|
||||
txins.ScriptsigArr = append(txins.ScriptsigArr, hex.EncodeToString(txin.SignatureScript))
|
||||
txins.WitnessArr = append(txins.WitnessArr, witness)
|
||||
txins.SequenceArr = append(txins.SequenceArr, int32(txin.Sequence))
|
||||
}
|
||||
txins = append(txins, gen.InsertTransactionTxInParams{
|
||||
TxHash: tx.TxHash,
|
||||
TxIdx: int32(idx),
|
||||
PrevoutTxHash: txin.PreviousOutTxHash.String(),
|
||||
PrevoutTxIdx: int32(txin.PreviousOutIndex),
|
||||
Scriptsig: hex.EncodeToString(txin.SignatureScript),
|
||||
Witness: witness,
|
||||
Sequence: int64(txin.Sequence),
|
||||
})
|
||||
}
|
||||
|
||||
// Batch insert txouts
|
||||
for idx, txout := range srcTx.TxOut {
|
||||
txouts.TxHashArr = append(txouts.TxHashArr, txHash)
|
||||
txouts.TxIdxArr = append(txouts.TxIdxArr, int32(idx))
|
||||
txouts.PkscriptArr = append(txouts.PkscriptArr, hex.EncodeToString(txout.PkScript))
|
||||
txouts.ValueArr = append(txouts.ValueArr, txout.Value)
|
||||
}
|
||||
for idx, txout := range srcTx.TxOut {
|
||||
txouts = append(txouts, gen.InsertTransactionTxOutParams{
|
||||
TxHash: tx.TxHash,
|
||||
TxIdx: int32(idx),
|
||||
Pkscript: hex.EncodeToString(txout.PkScript),
|
||||
Value: txout.Value,
|
||||
})
|
||||
}
|
||||
}
|
||||
return blocks, txs, txouts, txins
|
||||
return block, txs, txouts, txins
|
||||
}
|
||||
|
||||
func mapTransactionModelToType(src gen.BitcoinTransaction, txInModel []gen.BitcoinTransactionTxin, txOutModels []gen.BitcoinTransactionTxout) (types.Transaction, error) {
|
||||
@@ -160,9 +146,13 @@ func mapTransactionModelToType(src gen.BitcoinTransaction, txInModel []gen.Bitco
|
||||
return types.Transaction{}, errors.Wrap(err, "failed to parse prevout tx hash")
|
||||
}
|
||||
|
||||
witness, err := btcutils.WitnessFromString(txInModel.Witness)
|
||||
if err != nil {
|
||||
return types.Transaction{}, errors.Wrap(err, "failed to parse witness from hex string")
|
||||
var witness [][]byte
|
||||
if txInModel.Witness.Valid {
|
||||
w, err := btcutils.WitnessFromString(txInModel.Witness.String)
|
||||
if err != nil {
|
||||
return types.Transaction{}, errors.Wrap(err, "failed to parse witness from hex string")
|
||||
}
|
||||
witness = w
|
||||
}
|
||||
|
||||
txIns = append(txIns, &types.TxIn{
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package httphandler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"slices"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
|
||||
"github.com/gaze-network/indexer-network/modules/runes/runes"
|
||||
"github.com/gaze-network/uint128"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
@@ -83,7 +84,6 @@ type amountWithDecimal struct {
|
||||
type transaction struct {
|
||||
TxHash chainhash.Hash `json:"txHash"`
|
||||
BlockHeight uint64 `json:"blockHeight"`
|
||||
Index uint32 `json:"index"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Inputs []txInputOutput `json:"inputs"`
|
||||
Outputs []txInputOutput `json:"outputs"`
|
||||
@@ -116,6 +116,15 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
blockHeight := req.BlockHeight
|
||||
if blockHeight == 0 {
|
||||
blockHeader, err := h.usecase.GetLatestBlock(ctx.UserContext())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error during GetLatestBlock")
|
||||
}
|
||||
blockHeight = uint64(blockHeader.Height)
|
||||
}
|
||||
|
||||
var runeId runes.RuneId
|
||||
if req.Id != "" {
|
||||
var ok bool
|
||||
@@ -125,23 +134,68 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
blockHeight := req.BlockHeight
|
||||
// set blockHeight to the latest block height blockHeight, pkScript, and runeId are not provided
|
||||
if blockHeight == 0 && pkScript == nil && runeId == (runes.RuneId{}) {
|
||||
blockHeader, err := h.usecase.GetLatestBlock(ctx.UserContext())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error during GetLatestBlock")
|
||||
}
|
||||
blockHeight = uint64(blockHeader.Height)
|
||||
}
|
||||
|
||||
txs, err := h.usecase.GetRuneTransactions(ctx.UserContext(), pkScript, runeId, blockHeight)
|
||||
txs, err := h.usecase.GetTransactionsByHeight(ctx.UserContext(), blockHeight)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error during GetRuneTransactions")
|
||||
return errors.Wrap(err, "error during GetTransactionsByHeight")
|
||||
}
|
||||
|
||||
var allRuneIds []runes.RuneId
|
||||
filteredTxs := make([]*entity.RuneTransaction, 0)
|
||||
isTxContainPkScript := func(tx *entity.RuneTransaction) bool {
|
||||
for _, input := range tx.Inputs {
|
||||
if bytes.Equal(input.PkScript, pkScript) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, output := range tx.Outputs {
|
||||
if bytes.Equal(output.PkScript, pkScript) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
isTxContainRuneId := func(tx *entity.RuneTransaction) bool {
|
||||
for _, input := range tx.Inputs {
|
||||
if input.RuneId == runeId {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, output := range tx.Outputs {
|
||||
if output.RuneId == runeId {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for mintedRuneId := range tx.Mints {
|
||||
if mintedRuneId == runeId {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for burnedRuneId := range tx.Burns {
|
||||
if burnedRuneId == runeId {
|
||||
return true
|
||||
}
|
||||
}
|
||||
if tx.Runestone != nil {
|
||||
if tx.Runestone.Mint != nil && *tx.Runestone.Mint == runeId {
|
||||
return true
|
||||
}
|
||||
// returns true if this tx etched this runeId
|
||||
if tx.RuneEtched && tx.BlockHeight == runeId.BlockHeight && tx.Index == runeId.TxIndex {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
for _, tx := range txs {
|
||||
if pkScript != nil && !isTxContainPkScript(tx) {
|
||||
continue
|
||||
}
|
||||
if runeId != (runes.RuneId{}) && !isTxContainRuneId(tx) {
|
||||
continue
|
||||
}
|
||||
filteredTxs = append(filteredTxs, tx)
|
||||
}
|
||||
var allRuneIds []runes.RuneId
|
||||
for _, tx := range filteredTxs {
|
||||
for id := range tx.Mints {
|
||||
allRuneIds = append(allRuneIds, id)
|
||||
}
|
||||
@@ -161,12 +215,11 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
|
||||
return errors.Wrap(err, "error during GetRuneEntryByRuneIdBatch")
|
||||
}
|
||||
|
||||
txList := make([]transaction, 0, len(txs))
|
||||
for _, tx := range txs {
|
||||
txList := make([]transaction, 0, len(filteredTxs))
|
||||
for _, tx := range filteredTxs {
|
||||
respTx := transaction{
|
||||
TxHash: tx.Hash,
|
||||
BlockHeight: tx.BlockHeight,
|
||||
Index: tx.Index,
|
||||
Timestamp: tx.Timestamp.Unix(),
|
||||
Inputs: make([]txInputOutput, 0, len(tx.Inputs)),
|
||||
Outputs: make([]txInputOutput, 0, len(tx.Outputs)),
|
||||
@@ -256,13 +309,6 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
|
||||
}
|
||||
txList = append(txList, respTx)
|
||||
}
|
||||
// sort by block height ASC, then index ASC
|
||||
slices.SortFunc(txList, func(t1, t2 transaction) int {
|
||||
if t1.BlockHeight != t2.BlockHeight {
|
||||
return int(t1.BlockHeight - t2.BlockHeight)
|
||||
}
|
||||
return int(t1.Index - t2.Index)
|
||||
})
|
||||
|
||||
resp := getTransactionsResponse{
|
||||
Result: &getTransactionsResult{
|
||||
|
||||
@@ -72,7 +72,6 @@ CREATE TABLE IF NOT EXISTS "runes_transactions" (
|
||||
"rune_etched" BOOLEAN NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS runes_transactions_block_height_idx ON "runes_transactions" USING BTREE ("block_height");
|
||||
CREATE INDEX IF NOT EXISTS runes_transactions_jsonb_idx ON "runes_transactions" USING GIN ("inputs", "outputs", "mints", "burns");
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "runes_runestones" (
|
||||
"tx_hash" TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
@@ -40,23 +40,10 @@ SELECT * FROM runes_entries
|
||||
-- name: GetRuneIdFromRune :one
|
||||
SELECT rune_id FROM runes_entries WHERE rune = $1;
|
||||
|
||||
-- name: GetRuneTransactions :many
|
||||
SELECT * FROM runes_transactions
|
||||
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
|
||||
WHERE (
|
||||
@filter_pk_script::BOOLEAN = FALSE -- if @filter_pk_script is TRUE, apply pk_script filter
|
||||
OR runes_transactions.outputs @> @pk_script_param::JSONB
|
||||
OR runes_transactions.inputs @> @pk_script_param::JSONB
|
||||
) AND (
|
||||
@filter_rune_id::BOOLEAN = FALSE -- if @filter_rune_id is TRUE, apply rune_id filter
|
||||
OR runes_transactions.outputs @> @rune_id_param::JSONB
|
||||
OR runes_transactions.inputs @> @rune_id_param::JSONB
|
||||
OR runes_transactions.mints ? @rune_id
|
||||
OR runes_transactions.burns ? @rune_id
|
||||
OR (runes_transactions.rune_etched = TRUE AND runes_transactions.block_height = @rune_id_block_height AND runes_transactions.index = @rune_id_tx_index)
|
||||
) AND (
|
||||
@block_height::INT = 0 OR runes_transactions.block_height = @block_height::INT -- if @block_height > 0, apply block_height filter
|
||||
);
|
||||
-- name: GetRuneTransactionsByHeight :many
|
||||
SELECT * FROM runes_transactions
|
||||
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
|
||||
WHERE runes_transactions.block_height = $1;
|
||||
|
||||
-- name: CountRuneEntries :one
|
||||
SELECT COUNT(*) FROM runes_entries;
|
||||
|
||||
@@ -26,8 +26,7 @@ type RunesDataGatewayWithTx interface {
|
||||
type RunesReaderDataGateway interface {
|
||||
GetLatestBlock(ctx context.Context) (types.BlockHeader, error)
|
||||
GetIndexedBlockByHeight(ctx context.Context, height int64) (*entity.IndexedBlock, error)
|
||||
// GetRuneTransactions returns the runes transactions, filterable by pkScript, runeId and height. If pkScript, runeId or height is zero value, that filter is ignored.
|
||||
GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error)
|
||||
GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error)
|
||||
|
||||
GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error)
|
||||
GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error)
|
||||
|
||||
@@ -82,7 +82,6 @@ func (p *Processor) getHashPayload(header types.BlockHeader) ([]byte, error) {
|
||||
func serializeNewRuneEntry(entry *runes.RuneEntry) []byte {
|
||||
var sb strings.Builder
|
||||
sb.WriteString("newRuneEntry:")
|
||||
// nolint:goconst
|
||||
sb.WriteString("runeId:" + entry.RuneId.String())
|
||||
sb.WriteString("number:" + strconv.Itoa(int(entry.Number)))
|
||||
sb.WriteString("divisibility:" + strconv.Itoa(int(entry.Divisibility)))
|
||||
@@ -94,7 +93,6 @@ func serializeNewRuneEntry(entry *runes.RuneEntry) []byte {
|
||||
sb.WriteString("terms:")
|
||||
terms := entry.Terms
|
||||
if terms.Amount != nil {
|
||||
// nolint:goconst
|
||||
sb.WriteString("amount:" + terms.Amount.String())
|
||||
}
|
||||
if terms.Cap != nil {
|
||||
|
||||
@@ -16,8 +16,7 @@ import (
|
||||
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
|
||||
"github.com/gaze-network/indexer-network/modules/runes/runes"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclient"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
|
||||
"github.com/gaze-network/uint128"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
@@ -30,7 +29,7 @@ type Processor struct {
|
||||
bitcoinClient btcclient.Contract
|
||||
bitcoinDataSource indexers.BitcoinDatasource
|
||||
network common.Network
|
||||
reportingClient *reportingclient.ReportingClient
|
||||
reportingClient *reportingclientv2.ReportingClient
|
||||
|
||||
newRuneEntries map[runes.RuneId]*runes.RuneEntry
|
||||
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
|
||||
@@ -40,7 +39,7 @@ type Processor struct {
|
||||
newRuneTxs []*entity.RuneTransaction
|
||||
}
|
||||
|
||||
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
|
||||
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network, reportingClient *reportingclientv2.ReportingClient) *Processor {
|
||||
return &Processor{
|
||||
runesDg: runesDg,
|
||||
indexerInfoDg: indexerInfoDg,
|
||||
@@ -73,7 +72,7 @@ func (p *Processor) VerifyStates(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
if p.reportingClient != nil {
|
||||
if err := p.reportingClient.SubmitNodeReport(ctx, "runes", p.network); err != nil {
|
||||
if err := p.reportingClient.SubmitNodeReport(ctx, "runes", p.network, Version); err != nil {
|
||||
return errors.Wrap(err, "failed to submit node report")
|
||||
}
|
||||
}
|
||||
@@ -159,7 +158,7 @@ func (p *Processor) ensureGenesisRune(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (p *Processor) Name() string {
|
||||
return "runes"
|
||||
return "Runes"
|
||||
}
|
||||
|
||||
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
|
||||
@@ -193,10 +192,7 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
|
||||
}
|
||||
defer func() {
|
||||
if err := runesDgTx.Rollback(ctx); err != nil {
|
||||
logger.WarnContext(ctx, "failed to rollback transaction",
|
||||
slogx.Error(err),
|
||||
slogx.String("event", "rollback_runes_revert"),
|
||||
)
|
||||
logger.ErrorContext(ctx, "failed to rollback transaction", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -18,27 +18,24 @@ import (
|
||||
"github.com/gaze-network/indexer-network/modules/runes/runes"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclient"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclientv2"
|
||||
"github.com/gaze-network/uint128"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
|
||||
for _, block := range blocks {
|
||||
ctx := logger.WithContext(ctx, slog.Int64("height", block.Header.Height))
|
||||
logger.DebugContext(ctx, "Processing new block", slog.Int("txs", len(block.Transactions)))
|
||||
ctx := logger.WithContext(ctx, slog.Int("block_height", int(block.Header.Height)))
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Processing block", slog.Int("txs", len(block.Transactions)))
|
||||
|
||||
for _, tx := range block.Transactions {
|
||||
if err := p.processTx(ctx, tx, block.Header); err != nil {
|
||||
return errors.Wrap(err, "failed to process tx")
|
||||
}
|
||||
}
|
||||
|
||||
if err := p.flushBlock(ctx, block.Header); err != nil {
|
||||
return errors.Wrap(err, "failed to flush block")
|
||||
}
|
||||
|
||||
logger.DebugContext(ctx, "Inserted new block")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -69,6 +66,13 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
for runeId, balance := range balances {
|
||||
unallocated[runeId] = unallocated[runeId].Add(balance.Amount)
|
||||
p.newSpendOutPoints = append(p.newSpendOutPoints, balance.OutPoint)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Found runes in tx input",
|
||||
slogx.Any("runeId", runeId),
|
||||
slogx.Stringer("amount", balance.Amount),
|
||||
slogx.Stringer("txHash", balance.OutPoint.Hash),
|
||||
slog.Int("txOutIndex", int(balance.OutPoint.Index)),
|
||||
slog.Int("blockHeight", int(tx.BlockHeight)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,6 +92,13 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
}
|
||||
allocated[output][runeId] = allocated[output][runeId].Add(amount)
|
||||
unallocated[runeId] = unallocated[runeId].Sub(amount)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Allocated runes to tx output",
|
||||
slogx.Any("runeId", runeId),
|
||||
slogx.Stringer("amount", amount),
|
||||
slog.Int("output", output),
|
||||
slogx.Stringer("txHash", tx.TxHash),
|
||||
slog.Int("blockHeight", int(tx.BlockHeight)),
|
||||
)
|
||||
}
|
||||
|
||||
mints := make(map[runes.RuneId]uint128.Uint128)
|
||||
@@ -120,6 +131,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
if !premine.IsZero() {
|
||||
unallocated[etchedRuneId] = unallocated[etchedRuneId].Add(premine)
|
||||
mints[etchedRuneId] = mints[etchedRuneId].Add(premine)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Minted premine", slogx.Any("runeId", etchedRuneId), slogx.Stringer("amount", premine))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,6 +206,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
// all input runes and minted runes in a tx with cenotaph are burned
|
||||
for runeId, amount := range unallocated {
|
||||
burns[runeId] = burns[runeId].Add(amount)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Burned runes in cenotaph", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
|
||||
}
|
||||
} else {
|
||||
// assign all un-allocated runes to the default output (pointer), or the first non
|
||||
@@ -223,6 +236,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
// if pointer is still nil, then no output is available. Burn all unallocated runes.
|
||||
for runeId, amount := range unallocated {
|
||||
burns[runeId] = burns[runeId].Add(amount)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to no output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,6 +247,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
// burn all allocated runes to OP_RETURN outputs
|
||||
for runeId, amount := range balances {
|
||||
burns[runeId] = burns[runeId].Add(amount)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to OP_RETURN output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -301,6 +316,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
|
||||
}
|
||||
}
|
||||
p.newRuneTxs = append(p.newRuneTxs, &runeTx)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] created RuneTransaction", slogx.Any("runeTx", runeTx))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -397,6 +413,7 @@ func (p *Processor) mint(ctx context.Context, runeId runes.RuneId, blockHeader t
|
||||
if err := p.incrementMintCount(ctx, runeId, blockHeader); err != nil {
|
||||
return uint128.Zero, errors.Wrap(err, "failed to increment mint count")
|
||||
}
|
||||
logger.DebugContext(ctx, "[RunesProcessor] Minted rune", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount), slogx.Stringer("mintCount", runeEntry.Mints), slogx.Stringer("cap", lo.FromPtr(runeEntry.Terms.Cap)))
|
||||
return amount, nil
|
||||
}
|
||||
|
||||
@@ -408,9 +425,11 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
|
||||
if rune != nil {
|
||||
minimumRune := runes.MinimumRuneAtHeight(p.network, uint64(tx.BlockHeight))
|
||||
if rune.Cmp(minimumRune) < 0 {
|
||||
logger.DebugContext(ctx, "invalid etching: rune is lower than minimum rune at this height", slogx.Any("rune", rune), slogx.Any("minimumRune", minimumRune))
|
||||
return nil, runes.RuneId{}, runes.Rune{}, nil
|
||||
}
|
||||
if rune.IsReserved() {
|
||||
logger.DebugContext(ctx, "invalid etching: rune is reserved", slogx.Any("rune", rune))
|
||||
return nil, runes.RuneId{}, runes.Rune{}, nil
|
||||
}
|
||||
|
||||
@@ -419,6 +438,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
|
||||
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check rune existence")
|
||||
}
|
||||
if ok {
|
||||
logger.DebugContext(ctx, "invalid etching: rune already exists", slogx.Any("rune", rune))
|
||||
return nil, runes.RuneId{}, runes.Rune{}, nil
|
||||
}
|
||||
|
||||
@@ -428,6 +448,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
|
||||
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check tx commits to rune")
|
||||
}
|
||||
if !commit {
|
||||
logger.DebugContext(ctx, "invalid etching: tx does not commit to the rune", slogx.Any("rune", rune))
|
||||
return nil, runes.RuneId{}, runes.Rune{}, nil
|
||||
}
|
||||
} else {
|
||||
@@ -443,7 +464,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
|
||||
|
||||
func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction, rune runes.Rune) (bool, error) {
|
||||
commitment := rune.Commitment()
|
||||
for i, txIn := range tx.TxIn {
|
||||
for _, txIn := range tx.TxIn {
|
||||
tapscript, ok := extractTapScript(txIn.Witness)
|
||||
if !ok {
|
||||
continue
|
||||
@@ -471,7 +492,8 @@ func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction,
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "can't get previous txout for txin `%v:%v`", tx.TxHash.String(), i)
|
||||
logger.ErrorContext(ctx, "failed to get pk script at out point", err)
|
||||
continue
|
||||
}
|
||||
pkScript := prevTx.TxOut[txIn.PreviousOutIndex].PkScript
|
||||
// input utxo must be P2TR
|
||||
@@ -554,6 +576,7 @@ func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runest
|
||||
}
|
||||
p.newRuneEntries[runeId] = runeEntry
|
||||
p.newRuneEntryStates[runeId] = runeEntry
|
||||
logger.DebugContext(ctx, "[RunesProcessor] created RuneEntry", slogx.Any("runeEntry", runeEntry))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -607,6 +630,7 @@ func (p *Processor) incrementBurnedAmount(ctx context.Context, burned map[runes.
|
||||
continue
|
||||
}
|
||||
runeEntry.BurnedAmount = runeEntry.BurnedAmount.Add(amount)
|
||||
logger.DebugContext(ctx, "[RunesProcessor] burned amount incremented", slogx.Any("runeId", runeId), slogx.Any("amount", amount))
|
||||
p.newRuneEntryStates[runeId] = runeEntry
|
||||
}
|
||||
return nil
|
||||
@@ -674,10 +698,7 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
|
||||
}
|
||||
defer func() {
|
||||
if err := runesDgTx.Rollback(ctx); err != nil {
|
||||
logger.WarnContext(ctx, "failed to rollback transaction",
|
||||
slogx.Error(err),
|
||||
slogx.String("event", "rollback_runes_insertion"),
|
||||
)
|
||||
logger.ErrorContext(ctx, "[RunesProcessor] failed to rollback runes tx", err)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -789,7 +810,7 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
|
||||
|
||||
// submit event to reporting system
|
||||
if p.reportingClient != nil {
|
||||
if err := p.reportingClient.SubmitBlockReport(ctx, reportingclient.SubmitBlockReportPayload{
|
||||
if err := p.reportingClient.SubmitBlockReport(ctx, reportingclientv2.SubmitBlockReportPayloadData{
|
||||
Type: "runes",
|
||||
ClientVersion: Version,
|
||||
DBVersion: DBVersion,
|
||||
@@ -803,5 +824,6 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
|
||||
return errors.Wrap(err, "failed to submit block report")
|
||||
}
|
||||
}
|
||||
logger.InfoContext(ctx, "[RunesProcessor] block flushed")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -631,37 +631,13 @@ func (q *Queries) GetRuneIdFromRune(ctx context.Context, rune string) (string, e
|
||||
return rune_id, err
|
||||
}
|
||||
|
||||
const getRuneTransactions = `-- name: GetRuneTransactions :many
|
||||
SELECT hash, runes_transactions.block_height, index, timestamp, inputs, outputs, mints, burns, rune_etched, tx_hash, runes_runestones.block_height, etching, etching_divisibility, etching_premine, etching_rune, etching_spacers, etching_symbol, etching_terms, etching_terms_amount, etching_terms_cap, etching_terms_height_start, etching_terms_height_end, etching_terms_offset_start, etching_terms_offset_end, etching_turbo, edicts, mint, pointer, cenotaph, flaws FROM runes_transactions
|
||||
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
|
||||
WHERE (
|
||||
$1::BOOLEAN = FALSE -- if @filter_pk_script is TRUE, apply pk_script filter
|
||||
OR runes_transactions.outputs @> $2::JSONB
|
||||
OR runes_transactions.inputs @> $2::JSONB
|
||||
) AND (
|
||||
$3::BOOLEAN = FALSE -- if @filter_rune_id is TRUE, apply rune_id filter
|
||||
OR runes_transactions.outputs @> $4::JSONB
|
||||
OR runes_transactions.inputs @> $4::JSONB
|
||||
OR runes_transactions.mints ? $5
|
||||
OR runes_transactions.burns ? $5
|
||||
OR (runes_transactions.rune_etched = TRUE AND runes_transactions.block_height = $6 AND runes_transactions.index = $7)
|
||||
) AND (
|
||||
$8::INT = 0 OR runes_transactions.block_height = $8::INT -- if @block_height > 0, apply block_height filter
|
||||
)
|
||||
const getRuneTransactionsByHeight = `-- name: GetRuneTransactionsByHeight :many
|
||||
SELECT hash, runes_transactions.block_height, index, timestamp, inputs, outputs, mints, burns, rune_etched, tx_hash, runes_runestones.block_height, etching, etching_divisibility, etching_premine, etching_rune, etching_spacers, etching_symbol, etching_terms, etching_terms_amount, etching_terms_cap, etching_terms_height_start, etching_terms_height_end, etching_terms_offset_start, etching_terms_offset_end, etching_turbo, edicts, mint, pointer, cenotaph, flaws FROM runes_transactions
|
||||
LEFT JOIN runes_runestones ON runes_transactions.hash = runes_runestones.tx_hash
|
||||
WHERE runes_transactions.block_height = $1
|
||||
`
|
||||
|
||||
type GetRuneTransactionsParams struct {
|
||||
FilterPkScript bool
|
||||
PkScriptParam []byte
|
||||
FilterRuneID bool
|
||||
RuneIDParam []byte
|
||||
RuneID []byte
|
||||
RuneIDBlockHeight int32
|
||||
RuneIDTxIndex int32
|
||||
BlockHeight int32
|
||||
}
|
||||
|
||||
type GetRuneTransactionsRow struct {
|
||||
type GetRuneTransactionsByHeightRow struct {
|
||||
Hash string
|
||||
BlockHeight int32
|
||||
Index int32
|
||||
@@ -694,24 +670,15 @@ type GetRuneTransactionsRow struct {
|
||||
Flaws pgtype.Int4
|
||||
}
|
||||
|
||||
func (q *Queries) GetRuneTransactions(ctx context.Context, arg GetRuneTransactionsParams) ([]GetRuneTransactionsRow, error) {
|
||||
rows, err := q.db.Query(ctx, getRuneTransactions,
|
||||
arg.FilterPkScript,
|
||||
arg.PkScriptParam,
|
||||
arg.FilterRuneID,
|
||||
arg.RuneIDParam,
|
||||
arg.RuneID,
|
||||
arg.RuneIDBlockHeight,
|
||||
arg.RuneIDTxIndex,
|
||||
arg.BlockHeight,
|
||||
)
|
||||
func (q *Queries) GetRuneTransactionsByHeight(ctx context.Context, blockHeight int32) ([]GetRuneTransactionsByHeightRow, error) {
|
||||
rows, err := q.db.Query(ctx, getRuneTransactionsByHeight, blockHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetRuneTransactionsRow
|
||||
var items []GetRuneTransactionsByHeightRow
|
||||
for rows.Next() {
|
||||
var i GetRuneTransactionsRow
|
||||
var i GetRuneTransactionsByHeightRow
|
||||
if err := rows.Scan(
|
||||
&i.Hash,
|
||||
&i.BlockHeight,
|
||||
|
||||
@@ -306,7 +306,7 @@ func mapRuneTransactionTypeToParams(src entity.RuneTransaction) (gen.CreateRuneT
|
||||
}, runestoneParams, nil
|
||||
}
|
||||
|
||||
func extractModelRuneTxAndRunestone(src gen.GetRuneTransactionsRow) (gen.RunesTransaction, *gen.RunesRunestone, error) {
|
||||
func extractModelRuneTxAndRunestone(src gen.GetRuneTransactionsByHeightRow) (gen.RunesTransaction, *gen.RunesRunestone, error) {
|
||||
var runestone *gen.RunesRunestone
|
||||
if src.TxHash.Valid {
|
||||
// these fields should never be null
|
||||
|
||||
@@ -3,7 +3,6 @@ package postgres
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
@@ -62,21 +61,8 @@ func (r *Repository) GetIndexedBlockByHeight(ctx context.Context, height int64)
|
||||
return indexedBlock, nil
|
||||
}
|
||||
|
||||
func (r *Repository) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error) {
|
||||
pkScriptParam := []byte(fmt.Sprintf(`[{"pkScript":"%s"}]`, hex.EncodeToString(pkScript)))
|
||||
runeIdParam := []byte(fmt.Sprintf(`[{"runeId":"%s"}]`, runeId.String()))
|
||||
rows, err := r.queries.GetRuneTransactions(ctx, gen.GetRuneTransactionsParams{
|
||||
FilterPkScript: pkScript != nil,
|
||||
PkScriptParam: pkScriptParam,
|
||||
|
||||
FilterRuneID: runeId != runes.RuneId{},
|
||||
RuneIDParam: runeIdParam,
|
||||
RuneID: []byte(runeId.String()),
|
||||
RuneIDBlockHeight: int32(runeId.BlockHeight),
|
||||
RuneIDTxIndex: int32(runeId.TxIndex),
|
||||
|
||||
BlockHeight: int32(height),
|
||||
})
|
||||
func (r *Repository) GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error) {
|
||||
rows, err := r.queries.GetRuneTransactionsByHeight(ctx, int32(height))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error during query")
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ func (r *Repository) Rollback(ctx context.Context) error {
|
||||
return errors.Wrap(err, "failed to rollback transaction")
|
||||
}
|
||||
if err == nil {
|
||||
logger.DebugContext(ctx, "rolled back transaction")
|
||||
logger.InfoContext(ctx, "rolled back transaction")
|
||||
}
|
||||
r.tx = nil
|
||||
return nil
|
||||
|
||||
@@ -5,11 +5,10 @@ import (
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
|
||||
"github.com/gaze-network/indexer-network/modules/runes/runes"
|
||||
)
|
||||
|
||||
func (u *Usecase) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error) {
|
||||
txs, err := u.runesDg.GetRuneTransactions(ctx, pkScript, runeId, height)
|
||||
func (u *Usecase) GetTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error) {
|
||||
txs, err := u.runesDg.GetRuneTransactionsByHeight(ctx, height)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error during GetTransactionsByHeight")
|
||||
}
|
||||
|
||||
105
pkg/crypto/crypto.go
Normal file
105
pkg/crypto/crypto.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package crypto
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec/v2"
|
||||
"github.com/btcsuite/btcd/btcec/v2/ecdsa"
|
||||
"github.com/btcsuite/btcd/btcutil"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/cockroachdb/errors"
|
||||
ecies "github.com/ecies/go/v2"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
privateKey *btcec.PrivateKey
|
||||
eciesPrivateKey *ecies.PrivateKey
|
||||
}
|
||||
|
||||
func New(privateKeyStr string) (*Client, error) {
|
||||
if privateKeyStr != "" {
|
||||
privateKeyBytes, err := hex.DecodeString(privateKeyStr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "decode private key")
|
||||
}
|
||||
privateKey, _ := btcec.PrivKeyFromBytes(privateKeyBytes)
|
||||
|
||||
eciesPrivateKey := ecies.NewPrivateKeyFromBytes(privateKeyBytes)
|
||||
return &Client{
|
||||
privateKey: privateKey,
|
||||
eciesPrivateKey: eciesPrivateKey,
|
||||
}, nil
|
||||
}
|
||||
return &Client{}, nil
|
||||
}
|
||||
|
||||
func (c *Client) PublicKey() string {
|
||||
return hex.EncodeToString(c.privateKey.PubKey().SerializeCompressed())
|
||||
}
|
||||
|
||||
func (c *Client) WIF(params *chaincfg.Params) (string, error) {
|
||||
wif, err := btcutil.NewWIF(c.privateKey, params, true)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "wif convert failed")
|
||||
}
|
||||
return wif.String(), nil
|
||||
}
|
||||
|
||||
func (c *Client) Sign(message string) string {
|
||||
messageHash := chainhash.DoubleHashB([]byte(message))
|
||||
signature := ecdsa.Sign(c.privateKey, messageHash)
|
||||
return hex.EncodeToString(signature.Serialize())
|
||||
}
|
||||
|
||||
func (c *Client) Verify(message, sigStr, pubKeyStr string) (bool, error) {
|
||||
sigBytes, err := hex.DecodeString(sigStr)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "signature decode")
|
||||
}
|
||||
|
||||
pubBytes, err := hex.DecodeString(pubKeyStr)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "pubkey decode")
|
||||
}
|
||||
pubKey, err := btcec.ParsePubKey(pubBytes)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "pubkey parse")
|
||||
}
|
||||
|
||||
messageHash := chainhash.DoubleHashB([]byte(message))
|
||||
|
||||
signature, err := ecdsa.ParseSignature(sigBytes)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "signature parse")
|
||||
}
|
||||
return signature.Verify(messageHash, pubKey), nil
|
||||
}
|
||||
|
||||
func (c *Client) Encrypt(message, pubKeyStr string) (string, error) {
|
||||
pubKey, err := ecies.NewPublicKeyFromHex(pubKeyStr)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "parse pubkey")
|
||||
}
|
||||
|
||||
ciphertext, err := ecies.Encrypt(pubKey, []byte(message))
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "encrypt message")
|
||||
}
|
||||
|
||||
ciphertextStr := base64.StdEncoding.EncodeToString(ciphertext)
|
||||
return ciphertextStr, nil
|
||||
}
|
||||
|
||||
func (c *Client) Decrypt(ciphertextStr string) (string, error) {
|
||||
ciphertext, err := base64.StdEncoding.DecodeString(ciphertextStr)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "decode ciphertext")
|
||||
}
|
||||
plaintext, err := ecies.Decrypt(c.eciesPrivateKey, ciphertext)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "decrypt")
|
||||
}
|
||||
return string(plaintext), nil
|
||||
}
|
||||
65
pkg/crypto/crypto_test.go
Normal file
65
pkg/crypto/crypto_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package crypto
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Key for test only. DO NOT USE IN PRODUCTION
|
||||
const (
|
||||
privateKeyStr = "ce9c2fd75623e82a83ed743518ec7749f6f355f7301dd432400b087717fed2f2"
|
||||
mainnetKey = "L49LKamtrPZxty5TG7jaFPHMRZbrvAr4Dvn5BHGdvmvbcTDNAbZj"
|
||||
pubKeyStr = "0251e2dfcdeea17cc9726e4be0855cd0bae19e64f3e247b10760cd76851e7df47e"
|
||||
)
|
||||
|
||||
func TestEncryptDecrypt(t *testing.T) {
|
||||
plaintext := "hello world"
|
||||
|
||||
privClient, err := New(privateKeyStr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
pubClient, err := New("")
|
||||
assert.NoError(t, err)
|
||||
|
||||
ciphertext, err := pubClient.Encrypt(plaintext, pubKeyStr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
decrypted, err := privClient.Decrypt(ciphertext)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, plaintext, decrypted)
|
||||
}
|
||||
|
||||
func TestSignVerify(t *testing.T) {
|
||||
plaintext := "hello world"
|
||||
invalidSignature := "3044022066504a82e2bc23167214e05497a1ca957add9cacc078aa69f5417079a4d56f0002206b215920b046c779d4a58d4029c26dbadcaf6d3c884b3463f44e70ef9146c1cd"
|
||||
|
||||
privClient, err := New(privateKeyStr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
pubClient, err := New("")
|
||||
assert.NoError(t, err)
|
||||
|
||||
signature := privClient.Sign(plaintext)
|
||||
println(signature)
|
||||
verified, err := pubClient.Verify(plaintext, signature, pubKeyStr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.True(t, verified)
|
||||
|
||||
verified, err = pubClient.Verify(plaintext, invalidSignature, pubKeyStr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.False(t, verified)
|
||||
}
|
||||
|
||||
func TestWIF(t *testing.T) {
|
||||
privClient, err := New(privateKeyStr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
wifPrivKey, err := privClient.WIF(&chaincfg.MainNetParams)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, wifPrivKey, mainnetKey)
|
||||
}
|
||||
@@ -21,11 +21,7 @@ func NewHTTPErrorHandler() func(ctx *fiber.Ctx, err error) error {
|
||||
return errors.WithStack(ctx.Status(e.Code).SendString(e.Error()))
|
||||
}
|
||||
|
||||
logger.ErrorContext(ctx.UserContext(), "Something went wrong, unhandled api error",
|
||||
slogx.String("event", "api_unhandled_error"),
|
||||
slogx.Error(err),
|
||||
)
|
||||
|
||||
logger.ErrorContext(ctx.UserContext(), "unhandled error", slogx.Error(err))
|
||||
return errors.WithStack(ctx.Status(http.StatusInternalServerError).JSON(map[string]any{
|
||||
"error": "Internal Server Error",
|
||||
}))
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
|
||||
"github.com/Cleverse/go-utilities/utils"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
@@ -30,7 +29,7 @@ type Client struct {
|
||||
|
||||
func New(baseURL string, config ...Config) (*Client, error) {
|
||||
if _, err := url.Parse(baseURL); err != nil {
|
||||
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "can't parse base url"))
|
||||
return nil, errors.Wrap(err, "can't parse base url")
|
||||
}
|
||||
var cf Config
|
||||
if len(config) > 0 {
|
||||
@@ -115,7 +114,7 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
|
||||
)
|
||||
}
|
||||
|
||||
logger.InfoContext(ctx, "Finished make request", slog.String("package", "httpclient"))
|
||||
logger.Info("Finished make request")
|
||||
}
|
||||
|
||||
fasthttp.ReleaseResponse(resp)
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
func durationToMsAttrReplacer(groups []string, attr slog.Attr) slog.Attr {
|
||||
if attr.Value.Kind() == slog.KindDuration {
|
||||
return slog.Attr{
|
||||
Key: attr.Key,
|
||||
Value: slog.Int64Value(attr.Value.Duration().Milliseconds()),
|
||||
}
|
||||
}
|
||||
return attr
|
||||
}
|
||||
@@ -7,18 +7,12 @@ import (
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
)
|
||||
|
||||
// NewGCPHandler returns a new GCP handler.
|
||||
// The handler writes logs to the os.Stdout and
|
||||
// replaces the default attribute keys/values with the GCP logging attribute keys/values
|
||||
//
|
||||
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
|
||||
func NewGCPHandler(opts *slog.HandlerOptions) slog.Handler {
|
||||
return slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
||||
AddSource: true,
|
||||
Level: opts.Level,
|
||||
ReplaceAttr: attrReplacerChain(
|
||||
GCPAttrReplacer,
|
||||
durationToMsAttrReplacer,
|
||||
opts.ReplaceAttr,
|
||||
),
|
||||
})
|
||||
|
||||
@@ -34,9 +34,6 @@ func New(config Config) (*ReportingClient, error) {
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create http client")
|
||||
}
|
||||
if config.Name == "" {
|
||||
return nil, errors.New("reporting.name config is required if reporting is enabled")
|
||||
}
|
||||
return &ReportingClient{
|
||||
httpClient: httpClient,
|
||||
config: config,
|
||||
@@ -56,8 +53,6 @@ type SubmitBlockReportPayload struct {
|
||||
}
|
||||
|
||||
func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayload) error {
|
||||
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't marshal payload")
|
||||
@@ -69,11 +64,9 @@ func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitB
|
||||
return errors.Wrap(err, "can't send request")
|
||||
}
|
||||
if resp.StatusCode() >= 400 {
|
||||
// TODO: unmashal response body and log it
|
||||
logger.WarnContext(ctx, "Reporting block event failed", slog.Any("resp_body", resp.Body()))
|
||||
return nil
|
||||
logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
|
||||
}
|
||||
logger.DebugContext(ctx, "Reported block event")
|
||||
logger.DebugContext(ctx, "block report submitted", slog.Any("payload", payload))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -93,9 +86,6 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
|
||||
WebsiteURL: r.config.WebsiteURL,
|
||||
IndexerAPIURL: r.config.IndexerAPIURL,
|
||||
}
|
||||
|
||||
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't marshal payload")
|
||||
@@ -107,8 +97,8 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
|
||||
return errors.Wrap(err, "can't send request")
|
||||
}
|
||||
if resp.StatusCode() >= 400 {
|
||||
logger.WarnContext(ctx, "Reporting node info failed", slog.Any("resp_body", resp.Body()))
|
||||
logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
|
||||
}
|
||||
logger.DebugContext(ctx, "Reported node info")
|
||||
logger.InfoContext(ctx, "node report submitted", slog.Any("payload", payload))
|
||||
return nil
|
||||
}
|
||||
|
||||
168
pkg/reportingclientv2/reportingclient.go
Normal file
168
pkg/reportingclientv2/reportingclient.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package reportingclientv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
|
||||
"github.com/Cleverse/go-utilities/utils"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common"
|
||||
"github.com/gaze-network/indexer-network/pkg/crypto"
|
||||
"github.com/gaze-network/indexer-network/pkg/httpclient"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Disabled bool `mapstructure:"disabled"`
|
||||
ReportCenter ReportCenter `mapstructure:"report_center"`
|
||||
NodeInfo NodeInfo `mapstructure:"node_info"`
|
||||
}
|
||||
|
||||
type NodeInfo struct {
|
||||
Name string `mapstructure:"name"`
|
||||
WebsiteURL string `mapstructure:"website_url"`
|
||||
APIURL string `mapstructure:"api_url"`
|
||||
}
|
||||
|
||||
type ReportCenter struct {
|
||||
BaseURL string `mapstructure:"base_url"`
|
||||
PublicKey string `mapstructure:"public_key"`
|
||||
}
|
||||
|
||||
type ReportingClient struct {
|
||||
httpClient *httpclient.Client
|
||||
cryptoClient *crypto.Client
|
||||
config Config
|
||||
}
|
||||
|
||||
const (
|
||||
defaultBaseURL = "https://indexer.api.gaze.network"
|
||||
defaultPublicKey = "039298683d53a1cbdb6f318d5ad4b12bc0d752f3a6cd62c19b2c22b1ae1e12fe05"
|
||||
)
|
||||
|
||||
func New(config Config, indexerPrivateKey string) (*ReportingClient, error) {
|
||||
config.ReportCenter.BaseURL = utils.Default(config.ReportCenter.BaseURL, defaultBaseURL)
|
||||
config.ReportCenter.PublicKey = utils.Default(config.ReportCenter.PublicKey, defaultPublicKey)
|
||||
httpClient, err := httpclient.New(config.ReportCenter.BaseURL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create http client")
|
||||
}
|
||||
|
||||
cryptoClient, err := crypto.New(indexerPrivateKey)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create crypto client")
|
||||
}
|
||||
return &ReportingClient{
|
||||
httpClient: httpClient,
|
||||
config: config,
|
||||
cryptoClient: cryptoClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type SubmitBlockReportPayload struct {
|
||||
EncryptedData string `json:"encryptedData"`
|
||||
IndexerPublicKey string `json:"indexerPublicKey"`
|
||||
}
|
||||
|
||||
type SubmitBlockReportPayloadData struct {
|
||||
Type string `json:"type"`
|
||||
ClientVersion string `json:"clientVersion"`
|
||||
DBVersion int `json:"dbVersion"`
|
||||
EventHashVersion int `json:"eventHashVersion"`
|
||||
Network common.Network `json:"network"`
|
||||
BlockHeight uint64 `json:"blockHeight"`
|
||||
BlockHash chainhash.Hash `json:"blockHash"`
|
||||
EventHash chainhash.Hash `json:"eventHash"`
|
||||
CumulativeEventHash chainhash.Hash `json:"cumulativeEventHash"`
|
||||
IndexerPublicKey string `json:"indexerPublicKey"`
|
||||
}
|
||||
|
||||
func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayloadData) error {
|
||||
payload.IndexerPublicKey = r.cryptoClient.PublicKey()
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't marshal payload")
|
||||
}
|
||||
|
||||
encryptedData, err := r.cryptoClient.Encrypt(string(data), r.config.ReportCenter.PublicKey)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't encrypt data")
|
||||
}
|
||||
bodyStruct := SubmitBlockReportPayload{
|
||||
EncryptedData: encryptedData,
|
||||
IndexerPublicKey: r.cryptoClient.PublicKey(),
|
||||
}
|
||||
body, err := json.Marshal(bodyStruct)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't marshal payload")
|
||||
}
|
||||
resp, err := r.httpClient.Post(ctx, "/v2/report/block", httpclient.RequestOptions{
|
||||
Body: body,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't send request")
|
||||
}
|
||||
if resp.StatusCode() >= 400 {
|
||||
logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", bodyStruct), slog.Any("responseBody", resp.Body()))
|
||||
} else {
|
||||
logger.DebugContext(ctx, "block report submitted", slog.Any("payload", bodyStruct))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type SubmitNodeReportPayload struct {
|
||||
Data SubmitNodeReportPayloadData `json:"data"`
|
||||
IndexerPublicKey string `json:"indexerPublicKey"`
|
||||
Signature string `json:"string"`
|
||||
}
|
||||
|
||||
type SubmitNodeReportPayloadData struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Network common.Network `json:"network"`
|
||||
WebsiteURL string `json:"websiteUrl,omitempty"`
|
||||
APIURL string `json:"apiUrl,omitempty"`
|
||||
IndexerPublicKey string `json:"indexerPublicKey"`
|
||||
ClientVersion string `json:"clientVersion"`
|
||||
}
|
||||
|
||||
func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, network common.Network, clientVersion string) error {
|
||||
payload := SubmitNodeReportPayload{
|
||||
Data: SubmitNodeReportPayloadData{
|
||||
Name: r.config.NodeInfo.Name,
|
||||
Type: module,
|
||||
Network: network,
|
||||
WebsiteURL: r.config.NodeInfo.WebsiteURL,
|
||||
APIURL: r.config.NodeInfo.APIURL,
|
||||
IndexerPublicKey: r.cryptoClient.PublicKey(),
|
||||
ClientVersion: clientVersion,
|
||||
},
|
||||
IndexerPublicKey: r.cryptoClient.PublicKey(),
|
||||
}
|
||||
|
||||
dataPayload, err := json.Marshal(payload.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't marshal payload data")
|
||||
}
|
||||
|
||||
payload.Signature = r.cryptoClient.Sign(string(dataPayload))
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't marshal payload")
|
||||
}
|
||||
resp, err := r.httpClient.Post(ctx, "/v2/report/node", httpclient.RequestOptions{
|
||||
Body: body,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't send request")
|
||||
}
|
||||
if resp.StatusCode() >= 400 {
|
||||
logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
|
||||
} else {
|
||||
logger.InfoContext(ctx, "node report submitted", slog.Any("payload", payload))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
96
pkg/reportingclientv2/reportingclient_test.go
Normal file
96
pkg/reportingclientv2/reportingclient_test.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package reportingclientv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/gaze-network/indexer-network/common"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Key for test only. DO NOT USE IN PRODUCTION
|
||||
const (
|
||||
privateKeyStrOfficial = "ce9c2fd75623e82a83ed743518ec7749f6f355f7301dd432400b087717fed2f2"
|
||||
mainnetKeyOfficial = "L2hqSEYLjRQHHSiSgfNSFwaZ1dYpwn4PUTt2nQT8AefzYGQCwsGY"
|
||||
pubKeyStrOfficial = "0251e2dfcdeea17cc9726e4be0855cd0bae19e64f3e247b10760cd76851e7df47e"
|
||||
)
|
||||
|
||||
const (
|
||||
privateKeyStr1 = "a3a7d2c40c8bb7a4b4afa9a6b3ed613da1de233913ed07a017e6dd44ef542d80"
|
||||
mainnetKey1 = "L49LKamtrPZxty5TG7jaFPHMRZbrvAr4Dvn5BHGdvmvbcTDNAbZj"
|
||||
pubKeyStr1 = "02596e11b5a2104533d2732a3df35eadaeb61b189b9069715106e72e27c1de7775"
|
||||
)
|
||||
|
||||
const (
|
||||
privateKeyStr2 = "a3a7d2c40c8bb7a4b4afa9a6b3ed613da1de233913ed07a017e6dd44ef542d81"
|
||||
// mainnetKey1 = "L49LKamtrPZxty5TG7jaFPHMRZbrvAr4Dvn5BHGdvmvbcTDNAbZj"
|
||||
// pubKeyStr1 = "02596e11b5a2104533d2732a3df35eadaeb61b189b9069715106e72e27c1de7775"
|
||||
)
|
||||
|
||||
func TestReport1(t *testing.T) {
|
||||
block := 18
|
||||
hash, err := chainhash.NewHashFromStr(fmt.Sprintf("%d", block))
|
||||
assert.NoError(t, err)
|
||||
|
||||
configOfficial := Config{
|
||||
ReportCenter: ReportCenter{
|
||||
BaseURL: "https://indexer-dev.api.gaze.network",
|
||||
PublicKey: defaultPublicKey,
|
||||
},
|
||||
NodeInfo: NodeInfo{
|
||||
Name: "Official Node",
|
||||
APIURL: "http://localhost:2000",
|
||||
},
|
||||
}
|
||||
config1 := Config{
|
||||
ReportCenter: ReportCenter{
|
||||
BaseURL: "https://indexer-dev.api.gaze.network",
|
||||
PublicKey: defaultPublicKey,
|
||||
},
|
||||
NodeInfo: NodeInfo{
|
||||
Name: "Node 1",
|
||||
APIURL: "http://localhost:2000",
|
||||
},
|
||||
}
|
||||
config2 := Config{
|
||||
ReportCenter: ReportCenter{
|
||||
BaseURL: "https://indexer-dev.api.gaze.network",
|
||||
PublicKey: defaultPublicKey,
|
||||
},
|
||||
NodeInfo: NodeInfo{
|
||||
Name: "Node 2",
|
||||
APIURL: "http://localhost:2000",
|
||||
},
|
||||
}
|
||||
clientOfficial, err := New(configOfficial, privateKeyStrOfficial)
|
||||
assert.NoError(t, err)
|
||||
client1, err := New(config1, privateKeyStr1)
|
||||
assert.NoError(t, err)
|
||||
client2, err := New(config2, privateKeyStr2)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = clientOfficial.SubmitNodeReport(context.Background(), "runes", "mainnet", "v0.0.1")
|
||||
assert.NoError(t, err)
|
||||
err = client1.SubmitNodeReport(context.Background(), "runes", "mainnet", "v0.0.1")
|
||||
assert.NoError(t, err)
|
||||
err = client2.SubmitNodeReport(context.Background(), "runes", "mainnet", "v0.0.1")
|
||||
assert.NoError(t, err)
|
||||
|
||||
blockReport := SubmitBlockReportPayloadData{
|
||||
Type: "runes",
|
||||
ClientVersion: "v0.0.1",
|
||||
DBVersion: 1,
|
||||
EventHashVersion: 1,
|
||||
Network: common.NetworkMainnet,
|
||||
BlockHeight: uint64(block),
|
||||
BlockHash: *hash,
|
||||
EventHash: *hash,
|
||||
CumulativeEventHash: *hash,
|
||||
}
|
||||
|
||||
err = clientOfficial.SubmitBlockReport(context.Background(), blockReport)
|
||||
err = client1.SubmitBlockReport(context.Background(), blockReport)
|
||||
err = client2.SubmitBlockReport(context.Background(), blockReport)
|
||||
}
|
||||
Reference in New Issue
Block a user