Mirror of https://github.com/alexgo-io/gaze-indexer.git (synced 2026-01-13 08:40:30 +08:00)

Compare commits: 37 commits, v0.1.0.bet...feature/do
Commits in this range (SHA1): c4409c65bd, 6b1579ed32, 6020112e40, 07bcccd102, 7e3bfec557, f34acfea7d, f454c40af3, 603cd74bfd, b448bebee0, fe6988627a, 69ab16c35a, d7a66ea496, d84665633b, 1589f0d9d3, e45a8769b5, 420ba0394a, 3ea4fa82ad, 93c44c8338, ac9132b163, f3a87d8130, 7455bc772f, 31f8f8ca3e, fc5e11b52a, c371795498, ee0877afdd, 84e3724e34, 17d86be552, f756f3178a, bc23c7380c, f6f0d83c07, db1216df7d, 6238b7e1c0, 78a7611486, e542c7bff2, 142f6bda69, b582b4b111, 1a512ea4d4
.github/CODE_OF_CONDUCT.md (vendored, new file, 1 line)
@@ -0,0 +1 @@
# Contributor Covenant Code of Conduct
.github/CONTRIBUTING.md (vendored, new file, 34 lines)
@@ -0,0 +1,34 @@
# Contributing

Please note: we have a [code of conduct](https://github.com/gaze-network/gaze-indexer/blob/main/.github/CODE_OF_CONDUCT.md); please follow it in all your interactions with the Gaze Network project.

## Pull Requests or Commits

#### Message structure

```plaintext
<type>(optional scope):<description>
```

The `<type>` must be one of the following:

> feat:, refactor:, fix:, doc:, style:, perf:, test:, chore:, ci:, build:

- feat(runes): add Runes module to the project
- refactor: change project structure
- fix(btc): fix chain reorganization issue
- doc: update `run` command documentation
- style: fix linting issues
- perf: improve performance of the bitcoin node datasource
- test(runes): add unit tests for etching logic
- chore: bump dependencies versions
- ci: update CI configuration
- build: update Dockerfile to use alpine

# 👍 Contribute

If you want to say **thank you** and/or support the active development of Gaze Indexer:

1. Add a [GitHub Star](https://github.com/gaze-network/gaze-indexer/stargazers) to the project.
2. Follow and mention our [Twitter (𝕏)](https://twitter.com/Gaze_Network).
3. Write a review or tutorial on [Medium](https://medium.com/), [Dev.to](https://dev.to/) or a personal blog.
.github/PULL_REQUEST_TEMPLATE.md (vendored, new file, 22 lines)
@@ -0,0 +1,22 @@
## Description

Please provide a clear and concise description of the changes you've made and the problem they address. Include the purpose of the change, any relevant issues it solves, and the benefits it brings to the project. If this change introduces new features or adjustments, highlight them here.

Fixes # (issue)

## Type of change

What types of changes does your code introduce to Gaze Indexer?
_Put an `x` in the boxes that apply_

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Enhancement (improvement to existing features and functionality)
- [ ] Documentation update (changes to documentation)
- [ ] Performance improvement (non-breaking change which improves efficiency)
- [ ] Code consistency (non-breaking change which improves code reliability and robustness)

## Commit formatting

Please follow the commit message conventions for an easy way to identify the purpose or intention of a commit. Check out our commit message conventions in [CONTRIBUTING.md](https://github.com/gaze-network/gaze-indexer/blob/main/.github/CONTRIBUTING.md#pull-requests-or-commits).
.github/workflows/code-analysis.yml (vendored, new file, 77 lines)
@@ -0,0 +1,77 @@
name: Code Analysis & Test
on:
  workflow_dispatch:
  pull_request:
    branches:
      - develop
      - main
    paths:
      - "go.mod"
      - "go.sum"
      - "**.go"
      - ".golangci.yaml"
      - ".github/workflows/code-analysis.yml"

jobs:
  lint:
    strategy:
      matrix:
        os: ["ubuntu-latest"]
    name: Lint (${{ matrix.os }})
    runs-on: ${{ matrix.os }}

    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: "0"

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version-file: "go.mod"
          cache-dependency-path: "**/*.sum"
          cache: true # caching and restoring go modules and build outputs.

      - name: Lint
        uses: reviewdog/action-golangci-lint@v2
        with: # https://github.com/reviewdog/action-golangci-lint#inputs
          go_version_file: "go.mod"
          workdir: ./
          golangci_lint_flags: "--config=./.golangci.yaml --verbose --new-from-rev=${{ github.event.pull_request.base.sha }}"
          fail_on_error: true
  test:
    strategy:
      matrix:
        os: ["ubuntu-latest", "macos-latest", "windows-latest"]
        go-version: ["1.22.x", "1.x"] # minimum version and latest version
    name: Test (${{ matrix.os }}/${{ matrix.go-version }})
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: "0"

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: ${{ matrix.go-version }}
          cache: true # caching and restoring go modules and build outputs.
      - run: echo "GOVERSION=$(go version)" >> $GITHUB_ENV

      - name: Build
        run: go build -v ./...

      - name: Test
        run: go test -json ./... > test_output.json

      - name: Summary Test Results
        if: always()
        uses: robherley/go-test-action@v0
        with:
          fromJSONFile: test_output.json

      - name: Annotate Test Results
        if: always()
        uses: guyarb/golang-test-annotations@v0.5.1
        with:
          test-results: test_output.json
.github/workflows/release-server.yml (vendored, 2 lines changed)
@@ -32,6 +32,6 @@ jobs:
    uses: ./.github/workflows/reusable-build-and-push-ghcr.yml
    with:
      context: .
-     dockerfile: Dockerfile
+     dockerfile: ./docker/Dockerfile
      image-repo: "ghcr.io/gaze-network/gaze-indexer"
      image-tag: ${{ needs.prepare.outputs.tag }}
.github/workflows/sqlc-verify.yml (vendored, new file, 28 lines)
@@ -0,0 +1,28 @@
name: Sqlc ORM Framework Verify
on:
  workflow_dispatch:
  pull_request:
    branches:
      - develop
      - main
    paths:
      - "sqlc.yaml"
      - "**.sql"
      - ".github/workflows/sqlc-verify.yml"

jobs:
  sqlc-diff:
    name: Sqlc Diff Checker
    runs-on: "ubuntu-latest" # "self-hosted", "ubuntu-latest", "macos-latest", "windows-latest"
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: "0"

      - name: Setup Sqlc
        uses: sqlc-dev/setup-sqlc@v4
        with:
          sqlc-version: "1.26.0"

      - name: Check Diff
        run: sqlc diff
README.md (140 lines changed)
@@ -1 +1,139 @@
# Gaze Indexer Network
<!-- omit from toc -->
# Gaze Indexer
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols. It has support for Bitcoin and Runes out of the box, with **Unified Consistent APIs** across fungible token protocols.

Gaze Indexer is built with **modularity** in mind, allowing users to run all modules in one monolithic instance with a single command, or as a distributed cluster of micro-services.

Gaze Indexer serves as a foundation for building ANY meta-protocol indexer, with efficient data fetching, reorg detection, and a database migration tool.
This allows developers to focus on what **truly** matters: meta-protocol indexing logic. New meta-protocols can be easily added by implementing new modules.

Gaze Indexer also comes with a block reporting system for verifying the data integrity of indexers. Visit the [Gaze Network dashboard](https://dash.gaze.network) to see the status of other indexers.

- [Modules](#modules)
  - [1. Bitcoin](#1-bitcoin)
  - [2. Runes](#2-runes)
- [Installation](#installation)
  - [Prerequisites](#prerequisites)
    - [1. Hardware Requirements](#1-hardware-requirements)
    - [2. Prepare Bitcoin Core RPC server.](#2-prepare-bitcoin-core-rpc-server)
    - [3. Prepare database.](#3-prepare-database)
    - [4. Prepare `config.yaml` file.](#4-prepare-configyaml-file)
  - [Install with Docker (recommended)](#install-with-docker-recommended)
  - [Install from source](#install-from-source)

## Modules
### 1. Bitcoin
The Bitcoin Indexer, the heart of every meta-protocol, is responsible for indexing **Bitcoin transactions, blocks, and UTXOs**. It requires a Bitcoin Core RPC as the source of Bitcoin transactions,
and stores the indexed data in a database to be used by other modules.

### 2. Runes
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.

## Installation
### Prerequisites
#### 1. Hardware Requirements
Each module has different hardware requirements.
| Module  | CPU        | RAM    |
| ------- | ---------- | ------ |
| Bitcoin | 0.25 cores | 256 MB |
| Runes   | 0.5 cores  | 1 GB   |

#### 2. Prepare Bitcoin Core RPC server.
Gaze Indexer needs to fetch transaction data from a Bitcoin Core RPC, either self-hosted or through managed providers like QuickNode.
To self-host a Bitcoin Core node, see https://bitcoin.org/en/full-node.

#### 3. Prepare database.
Gaze Indexer has first-class support for PostgreSQL. If you wish to use other databases, you can implement your own database repository that satisfies each module's Data Gateway interface.
Here is our minimum database disk space requirement for each module.
| Module  | Database Storage |
| ------- | ---------------- |
| Bitcoin | 240 GB           |
| Runes   | 150 GB           |
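The README leaves the "bring your own database" path abstract, so here is a minimal, hypothetical Go sketch of what satisfying a module's data gateway with a custom store could look like. The interface and types below are illustrative stand-ins invented for this example; the real interfaces live in the module packages (for instance `btcdatagateway.BitcoinDataGateway`, which `cmd/cmd_run.go` wires to the Postgres repository).

```go
// A minimal, hypothetical sketch of the "bring your own database" extension
// point. The interface below is an illustrative stand-in, NOT the real
// data gateway interfaces from the Gaze codebase.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// BlockHeader is a stand-in for the indexer's block header type.
type BlockHeader struct {
	Height int64
	Hash   string
}

// BlockDataGateway is a hypothetical, trimmed-down data gateway interface.
type BlockDataGateway interface {
	InsertBlock(ctx context.Context, header BlockHeader) error
	LatestBlock(ctx context.Context) (BlockHeader, error)
}

// memoryGateway satisfies BlockDataGateway with an in-memory store, the same
// way a custom MySQL or SQLite repository would satisfy a module's real
// Data Gateway interface.
type memoryGateway struct {
	mu     sync.RWMutex
	blocks []BlockHeader
}

func (g *memoryGateway) InsertBlock(_ context.Context, header BlockHeader) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.blocks = append(g.blocks, header)
	return nil
}

func (g *memoryGateway) LatestBlock(_ context.Context) (BlockHeader, error) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	if len(g.blocks) == 0 {
		return BlockHeader{}, errors.New("no blocks indexed yet")
	}
	return g.blocks[len(g.blocks)-1], nil
}

func main() {
	var gw BlockDataGateway = &memoryGateway{}
	_ = gw.InsertBlock(context.Background(), BlockHeader{Height: 840000, Hash: "0000000000000000000..."})
	latest, _ := gw.LatestBlock(context.Background())
	fmt.Printf("latest indexed block: %+v\n", latest)
}
```

Once a repository like this exists, it would be plugged in where `cmd/cmd_run.go` currently switches on the configured `database` value for each module.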
#### 4. Prepare `config.yaml` file.
```yaml
# config.yaml
logger:
  output: text # Output format for logs. current supported formats: "text" | "json" | "gcp"
  debug: false

bitcoin_node:
  host: "" # [Required] Host of Bitcoin Core RPC (without https://)
  user: "" # Username to authenticate with Bitcoin Core RPC
  pass: "" # Password to authenticate with Bitcoin Core RPC
  disable_tls: false # Set to true to disable tls

network: mainnet # Network to run the indexer on. Current supported networks: "mainnet" | "testnet"

reporting: # Block reporting configuration options. See Block Reporting section for more details.
  disabled: false # Set to true to disable block reporting to Gaze Network. Default is false.
  base_url: "https://indexer.api.gaze.network" # Defaults to "https://indexer.api.gaze.network" if left empty
  name: "" # [Required if not disabled] Name of this indexer to show on the Gaze Network dashboard
  website_url: "" # Public website URL to show on the dashboard. Can be left empty.
  indexer_api_url: "" # Public url to access this indexer's API. Can be left empty if you want to keep your indexer private.

http_server:
  port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.

modules:
  bitcoin: # Configuration options for Bitcoin module. Can be removed if not used.
    database: "postgres" # Database to store bitcoin data. current supported databases: "postgres"
    postgres:
      host: "localhost"
      port: 5432
      user: "postgres"
      password: "password"
      db_name: "postgres"
      # url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
  runes: # Configuration options for Runes module. Can be removed if not used.
    database: "postgres" # Database to store Runes data. current supported databases: "postgres"
    datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node" | "database". If "database" is used, it will use the database config in bitcoin module as datasource.
    api_handlers: # API handlers to enable. current supported handlers: "http"
      - http
    postgres:
      host: "localhost"
      port: 5432
      user: "postgres"
      password: "password"
      db_name: "postgres"
      # url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
```

### Install with Docker (recommended)
We will be using `docker-compose` for our installation guide. Make sure the `docker-compose.yaml` file is in the same directory as the `config.yaml` file.
```yaml
# docker-compose.yaml
services:
  gaze-indexer:
    image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
    container_name: gaze-indexer
    restart: unless-stopped
    volumes:
      - './config.yaml:/app/config.yaml' # mount config.yaml file to the container as "/app/config.yaml"
    command: ["/app/main", "run", "--bitcoin", "--runes"] # Put module flags after "run" commands to select which modules to run.
```

### Install from source
1. Install `go` version 1.22 or higher. See Go installation guide [here](https://go.dev/doc/install).
2. Clone this repository.
```bash
git clone https://github.com/gaze-network/gaze-indexer.git
cd gaze-indexer
```
3. Build the main binary.
```bash
# Get dependencies
go mod download

# Build the main binary
go build -o gaze main.go
```
4. Run the main binary with the `run` command and module flags.
```bash
./gaze run --bitcoin --runes
```
If `config.yaml` is not located at `./app/config.yaml`, use the `--config` flag to specify the path to the `config.yaml` file.
```bash
./gaze run --bitcoin --runes --config /path/to/config.yaml
```
@@ -21,6 +21,7 @@ var (
    cmds = []*cobra.Command{
        NewVersionCommand(),
        NewRunCommand(),
        NewMigrateCommand(),
    }
)

@@ -43,7 +44,7 @@ func Execute(ctx context.Context) {

    // Initialize logger
    if err := logger.Init(config.Logger); err != nil {
        logger.Panic("Failed to initialize logger: %v", slogx.Error(err), slog.Any("config", config.Logger))
        logger.PanicContext(ctx, "Something went wrong, can't init logger", slogx.Error(err), slog.Any("config", config.Logger))
    }
})

@@ -52,7 +53,7 @@ func Execute(ctx context.Context) {

    // Execute command
    if err := cmd.ExecuteContext(ctx); err != nil {
        // use cobra to log error message by default
        logger.Debug("Failed to execute root command", slogx.Error(err))
        // Cobra will print the error message by default
        logger.DebugContext(ctx, "Error executing command", slogx.Error(err))
    }
}
cmd/cmd_migrate.go (new file, 20 lines)
@@ -0,0 +1,20 @@
package cmd

import (
    "github.com/gaze-network/indexer-network/cmd/migrate"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
    "github.com/spf13/cobra"
)

func NewMigrateCommand() *cobra.Command {
    cmd := &cobra.Command{
        Use:   "migrate",
        Short: "Migrate database schema",
    }
    cmd.AddCommand(
        migrate.NewMigrateUpCommand(),
        migrate.NewMigrateDownCommand(),
    )
    return cmd
}
cmd/cmd_run.go (201 lines changed)
@@ -37,7 +37,10 @@ import (
    fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
    "github.com/samber/lo"
    "github.com/spf13/cobra"
    "golang.org/x/sync/errgroup"
)

const (
    shutdownTimeout = 60 * time.Second
)

type runCmdOptions struct {
@@ -84,12 +87,23 @@ type HttpHandler interface {
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
    conf := config.Load()

    // Initialize context
    // Validate inputs
    {
        if !conf.Network.IsSupported() {
            return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
        }
    }

    // Initialize application process context
    ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // Initialize worker context to separate worker's lifecycle from main process
    ctxWorker, stopWorker := context.WithCancel(context.Background())
    defer stopWorker()

    // Add logger context
    ctx = logger.WithContext(ctx, slogx.Stringer("network", conf.Network))
    ctxWorker = logger.WithContext(ctxWorker, slogx.Stringer("network", conf.Network))

    // Initialize Bitcoin Core RPC Client
    client, err := rpcclient.New(&rpcclient.ConnConfig{
@@ -100,19 +114,18 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
        HTTPPostMode: true,
    }, nil)
    if err != nil {
        logger.PanicContext(ctx, "Failed to create Bitcoin Core RPC Client", slogx.Error(err))
        logger.PanicContext(ctx, "Invalid Bitcoin node configuration", slogx.Error(err))
    }
    defer client.Shutdown()

    logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
    if err := client.Ping(); err != nil {
        logger.PanicContext(ctx, "Failed to ping Bitcoin Core RPC Server", slogx.Error(err))
    }
    logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server")

    // Validate network
    if !conf.Network.IsSupported() {
        return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
    // Check Bitcoin RPC connection
    {
        start := time.Now()
        logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
        if err := client.Ping(); err != nil {
            logger.PanicContext(ctx, "Can't connect to Bitcoin Core RPC Server", slogx.String("host", conf.BitcoinNode.Host), slogx.Error(err))
        }
        logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slog.Duration("latency", time.Since(start)))
    }

    // TODO: create module command package.
@@ -121,19 +134,20 @@
    // TODO: refactor module name to specific type instead of string?
    httpHandlers := make(map[string]HttpHandler, 0)

    // use gracefulEG to coordinate graceful shutdown after context is done. (e.g. shut down http server, shutdown logic of each module, etc.)
    gracefulEG, gctx := errgroup.WithContext(context.Background())

    var reportingClient *reportingclient.ReportingClient
    if !conf.Reporting.Disabled {
        reportingClient, err = reportingclient.New(conf.Reporting)
        if err != nil {
            logger.PanicContext(ctx, "Failed to create reporting client", slogx.Error(err))
            if errors.Is(err, errs.InvalidArgument) {
                logger.PanicContext(ctx, "Invalid reporting configuration", slogx.Error(err))
            }
            logger.PanicContext(ctx, "Something went wrong, can't create reporting client", slogx.Error(err))
        }
    }

    // Initialize Bitcoin Indexer
    if opts.Bitcoin {
        ctx := logger.WithContext(ctx, slogx.String("module", "bitcoin"))
        var (
            btcDB         btcdatagateway.BitcoinDataGateway
            indexerInfoDB btcdatagateway.IndexerInformationDataGateway
@@ -142,55 +156,70 @@
        case "postgresql", "postgres", "pg":
            pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
            if err != nil {
                logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
                if errors.Is(err, errs.InvalidArgument) {
                    logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
                }
                logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
            }
            defer pg.Close()
            repo := btcpostgres.NewRepository(pg)
            btcDB = repo
            indexerInfoDB = repo
        default:
            return errors.Wrapf(errs.Unsupported, "%q database is not supported", conf.Modules.Bitcoin.Database)
            return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Bitcoin.Database)
        }
        if !opts.APIOnly {
            bitcoinProcessor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
            bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
            bitcoinIndexer := indexers.NewBitcoinIndexer(bitcoinProcessor, bitcoinNodeDatasource)
            processor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
            datasource := datasources.NewBitcoinNode(client)
            indexer := indexers.NewBitcoinIndexer(processor, datasource)
            defer func() {
                if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
                    logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
                    return
                }
                logger.InfoContext(ctx, "Indexer stopped gracefully")
            }()

            // Verify states before running Indexer
            if err := bitcoinProcessor.VerifyStates(ctx); err != nil {
            if err := processor.VerifyStates(ctx); err != nil {
                return errors.WithStack(err)
            }

            // Run Indexer
            go func() {
                logger.InfoContext(ctx, "Starting Bitcoin Indexer")
                if err := bitcoinIndexer.Run(ctx); err != nil {
                    logger.ErrorContext(ctx, "Failed to run Bitcoin Indexer", slogx.Error(err))
                }
                // stop main process if indexer stopped
                defer stop()

                // stop main process if Bitcoin Indexer failed
                logger.InfoContext(ctx, "Bitcoin Indexer stopped. Stopping main process...")
                stop()
                logger.InfoContext(ctx, "Starting Gaze Indexer")
                if err := indexer.Run(ctxWorker); err != nil {
                    logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
                }
            }()
        }
    }

    // Initialize Runes Indexer
    if opts.Runes {
        var runesDg runesdatagateway.RunesDataGateway
        var indexerInfoDg runesdatagateway.IndexerInfoDataGateway
        ctx := logger.WithContext(ctx, slogx.String("module", "runes"))
        var (
            runesDg       runesdatagateway.RunesDataGateway
            indexerInfoDg runesdatagateway.IndexerInfoDataGateway
        )
        switch strings.ToLower(conf.Modules.Runes.Database) {
        case "postgresql", "postgres", "pg":
            pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
            if err != nil {
                logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
                if errors.Is(err, errs.InvalidArgument) {
                    logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
                }
                logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
            }
            defer pg.Close()
            runesRepo := runespostgres.NewRepository(pg)
            runesDg = runesRepo
            indexerInfoDg = runesRepo
        default:
            logger.PanicContext(ctx, "Unsupported database", slogx.String("database", conf.Modules.Runes.Database))
            return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Runes.Database)
        }
        var bitcoinDatasource indexers.BitcoinDatasource
        var bitcoinClient btcclient.Contract
@@ -200,9 +229,12 @@
            bitcoinDatasource = bitcoinNodeDatasource
            bitcoinClient = bitcoinNodeDatasource
        case "database":
            pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
            pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
            if err != nil {
                logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
                if errors.Is(err, errs.InvalidArgument) {
                    logger.PanicContext(ctx, "Invalid Postgres configuration for datasource", slogx.Error(err))
                }
                logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
            }
            defer pg.Close()
            btcRepo := btcpostgres.NewRepository(pg)
@@ -212,24 +244,31 @@
        default:
            return errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
        }
        if !opts.APIOnly {
            runesProcessor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
            runesIndexer := indexers.NewBitcoinIndexer(runesProcessor, bitcoinDatasource)

            if err := runesProcessor.VerifyStates(ctx); err != nil {
        if !opts.APIOnly {
            processor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
            indexer := indexers.NewBitcoinIndexer(processor, bitcoinDatasource)
            defer func() {
                if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
                    logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
                    return
                }
                logger.InfoContext(ctx, "Indexer stopped gracefully")
            }()

            if err := processor.VerifyStates(ctx); err != nil {
                return errors.WithStack(err)
            }

            // Run Indexer
            go func() {
                logger.InfoContext(ctx, "Started Runes Indexer")
                if err := runesIndexer.Run(ctx); err != nil {
                    logger.ErrorContext(ctx, "Failed to run Runes Indexer", slogx.Error(err))
                }
                // stop main process if indexer stopped
                defer stop()

                // stop main process if Runes Indexer failed
                logger.InfoContext(ctx, "Runes Indexer stopped. Stopping main process...")
                stop()
                logger.InfoContext(ctx, "Starting Gaze Indexer")
                if err := indexer.Run(ctxWorker); err != nil {
                    logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
                }
            }()
        }

@@ -242,7 +281,7 @@
            runesHTTPHandler := runesapi.NewHTTPHandler(conf.Network, runesUsecase)
            httpHandlers["runes"] = runesHTTPHandler
        default:
            logger.PanicContext(ctx, "Unsupported API handler", slogx.String("handler", handler))
            logger.PanicContext(ctx, "Something went wrong, unsupported API handler", slogx.String("handler", handler))
        }
    }
}
@@ -251,7 +290,7 @@
    // Setup HTTP server if there are any HTTP handlers
    if len(httpHandlers) > 0 {
        app := fiber.New(fiber.Config{
            AppName:      "gaze",
            AppName:      "Gaze Indexer",
            ErrorHandler: errorhandler.NewHTTPErrorHandler(),
        })
        app.
@@ -260,13 +299,22 @@
            StackTraceHandler: func(c *fiber.Ctx, e interface{}) {
                buf := make([]byte, 1024) // bufLen = 1024
                buf = buf[:runtime.Stack(buf, false)]
                logger.ErrorContext(c.UserContext(), "panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
                logger.ErrorContext(c.UserContext(), "Something went wrong, panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
            },
        })).
        Use(compress.New(compress.Config{
            Level: compress.LevelDefault,
        }))
        // ping handler

        defer func() {
            if err := app.ShutdownWithTimeout(shutdownTimeout); err != nil {
                logger.ErrorContext(ctx, "Error during shutdown HTTP server", slogx.Error(err))
                return
            }
            logger.InfoContext(ctx, "HTTP server stopped gracefully")
        }()

        // Health check
        app.Get("/", func(c *fiber.Ctx) error {
            return errors.WithStack(c.SendStatus(http.StatusOK))
        })
@@ -274,34 +322,49 @@
        // mount http handlers from each http-enabled module
        for module, handler := range httpHandlers {
            if err := handler.Mount(app); err != nil {
                logger.PanicContext(ctx, "Failed to mount HTTP handler", slogx.Error(err), slogx.String("module", module))
                logger.PanicContext(ctx, "Something went wrong, can't mount HTTP handler", slogx.Error(err), slogx.String("module", module))
            }
            logger.InfoContext(ctx, "Mounted HTTP handler", slogx.String("module", module))
        }

        go func() {
            // stop main process if API stopped
            defer stop()

            logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))
            if err := app.Listen(fmt.Sprintf(":%d", conf.HTTPServer.Port)); err != nil {
                logger.PanicContext(ctx, "Failed to start HTTP server", slogx.Error(err))
                logger.PanicContext(ctx, "Something went wrong, error during running HTTP server", slogx.Error(err))
            }
        }()
        // handle graceful shutdown
        gracefulEG.Go(func() error {
            <-ctx.Done()
            logger.InfoContext(gctx, "Stopping HTTP server...")
            if err := app.ShutdownWithTimeout(60 * time.Second); err != nil {
                logger.ErrorContext(gctx, "Error during shutdown HTTP server", slogx.Error(err))
            }
            logger.InfoContext(gctx, "HTTP server stopped gracefully")
            return nil
        })
    }

    // Stop application if worker context is done
    go func() {
        <-ctxWorker.Done()
        defer stop()

        logger.InfoContext(ctx, "Gaze Indexer Worker is stopped. Stopping application...")
    }()

    logger.InfoContext(ctxWorker, "Gaze Indexer started")

    // Wait for interrupt signal to gracefully stop the server
    <-ctx.Done()
    // wait until all graceful shutdown goroutines are done before returning
    if err := gracefulEG.Wait(); err != nil {
        logger.ErrorContext(ctx, "Failed to shutdown gracefully", slogx.Error(err))
    } else {
        logger.InfoContext(ctx, "Successfully shut down gracefully")
    }

    // Force shutdown if timeout exceeded or got signal again
    go func() {
        defer os.Exit(1)

        ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
        defer stop()

        select {
        case <-ctx.Done():
            logger.FatalContext(ctx, "Received exit signal again. Force shutdown...")
        case <-time.After(shutdownTimeout + 15*time.Second):
            logger.FatalContext(ctx, "Shutdown timeout exceeded. Force shutdown...")
        }
    }()

    return nil
}
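The run command above ties its lifetime to OS signals, collects graceful-shutdown work in an errgroup, and force-exits if cleanup hangs or a second signal arrives. The following is a condensed, self-contained sketch of that pattern, assuming only the standard library and `golang.org/x/sync/errgroup`; it is not the Gaze code itself, and an `http.Server` stands in for the indexer workers and Fiber app.

```go
// Condensed sketch of the shutdown choreography used by the run command:
// signal-aware context, errgroup for cleanup work, and a force-exit escape hatch.
package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"
)

const shutdownTimeout = 60 * time.Second

func main() {
	// Stop the main context on SIGINT/SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	srv := &http.Server{Addr: ":8080"}

	// Serve in the background; a server failure also stops the main context.
	go func() {
		defer stop()
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("http server error: %v", err)
		}
	}()

	// gracefulEG collects shutdown work that runs once ctx is done.
	gracefulEG, gctx := errgroup.WithContext(context.Background())
	gracefulEG.Go(func() error {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(gctx, shutdownTimeout)
		defer cancel()
		return srv.Shutdown(shutdownCtx)
	})

	// Wait for the first signal (or server failure).
	<-ctx.Done()

	// Force-exit if graceful shutdown hangs or a second signal arrives.
	go func() {
		defer os.Exit(1)
		forceCtx, forceStop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
		defer forceStop()
		select {
		case <-forceCtx.Done():
			log.Println("received second signal, forcing exit")
		case <-time.After(shutdownTimeout + 15*time.Second):
			log.Println("shutdown timeout exceeded, forcing exit")
		}
	}()

	// Wait until all graceful-shutdown goroutines are done before returning.
	if err := gracefulEG.Wait(); err != nil {
		log.Printf("graceful shutdown failed: %v", err)
		return
	}
	log.Println("shut down gracefully")
}
```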
cmd/migrate/cmd_down.go (new file, 128 lines)
@@ -0,0 +1,128 @@
package migrate

import (
    "fmt"
    "net/url"
    "strconv"
    "strings"

    "github.com/cockroachdb/errors"
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
    "github.com/samber/lo"
    "github.com/spf13/cobra"
)

type migrateDownCmdOptions struct {
    DatabaseURL string
    Bitcoin     bool
    Runes       bool
    All         bool
}

type migrateDownCmdArgs struct {
    N int
}

func (a *migrateDownCmdArgs) ParseArgs(args []string) error {
    if len(args) > 0 {
        // assume args already validated by cobra to be len(args) <= 1
        n, err := strconv.Atoi(args[0])
        if err != nil {
            return errors.Wrap(err, "failed to parse N")
        }
        if n < 0 {
            return errors.New("N must be a positive integer")
        }
        a.N = n
    }
    return nil
}

func NewMigrateDownCommand() *cobra.Command {
    opts := &migrateDownCmdOptions{}

    cmd := &cobra.Command{
        Use:     "down [N]",
        Short:   "Apply all or N down migrations",
        Args:    cobra.MaximumNArgs(1),
        Example: `gaze migrate down --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`,
        RunE: func(cmd *cobra.Command, args []string) error {
            // args already validated by cobra
            var downArgs migrateDownCmdArgs
            if err := downArgs.ParseArgs(args); err != nil {
                return errors.Wrap(err, "failed to parse args")
            }
            return migrateDownHandler(opts, cmd, downArgs)
        },
    }

    flags := cmd.Flags()
    flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin down migrations")
    flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes down migrations")
    flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")
    flags.BoolVar(&opts.All, "all", false, "Confirm apply ALL down migrations without prompt")

    return cmd
}

func migrateDownHandler(opts *migrateDownCmdOptions, _ *cobra.Command, args migrateDownCmdArgs) error {
    if opts.DatabaseURL == "" {
        return errors.New("--database is required")
    }
    databaseURL, err := url.Parse(opts.DatabaseURL)
    if err != nil {
        return errors.Wrap(err, "failed to parse database URL")
    }
    if _, ok := supportedDrivers[databaseURL.Scheme]; !ok {
        return errors.Errorf("unsupported database driver: %s", databaseURL.Scheme)
    }
    // prevent accidental down all migrations
    if args.N == 0 && !opts.All {
        input := ""
        fmt.Print("Are you sure you want to apply all down migrations? (y/N):")
        fmt.Scanln(&input)
        if !lo.Contains([]string{"y", "yes"}, strings.ToLower(input)) {
            return nil
        }
    }

    applyDownMigrations := func(module string, sourcePath string, migrationTable string) error {
        newDatabaseURL := cloneURLWithQuery(databaseURL, url.Values{"x-migrations-table": {migrationTable}})
        sourceURL := "file://" + sourcePath
        m, err := migrate.New(sourceURL, newDatabaseURL.String())
        if err != nil {
            return errors.Wrap(err, "failed to create Migrate instance")
        }
        m.Log = &consoleLogger{
            prefix: fmt.Sprintf("[%s] ", module),
        }
        if args.N == 0 {
            m.Log.Printf("Applying down migrations...\n")
            err = m.Down()
        } else {
            m.Log.Printf("Applying %d down migrations...\n", args.N)
            err = m.Steps(-args.N)
        }
        if err != nil {
            if !errors.Is(err, migrate.ErrNoChange) {
                return errors.Wrapf(err, "failed to apply %s down migrations", module)
            }
            m.Log.Printf("No more down migrations to apply\n")
        }
        return nil
    }

    if opts.Bitcoin {
        if err := applyDownMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
            return errors.WithStack(err)
        }
    }
    if opts.Runes {
        if err := applyDownMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
            return errors.WithStack(err)
        }
    }
    return nil
}
cmd/migrate/cmd_up.go (new file, 112 lines)
@@ -0,0 +1,112 @@
package migrate

import (
    "fmt"
    "net/url"
    "strconv"

    "github.com/cockroachdb/errors"
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
    "github.com/spf13/cobra"
)

type migrateUpCmdOptions struct {
    DatabaseURL string
    Bitcoin     bool
    Runes       bool
}

type migrateUpCmdArgs struct {
    N int
}

func (a *migrateUpCmdArgs) ParseArgs(args []string) error {
    if len(args) > 0 {
        // assume args already validated by cobra to be len(args) <= 1
        n, err := strconv.Atoi(args[0])
        if err != nil {
            return errors.Wrap(err, "failed to parse N")
        }
        a.N = n
    }
    return nil
}

func NewMigrateUpCommand() *cobra.Command {
    opts := &migrateUpCmdOptions{}

    cmd := &cobra.Command{
        Use:     "up [N]",
        Short:   "Apply all or N up migrations",
        Args:    cobra.MaximumNArgs(1),
        Example: `gaze migrate up --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`,
        RunE: func(cmd *cobra.Command, args []string) error {
            // args already validated by cobra
            var upArgs migrateUpCmdArgs
            if err := upArgs.ParseArgs(args); err != nil {
                return errors.Wrap(err, "failed to parse args")
            }
            return migrateUpHandler(opts, cmd, upArgs)
        },
    }

    flags := cmd.Flags()
    flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin up migrations")
    flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes up migrations")
    flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")

    return cmd
}

func migrateUpHandler(opts *migrateUpCmdOptions, _ *cobra.Command, args migrateUpCmdArgs) error {
    if opts.DatabaseURL == "" {
        return errors.New("--database is required")
    }
    databaseURL, err := url.Parse(opts.DatabaseURL)
    if err != nil {
        return errors.Wrap(err, "failed to parse database URL")
    }
    if _, ok := supportedDrivers[databaseURL.Scheme]; !ok {
        return errors.Errorf("unsupported database driver: %s", databaseURL.Scheme)
    }

    applyUpMigrations := func(module string, sourcePath string, migrationTable string) error {
        newDatabaseURL := cloneURLWithQuery(databaseURL, url.Values{"x-migrations-table": {migrationTable}})
        sourceURL := "file://" + sourcePath
        m, err := migrate.New(sourceURL, newDatabaseURL.String())
        if err != nil {
            return errors.Wrap(err, "failed to create Migrate instance")
        }
        m.Log = &consoleLogger{
            prefix: fmt.Sprintf("[%s] ", module),
        }
        if args.N == 0 {
            m.Log.Printf("Applying up migrations...\n")
            err = m.Up()
        } else {
            m.Log.Printf("Applying %d up migrations...\n", args.N)
            err = m.Steps(args.N)
        }
        if err != nil {
            if !errors.Is(err, migrate.ErrNoChange) {
                return errors.Wrapf(err, "failed to apply %s up migrations", module)
            }
            m.Log.Printf("Migrations already up-to-date\n")
        }
        return nil
    }

    if opts.Bitcoin {
        if err := applyUpMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
            return errors.WithStack(err)
        }
    }
    if opts.Runes {
        if err := applyUpMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
            return errors.WithStack(err)
        }
    }
    return nil
}
cmd/migrate/logger.go (new file, 22 lines)
@@ -0,0 +1,22 @@
package migrate

import (
    "fmt"

    "github.com/golang-migrate/migrate/v4"
)

var _ migrate.Logger = (*consoleLogger)(nil)

type consoleLogger struct {
    prefix  string
    verbose bool
}

func (l *consoleLogger) Printf(format string, v ...interface{}) {
    fmt.Printf(l.prefix+format, v...)
}

func (l *consoleLogger) Verbose() bool {
    return l.verbose
}
cmd/migrate/migrate.go (new file, 25 lines)
@@ -0,0 +1,25 @@
package migrate

import "net/url"

const (
    bitcoinMigrationSource = "modules/bitcoin/database/postgresql/migrations"
    runesMigrationSource   = "modules/runes/database/postgresql/migrations"
)

func cloneURLWithQuery(u *url.URL, newQuery url.Values) *url.URL {
    clone := *u
    query := clone.Query()
    for key, values := range newQuery {
        for _, value := range values {
            query.Add(key, value)
        }
    }
    clone.RawQuery = query.Encode()
    return &clone
}

var supportedDrivers = map[string]struct{}{
    "postgres":   {},
    "postgresql": {},
}
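`cloneURLWithQuery` is what lets a single `--database` URL drive both modules' migrations: the migrator copies the URL and appends an `x-migrations-table` query parameter (golang-migrate's option for a custom schema-migrations table), so Bitcoin and Runes migrations are tracked in separate tables. A small standalone illustration of that URL manipulation, using only the standard library and the same table name as above:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	base, err := url.Parse("postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable")
	if err != nil {
		panic(err)
	}

	// Copy the URL and add the per-module migrations table, as
	// cloneURLWithQuery does for "bitcoin_schema_migrations" and
	// "runes_schema_migrations".
	clone := *base
	query := clone.Query()
	query.Add("x-migrations-table", "bitcoin_schema_migrations")
	clone.RawQuery = query.Encode()

	fmt.Println(clone.String())
	// postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable&x-migrations-table=bitcoin_schema_migrations
}
```

In practice this is the URL passed on the command line, e.g. `gaze migrate up --bitcoin --runes --database "postgres://postgres:postgres@localhost:5432/gaze-indexer?sslmode=disable"`, as the command's own Example strings and the docker-compose migrator service show.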
@@ -32,7 +32,7 @@ modules:
      # url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database connection configurations
  runes:
    database: "postgres" # Database to store Runes data. current supported databases: "postgres"
-   datasource: "bitcoin-node" # Data source to be used. current supported data sources: "bitcoin-node" | "postgres"
+   datasource: "postgres" # Data source to be used (to fetch bitcoin blocks). current supported data sources: "bitcoin-node" | "postgres"
    api_handlers: # API handlers to be used. current supported handlers: "http"
      - http
    postgres:
@@ -39,7 +39,7 @@ func NewBitcoinNode(btcclient *rpcclient.Client) *BitcoinNodeDatasource {
}

func (p BitcoinNodeDatasource) Name() string {
    return "BitcoinNode"
    return "bitcoin_node"
}

// Fetch polling blocks from Bitcoin node
@@ -83,6 +83,11 @@ func (d *BitcoinNodeDatasource) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
// - from: block height to start fetching, if -1, it will start from genesis block
// - to: block height to stop fetching, if -1, it will fetch until the latest block
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
    ctx = logger.WithContext(ctx,
        slogx.String("package", "datasources"),
        slogx.String("datasource", d.Name()),
    )

    from, to, skip, err := d.prepareRange(from, to)
    if err != nil {
        return nil, errors.Wrap(err, "failed to prepare fetch range")
@@ -138,10 +143,10 @@
                if errors.Is(err, errs.Closed) {
                    return
                }
                logger.WarnContext(ctx, "failed while dispatch block",
                    slogx.Error(err),
                logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
                    slogx.Int64("start", data[0].Header.Height),
                    slogx.Int64("end", data[len(data)-1].Header.Height),
                    slogx.Error(err),
                )
            }
        case <-ctx.Done():
@@ -157,6 +162,7 @@
    done := subscription.Done()
    chunks := lo.Chunk(blockHeights, blockStreamChunkSize)
    for _, chunk := range chunks {
        // TODO: Implement throttling logic to control the rate of fetching blocks (block/sec)
        chunk := chunk
        select {
        case <-done:
@@ -167,12 +173,11 @@
        stream.Go(func() []*types.Block {
            startAt := time.Now()
            defer func() {
                logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched blocks",
                logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
                    slogx.Int("total_blocks", len(chunk)),
                    slogx.Int64("from", chunk[0]),
                    slogx.Int64("to", chunk[len(chunk)-1]),
                    slogx.Stringer("duration", time.Since(startAt)),
                    slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
                    slogx.Duration("duration", time.Since(startAt)),
                )
            }()
            // TODO: should concurrent fetch block or not ?
@@ -180,22 +185,21 @@
            for _, height := range chunk {
                hash, err := d.btcclient.GetBlockHash(height)
                if err != nil {
                    logger.ErrorContext(ctx, "failed to get block hash", slogx.Error(err), slogx.Int64("height", height))
                    logger.ErrorContext(ctx, "Can't get block hash from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
                    if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
                        logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
                        logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
                    }
                    return nil
                }

                block, err := d.btcclient.GetBlock(hash)
                if err != nil {
                    logger.ErrorContext(ctx, "failed to get block", slogx.Error(err), slogx.Int64("height", height))
                    logger.ErrorContext(ctx, "Can't get block data from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
                    if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
                        logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
                        logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
                    }
                    return nil
                }
                logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))

                blocks = append(blocks, types.ParseMsgBlock(block, height))
            }
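`FetchAsync` above splits the requested height range into fixed-size chunks with `lo.Chunk` and emits each fetched chunk as one batch on the subscription channel. The sketch below shows just that chunk-and-batch shape with the RPC calls replaced by a placeholder; the chunk size and height range are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// fetchBlock is a stand-in for the real GetBlockHash/GetBlock RPC calls.
func fetchBlock(height int64) string {
	return fmt.Sprintf("block@%d", height)
}

func main() {
	const chunkSize = 5 // stand-in for blockStreamChunkSize

	// Heights 100..112, as prepareRange would produce for a from/to pair.
	heights := make([]int64, 0)
	for h := int64(100); h <= 112; h++ {
		heights = append(heights, h)
	}

	// Process the range chunk by chunk, emitting each chunk as one batch,
	// the same way FetchAsync sends []*types.Block batches to its channel.
	for _, chunk := range lo.Chunk(heights, chunkSize) {
		batch := make([]string, 0, len(chunk))
		for _, height := range chunk {
			batch = append(batch, fetchBlock(height))
		}
		fmt.Printf("dispatching %d blocks: %v ... %v\n", len(batch), batch[0], batch[len(batch)-1])
	}
}
```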
@@ -3,6 +3,7 @@ package indexers
import (
    "context"
    "log/slog"
    "sync"
    "time"

    "github.com/cockroachdb/errors"
@@ -30,6 +31,10 @@ type BitcoinIndexer struct {
    Processor    BitcoinProcessor
    Datasource   BitcoinDatasource
    currentBlock types.BlockHeader

    quitOnce sync.Once
    quit     chan struct{}
    done     chan struct{}
}

// NewBitcoinIndexer create new BitcoinIndexer
@@ -37,12 +42,46 @@ func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource)
    return &BitcoinIndexer{
        Processor:  processor,
        Datasource: datasource,

        quit: make(chan struct{}),
        done: make(chan struct{}),
    }
}

func (*BitcoinIndexer) Type() string {
    return "bitcoin"
}

func (i *BitcoinIndexer) Shutdown() error {
    return i.ShutdownWithContext(context.Background())
}

func (i *BitcoinIndexer) ShutdownWithTimeout(timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    return i.ShutdownWithContext(ctx)
}

func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
    i.quitOnce.Do(func() {
        close(i.quit)
        select {
        case <-i.done:
        case <-time.After(180 * time.Second):
            err = errors.Wrap(errs.Timeout, "indexer shutdown timeout")
        case <-ctx.Done():
            err = errors.Wrap(ctx.Err(), "indexer shutdown context canceled")
        }
    })
    return
}

func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
    defer close(i.done)

    ctx = logger.WithContext(ctx,
        slog.String("indexer", "bitcoin"),
        slog.String("package", "indexers"),
        slog.String("indexer", i.Type()),
        slog.String("processor", i.Processor.Name()),
        slog.String("datasource", i.Datasource.Name()),
    )
@@ -56,38 +95,41 @@
        i.currentBlock.Height = -1
    }

    // TODO:
    // - compare db version in constants and database
    // - compare current network and local indexed network
    // - update indexer stats

    ticker := time.NewTicker(10 * time.Second)
    ticker := time.NewTicker(pollingInterval)
    defer ticker.Stop()
    for {
        select {
        case <-i.quit:
            logger.InfoContext(ctx, "Got quit signal, stopping indexer")
            return nil
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            if err := i.process(ctx); err != nil {
                logger.ErrorContext(ctx, "failed to process", slogx.Error(err))
                return errors.Wrap(err, "failed to process")
                logger.ErrorContext(ctx, "Indexer failed while processing", slogx.Error(err))
                return errors.Wrap(err, "process failed")
            }
            logger.DebugContext(ctx, "Waiting for next polling interval")
        }
    }
}

func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
    ch := make(chan []*types.Block)
    // height range to fetch data
    from, to := i.currentBlock.Height+1, int64(-1)
    logger.InfoContext(ctx, "Fetching blocks", slog.Int64("from", from), slog.Int64("to", to))

    logger.InfoContext(ctx, "Start fetching bitcoin blocks", slog.Int64("from", from))
    ch := make(chan []*types.Block)
    subscription, err := i.Datasource.FetchAsync(ctx, from, to, ch)
    if err != nil {
        return errors.Wrap(err, "failed to call fetch async")
        return errors.Wrap(err, "failed to fetch data")
    }
    defer subscription.Unsubscribe()

    for {
        select {
        case <-i.quit:
            return nil
        case blocks := <-ch:
            // empty blocks
            if len(blocks) == 0 {
@@ -96,7 +138,6 @@

            startAt := time.Now()
            ctx := logger.WithContext(ctx,
                slog.Int("total_blocks", len(blocks)),
                slogx.Int64("from", blocks[0].Header.Height),
                slogx.Int64("to", blocks[len(blocks)-1].Header.Height),
            )
@@ -105,7 +146,8 @@
            {
                remoteBlockHeader := blocks[0].Header
                if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
                    logger.WarnContext(ctx, "Reorg detected",
                    logger.WarnContext(ctx, "Detected chain reorganization. Searching for fork point...",
                        slogx.String("event", "reorg_detected"),
                        slogx.Stringer("current_hash", i.currentBlock.Hash),
                        slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
                    )
@@ -144,12 +186,15 @@
                    return errors.Wrap(errs.SomethingWentWrong, "reorg look back limit reached")
                    }

                    // Revert all data since the reorg block
                    logger.WarnContext(ctx, "reverting reorg data",
                        slogx.Int64("reorg_from", beforeReorgBlockHeader.Height+1),
                        slogx.Int64("total_reorg_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
                        slogx.Stringer("detect_duration", time.Since(start)),
                    logger.InfoContext(ctx, "Found reorg fork point, starting to revert data...",
                        slogx.String("event", "reorg_forkpoint"),
                        slogx.Int64("since", beforeReorgBlockHeader.Height+1),
                        slogx.Int64("total_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
                        slogx.Duration("search_duration", time.Since(start)),
                    )

                    // Revert all data since the reorg block
                    start = time.Now()
                    if err := i.Processor.RevertData(ctx, beforeReorgBlockHeader.Height+1); err != nil {
                        return errors.Wrap(err, "failed to revert data")
                    }
@@ -157,10 +202,9 @@
                    // Set current block to before reorg block and
                    // end current round to fetch again
                    i.currentBlock = beforeReorgBlockHeader
                    logger.Info("Reverted data successfully",
                        slogx.Any("current_block", i.currentBlock),
                        slogx.Stringer("duration", time.Since(start)),
                        slogx.Int64("duration_ms", time.Since(start).Milliseconds()),
                    logger.Info("Fixing chain reorganization completed",
                        slogx.Int64("current_block", i.currentBlock.Height),
                        slogx.Duration("duration", time.Since(start)),
                    )
                    return nil
                }
@@ -173,12 +217,15 @@
                }

                if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
                    logger.WarnContext(ctx, "reorg occurred while batch fetching blocks, need to try to fetch again")
                    logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching blocks, need to try to fetch again")

                    // end current round
                    return nil
                }
            }

            ctx = logger.WithContext(ctx, slog.Int("total_blocks", len(blocks)))

            // Start processing blocks
            logger.InfoContext(ctx, "Processing blocks")
            if err := i.Processor.Process(ctx, blocks); err != nil {
@@ -189,8 +236,9 @@
            i.currentBlock = blocks[len(blocks)-1].Header

            logger.InfoContext(ctx, "Processed blocks successfully",
                slogx.Stringer("duration", time.Since(startAt)),
                slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
                slogx.String("event", "processed_blocks"),
                slogx.Int64("current_block", i.currentBlock.Height),
                slogx.Duration("duration", time.Since(startAt)),
            )
        case <-subscription.Done():
            // end current round
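The shutdown methods added above follow a common Go worker pattern: a `quit` channel closed exactly once via `sync.Once`, a `done` channel closed when `Run` returns, and `ShutdownWithContext` blocking until `done`, a timeout, or context cancellation. Here is a self-contained sketch of that pattern (a toy worker, not the Gaze `BitcoinIndexer` itself):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type worker struct {
	quitOnce sync.Once
	quit     chan struct{} // closed to ask Run to stop
	done     chan struct{} // closed when Run has fully returned
}

func newWorker() *worker {
	return &worker{quit: make(chan struct{}), done: make(chan struct{})}
}

// Run polls on a ticker until it is asked to quit or its context ends,
// mirroring the indexer's select loop.
func (w *worker) Run(ctx context.Context) error {
	defer close(w.done)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-w.quit:
			return nil
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			// one polling round of work would go here
		}
	}
}

// ShutdownWithContext closes quit exactly once and waits for Run to finish,
// giving up when the caller's context expires.
func (w *worker) ShutdownWithContext(ctx context.Context) (err error) {
	w.quitOnce.Do(func() {
		close(w.quit)
		select {
		case <-w.done:
		case <-ctx.Done():
			err = errors.New("worker shutdown context canceled")
		}
	})
	return
}

func main() {
	w := newWorker()
	go w.Run(context.Background())

	time.Sleep(300 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("shutdown:", w.ShutdownWithContext(ctx))
}
```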
@@ -2,12 +2,22 @@ package indexers

import (
    "context"
    "time"

    "github.com/gaze-network/indexer-network/core/types"
)

const (
    // pollingInterval is the default polling interval for the indexer polling worker
    pollingInterval = 15 * time.Second
)

type IndexerWorker interface {
    Type() string
    Run(ctx context.Context) error
    Shutdown() error
    ShutdownWithTimeout(timeout time.Duration) error
    ShutdownWithContext(ctx context.Context) error
}

type Processor[T any] interface {
docker/.gitignore (vendored, new file, 1 line)
@@ -0,0 +1 @@
volumes
docker/config.example.yaml (new file, 43 lines)
@@ -0,0 +1,43 @@
logger:
  output: text
  debug: false

bitcoin_node:
  host: "bitcoin-mainnet-archive.allthatnode.com"
  user: "user"
  pass: "pass"
  disable_tls: false

network: mainnet

reporting:
  disabled: false
  base_url: "https://staging.indexer.api.gaze.network" # defaults to "https://indexer.api.gaze.network" if empty
  name: "Local Indexer"
  website_url: "" # public website URL to show on the dashboard. Can be left empty.
  indexer_api_url: "" # public url to access this api. Can be left empty.

http_server:
  port: 8080

modules:
  bitcoin:
    database: "postgres" # Store bitcoin data in postgres
    postgres:
      host: "db"
      port: 5432
      user: "postgres"
      password: "password"
      db_name: "postgres"

  runes:
    database: "postgres" # Store Runes data in postgres
    datasource: "postgres" # Fetch bitcoin blocks from postgres
    api_handlers:
      - http
    postgres:
      host: "db"
      port: 5432
      user: "postgres"
      password: "password"
      db_name: "postgres"
47 docker/docker-compose.yaml Normal file
@@ -0,0 +1,47 @@
services:
  # TODO: need to mount migrations folder
  gaze-migrator:
    # image: ghcr.io/gaze-network/gaze-indexer:v0.1.0
    build:
      context: ../
      dockerfile: ./docker/Dockerfile
    restart: on-failure
    depends_on:
      - db
    networks:
      - indexer
    command:
      ["/app/main", "migrate", "up", "--bitcoin", "--runes", "--database", "postgres://postgres:password@db:5432/postgres?sslmode=disable"]

  gaze-indexer:
    # image: ghcr.io/gaze-network/gaze-indexer:v0.1.0
    build:
      context: ../
      dockerfile: ./docker/Dockerfile
    restart: unless-stopped
    depends_on:
      - db
      - gaze-migrator:
          condition: service_completed_successfully
    networks:
      - indexer
    ports:
      - 8080:8080
    volumes:
      - "./config.example.yaml:/app/config.yaml"
    command: ["/app/main", "run", "--bitcoin", "--runes"]

  db:
    image: postgres:16-alpine
    volumes:
      - "./volumes/postgresql/data:/var/lib/postgresql/data"
    networks:
      - indexer
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=postgres

networks:
  indexer:
    driver: bridge
10 go.mod
@@ -10,6 +10,7 @@ require (
github.com/cockroachdb/errors v1.11.1
github.com/gaze-network/uint128 v1.3.0
github.com/gofiber/fiber/v2 v2.52.4
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v5 v5.5.5
github.com/mcosta74/pgx-slog v0.3.0
@@ -20,6 +21,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.51.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.5.0
)
@@ -39,6 +41,8 @@ require (
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -47,6 +51,7 @@ require (
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
@@ -64,13 +69,12 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
43 go.sum
@@ -1,5 +1,9 @@
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11 h1:Xpbu03JdzqWEXcL6xr43Wxjnwh/Txt16WXJ7IlzvoxA=
github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11/go.mod h1:ft8CEDBt0csuZ+yM/bKf7ZlV6lWvWY/TFXzp7+Ze9Jw=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
@@ -51,6 +55,16 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218=
github.com/dhui/dktest v0.4.1 h1:/w+IWuDXVymg3IrRJCHHOkMK10m9aNVMOyD0X12YVTg=
github.com/dhui/dktest v0.4.1/go.mod h1:DdOqcUpL7vgyP4GlF3X3w7HbSlz8cEQzwewPveYEQbA=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v24.0.9+incompatible h1:HPGzNmwfLZWdxHqK9/II92pyi1EpYKsAqcl4G0Of9v0=
github.com/docker/docker v24.0.9+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -69,6 +83,8 @@ github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
@@ -86,6 +102,11 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -130,6 +151,10 @@ github.com/mcosta74/pgx-slog v0.3.0 h1:v7nl8XKE4ObGxZfYUUs8uUWrimvNib2V4P7Mp0WjS
github.com/mcosta74/pgx-slog v0.3.0/go.mod h1:73/rhilX7+ybQ9RH/BZBtOkTDiGAH1yBrcatN6jQW5E=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -139,6 +164,10 @@ github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
@@ -208,12 +237,14 @@ golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg=
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -222,6 +253,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -240,8 +273,8 @@ golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -251,6 +284,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -18,8 +18,9 @@ import (
)

var (
configOnce sync.Once
config = &Config{
isInit bool
mu sync.Mutex
config = &Config{
Logger: logger.Config{
Output: "TEXT",
},
@@ -58,6 +59,38 @@ type HTTPServerConfig struct {

// Parse parse the configuration from environment variables
func Parse(configFile ...string) Config {
mu.Lock()
defer mu.Unlock()
return parse(configFile...)
}

// Load returns the loaded configuration
func Load() Config {
mu.Lock()
defer mu.Unlock()
if isInit {
return *config
}
return parse()
}

// BindPFlag binds a specific key to a pflag (as used by cobra).
// Example (where serverCmd is a Cobra instance):
//
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
func BindPFlag(key string, flag *pflag.Flag) {
if err := viper.BindPFlag(key, flag); err != nil {
logger.Panic("Something went wrong, failed to bind flag for config", slog.String("package", "config"), slogx.Error(err))
}
}

// SetDefault sets the default value for this key.
// SetDefault is case-insensitive for a key.
// Default only used when no value is provided by the user via flag, config or ENV.
func SetDefault(key string, value any) { viper.SetDefault(key, value) }

func parse(configFile ...string) Config {
ctx := logger.WithContext(context.Background(), slog.String("package", "config"))

if len(configFile) > 0 && configFile[0] != "" {
@@ -72,39 +105,16 @@ func Parse(configFile ...string) Config {
if err := viper.ReadInConfig(); err != nil {
var errNotfound viper.ConfigFileNotFoundError
if errors.As(err, &errNotfound) {
logger.WarnContext(ctx, "config file not found, use default value", slogx.Error(err))
logger.WarnContext(ctx, "Config file not found, use default config value", slogx.Error(err))
} else {
logger.PanicContext(ctx, "invalid config file", slogx.Error(err))
logger.PanicContext(ctx, "Invalid config file", slogx.Error(err))
}
}

if err := viper.Unmarshal(&config); err != nil {
logger.PanicContext(ctx, "failed to unmarshal config", slogx.Error(err))
logger.PanicContext(ctx, "Something went wrong, failed to unmarshal config", slogx.Error(err))
}

isInit = true
return *config
}

// Load returns the loaded configuration
func Load() Config {
configOnce.Do(func() {
_ = Parse()
})
return *config
}

// BindPFlag binds a specific key to a pflag (as used by cobra).
// Example (where serverCmd is a Cobra instance):
//
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
func BindPFlag(key string, flag *pflag.Flag) {
if err := viper.BindPFlag(key, flag); err != nil {
logger.Panic("Failed to bind pflag for config", slogx.Error(err))
}
}

// SetDefault sets the default value for this key.
// SetDefault is case-insensitive for a key.
// Default only used when no value is provided by the user via flag, config or ENV.
func SetDefault(key string, value any) { viper.SetDefault(key, value) }
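With this refactor, Parse is the explicit entry point and Load lazily parses once under the mutex, so both are safe to call from CLI wiring. A hypothetical usage sketch follows; the import path, command name, and config key are assumptions for illustration, not the repository's actual wiring.

```go
// Hypothetical wiring inside the indexer's own cmd package.
package cmd

import (
	"github.com/gaze-network/indexer-network/internal/config"
	"github.com/spf13/cobra"
)

func newRunCmd() *cobra.Command {
	cmd := &cobra.Command{Use: "run"}
	cmd.Flags().Int("port", 8080, "Port to run the HTTP server on")
	// Bind the cobra flag to the viper-backed config before loading it.
	config.BindPFlag("http_server.port", cmd.Flags().Lookup("port"))
	cmd.RunE = func(cmd *cobra.Command, args []string) error {
		conf := config.Load() // first call parses; later calls return the cached value
		_ = conf
		return nil
	}
	return cmd
}
```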
@@ -6,6 +6,7 @@ import (

"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
@@ -62,7 +63,7 @@ func NewPool(ctx context.Context, conf Config) (*pgxpool.Pool, error) {
// Prepare connection pool configuration
connConfig, err := pgxpool.ParseConfig(conf.String())
if err != nil {
return nil, errors.Wrap(err, "failed to parse config to create a new connection pool")
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "failed while parse config"))
}
connConfig.MaxConns = utils.Default(conf.MaxConns, DefaultMaxConns)
connConfig.MinConns = utils.Default(conf.MinConns, DefaultMinConns)
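Joining the errs.InvalidArgument sentinel with the wrapped parse error lets callers classify failures with errors.Is without losing the underlying cause. Below is a self-contained sketch of the same pattern using only the standard library; the sentinel here is a stand-in for the repository's errs.InvalidArgument, not its actual definition.

```go
package example

import (
	"errors"
	"fmt"
)

// ErrInvalidArgument is a hypothetical sentinel, standing in for errs.InvalidArgument.
var ErrInvalidArgument = errors.New("invalid argument")

func parseConfig(raw string) error {
	if raw == "" {
		// Join keeps both the sentinel and the detailed cause in the error chain.
		return errors.Join(ErrInvalidArgument, fmt.Errorf("failed while parse config: empty connection string"))
	}
	return nil
}

func classify(err error) string {
	if errors.Is(err, ErrInvalidArgument) { // callers can branch on the sentinel
		return "bad input"
	}
	return "internal"
}
```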
@@ -37,13 +37,13 @@ func NewClientDatabase(bitcoinDg datagateway.BitcoinDataGateway) *ClientDatabase
}
}

func (c ClientDatabase) Name() string {
return "BitcoinDatabase"
func (d ClientDatabase) Name() string {
return "bitcoin_database"
}

func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
ch := make(chan []*types.Block)
subscription, err := c.FetchAsync(ctx, from, to, ch)
subscription, err := d.FetchAsync(ctx, from, to, ch)
if err != nil {
return nil, errors.WithStack(err)
}
@@ -73,8 +73,13 @@ func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Bl
}
}

func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
from, to, skip, err := c.prepareRange(ctx, from, to)
func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)

from, to, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -129,10 +134,10 @@ func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
slogx.Error(err),
)
}
case <-ctx.Done():
@@ -159,16 +164,26 @@ func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
continue
}
stream.Go(func() []*types.Block {
startAt := time.Now()
defer func() {
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
slogx.Int("total_blocks", len(chunk)),
slogx.Int64("from", chunk[0]),
slogx.Int64("to", chunk[len(chunk)-1]),
slogx.Duration("duration", time.Since(startAt)),
)
}()

fromHeight, toHeight := chunk[0], chunk[len(chunk)-1]
blocks, err := c.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
blocks, err := d.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
if err != nil {
logger.ErrorContext(ctx, "failed to get blocks",
logger.ErrorContext(ctx, "Can't get block data from Bitcoin database",
slogx.Error(err),
slogx.Int64("from_height", fromHeight),
slogx.Int64("to_height", toHeight),
slogx.Int64("from", fromHeight),
slogx.Int64("to", toHeight),
)
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get blocks: from_height: %d, to_height: %d", fromHeight, toHeight)); err != nil {
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return nil
}
@@ -29,7 +29,7 @@ func NewProcessor(config config.Config, bitcoinDg datagateway.BitcoinDataGateway
}

func (p Processor) Name() string {
return "Bitcoin"
return "bitcoin"
}

func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {

@@ -70,8 +70,8 @@ func (p *Processor) isContinueFromLatestIndexedBlock(ctx context.Context, block
// if the given block version is v1 and height is `91842` or `91880`,
// then remove transaction inputs/outputs to prevent duplicate txin/txout error when inserting to the database.
//
// Theses duplicated coinbase transactions are having the same transaction input(from coinbase)/output and
// utxo from these 2 duplicated coinbase txs can redeem only once), so, it's safe to remove them and can
// Theses duplicated coinbase transactions are having the same transaction input/output and
// utxo from these 2 duplicated coinbase txs can redeem only once. so, it's safe to remove them and can
// use inputs/outputs from the previous block.
//
// Duplicate Coinbase Transactions:
@@ -14,22 +14,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)

func mapBlockHeaderTypeToModel(src types.BlockHeader) gen.BitcoinBlock {
return gen.BitcoinBlock{
BlockHeight: int32(src.Height),
BlockHash: src.Hash.String(),
Version: src.Version,
MerkleRoot: src.MerkleRoot.String(),
PrevBlockHash: src.PrevBlock.String(),
Timestamp: pgtype.Timestamptz{
Time: src.Timestamp,
Valid: true,
},
Bits: int64(src.Bits),
Nonce: int64(src.Nonce),
}
}

func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error) {
hash, err := chainhash.NewHashFromStr(src.BlockHash)
if err != nil {
@@ -55,80 +39,6 @@ func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error)
}, nil
}

func mapBlockTypeToParams(src *types.Block) (gen.InsertBlockParams, gen.BatchInsertTransactionsParams, gen.BatchInsertTransactionTxOutsParams, gen.BatchInsertTransactionTxInsParams) {
block := gen.InsertBlockParams{
BlockHeight: int32(src.Header.Height),
BlockHash: src.Header.Hash.String(),
Version: src.Header.Version,
MerkleRoot: src.Header.MerkleRoot.String(),
PrevBlockHash: src.Header.PrevBlock.String(),
Timestamp: pgtype.Timestamptz{
Time: src.Header.Timestamp,
Valid: true,
},
Bits: int64(src.Header.Bits),
Nonce: int64(src.Header.Nonce),
}
txs := gen.BatchInsertTransactionsParams{
TxHashArr: []string{},
VersionArr: []int32{},
LocktimeArr: []int64{},
BlockHeightArr: []int32{},
BlockHashArr: []string{},
IdxArr: []int32{},
}
txouts := gen.BatchInsertTransactionTxOutsParams{
TxHashArr: []string{},
TxIdxArr: []int32{},
PkscriptArr: []string{},
ValueArr: []int64{},
}
txins := gen.BatchInsertTransactionTxInsParams{
PrevoutTxHashArr: []string{},
PrevoutTxIdxArr: []int32{},
TxHashArr: []string{},
TxIdxArr: []int32{},
ScriptsigArr: []string{},
WitnessArr: []string{},
SequenceArr: []int32{},
}

for txIdx, srcTx := range src.Transactions {
txHash := srcTx.TxHash.String()
// Batch insert transactions
txs.TxHashArr = append(txs.TxHashArr, txHash)
txs.VersionArr = append(txs.VersionArr, srcTx.Version)
txs.LocktimeArr = append(txs.LocktimeArr, int64(srcTx.LockTime))
txs.BlockHeightArr = append(txs.BlockHeightArr, int32(src.Header.Height))
txs.BlockHashArr = append(txs.BlockHashArr, src.Header.Hash.String())
txs.IdxArr = append(txs.IdxArr, int32(txIdx))

// Batch insert txins
for idx, txin := range srcTx.TxIn {
var witness string
if len(txin.Witness) > 0 {
witness = btcutils.WitnessToString(txin.Witness)
}
txins.TxHashArr = append(txins.TxHashArr, txHash)
txins.TxIdxArr = append(txins.TxIdxArr, int32(idx))
txins.PrevoutTxHashArr = append(txins.PrevoutTxHashArr, txin.PreviousOutTxHash.String())
txins.PrevoutTxIdxArr = append(txins.PrevoutTxIdxArr, int32(txin.PreviousOutIndex))
txins.ScriptsigArr = append(txins.ScriptsigArr, hex.EncodeToString(txin.SignatureScript))
txins.WitnessArr = append(txins.WitnessArr, witness)
txins.SequenceArr = append(txins.SequenceArr, int32(txin.Sequence))
}

// Batch insert txouts
for idx, txout := range srcTx.TxOut {
txouts.TxHashArr = append(txouts.TxHashArr, txHash)
txouts.TxIdxArr = append(txouts.TxIdxArr, int32(idx))
txouts.PkscriptArr = append(txouts.PkscriptArr, hex.EncodeToString(txout.PkScript))
txouts.ValueArr = append(txouts.ValueArr, txout.Value)
}
}
return block, txs, txouts, txins
}

func mapBlocksTypeToParams(src []*types.Block) (gen.BatchInsertBlocksParams, gen.BatchInsertTransactionsParams, gen.BatchInsertTransactionTxOutsParams, gen.BatchInsertTransactionTxInsParams) {
blocks := gen.BatchInsertBlocksParams{
BlockHeightArr: make([]int32, 0, len(src)),
@@ -82,6 +82,7 @@ func (p *Processor) getHashPayload(header types.BlockHeader) ([]byte, error) {
func serializeNewRuneEntry(entry *runes.RuneEntry) []byte {
var sb strings.Builder
sb.WriteString("newRuneEntry:")
// nolint:goconst
sb.WriteString("runeId:" + entry.RuneId.String())
sb.WriteString("number:" + strconv.Itoa(int(entry.Number)))
sb.WriteString("divisibility:" + strconv.Itoa(int(entry.Divisibility)))
@@ -93,6 +94,7 @@ func serializeNewRuneEntry(entry *runes.RuneEntry) []byte {
sb.WriteString("terms:")
terms := entry.Terms
if terms.Amount != nil {
// nolint:goconst
sb.WriteString("amount:" + terms.Amount.String())
}
if terms.Cap != nil {
@@ -16,6 +16,7 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/uint128"
"github.com/samber/lo"
@@ -158,7 +159,7 @@ func (p *Processor) ensureGenesisRune(ctx context.Context) error {
}

func (p *Processor) Name() string {
return "Runes"
return "runes"
}

func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
@@ -192,7 +193,10 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
defer func() {
if err := runesDgTx.Rollback(ctx); err != nil {
logger.ErrorContext(ctx, "failed to rollback transaction", err)
logger.WarnContext(ctx, "failed to rollback transaction",
slogx.Error(err),
slogx.String("event", "rollback_runes_revert"),
)
}
}()
@@ -25,17 +25,20 @@ import (

func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
for _, block := range blocks {
ctx := logger.WithContext(ctx, slog.Int("block_height", int(block.Header.Height)))
logger.DebugContext(ctx, "[RunesProcessor] Processing block", slog.Int("txs", len(block.Transactions)))
ctx := logger.WithContext(ctx, slog.Int64("height", block.Header.Height))
logger.DebugContext(ctx, "Processing new block", slog.Int("txs", len(block.Transactions)))

for _, tx := range block.Transactions {
if err := p.processTx(ctx, tx, block.Header); err != nil {
return errors.Wrap(err, "failed to process tx")
}
}

if err := p.flushBlock(ctx, block.Header); err != nil {
return errors.Wrap(err, "failed to flush block")
}

logger.DebugContext(ctx, "Inserted new block")
}
return nil
}
@@ -66,13 +69,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
for runeId, balance := range balances {
unallocated[runeId] = unallocated[runeId].Add(balance.Amount)
p.newSpendOutPoints = append(p.newSpendOutPoints, balance.OutPoint)
logger.DebugContext(ctx, "[RunesProcessor] Found runes in tx input",
slogx.Any("runeId", runeId),
slogx.Stringer("amount", balance.Amount),
slogx.Stringer("txHash", balance.OutPoint.Hash),
slog.Int("txOutIndex", int(balance.OutPoint.Index)),
slog.Int("blockHeight", int(tx.BlockHeight)),
)
}
}

@@ -92,13 +88,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
allocated[output][runeId] = allocated[output][runeId].Add(amount)
unallocated[runeId] = unallocated[runeId].Sub(amount)
logger.DebugContext(ctx, "[RunesProcessor] Allocated runes to tx output",
slogx.Any("runeId", runeId),
slogx.Stringer("amount", amount),
slog.Int("output", output),
slogx.Stringer("txHash", tx.TxHash),
slog.Int("blockHeight", int(tx.BlockHeight)),
)
}

mints := make(map[runes.RuneId]uint128.Uint128)
@@ -131,7 +120,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
if !premine.IsZero() {
unallocated[etchedRuneId] = unallocated[etchedRuneId].Add(premine)
mints[etchedRuneId] = mints[etchedRuneId].Add(premine)
logger.DebugContext(ctx, "[RunesProcessor] Minted premine", slogx.Any("runeId", etchedRuneId), slogx.Stringer("amount", premine))
}
}

@@ -206,7 +194,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// all input runes and minted runes in a tx with cenotaph are burned
for runeId, amount := range unallocated {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes in cenotaph", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
} else {
// assign all un-allocated runes to the default output (pointer), or the first non
@@ -236,7 +223,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// if pointer is still nil, then no output is available. Burn all unallocated runes.
for runeId, amount := range unallocated {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to no output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
}
}
@@ -247,7 +233,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// burn all allocated runes to OP_RETURN outputs
for runeId, amount := range balances {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to OP_RETURN output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
continue
}
@@ -316,7 +301,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
}
p.newRuneTxs = append(p.newRuneTxs, &runeTx)
logger.DebugContext(ctx, "[RunesProcessor] created RuneTransaction", slogx.Any("runeTx", runeTx))
return nil
}

@@ -413,7 +397,6 @@ func (p *Processor) mint(ctx context.Context, runeId runes.RuneId, blockHeader t
if err := p.incrementMintCount(ctx, runeId, blockHeader); err != nil {
return uint128.Zero, errors.Wrap(err, "failed to increment mint count")
}
logger.DebugContext(ctx, "[RunesProcessor] Minted rune", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount), slogx.Stringer("mintCount", runeEntry.Mints), slogx.Stringer("cap", lo.FromPtr(runeEntry.Terms.Cap)))
return amount, nil
}

@@ -425,11 +408,9 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
if rune != nil {
minimumRune := runes.MinimumRuneAtHeight(p.network, uint64(tx.BlockHeight))
if rune.Cmp(minimumRune) < 0 {
logger.DebugContext(ctx, "invalid etching: rune is lower than minimum rune at this height", slogx.Any("rune", rune), slogx.Any("minimumRune", minimumRune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
if rune.IsReserved() {
logger.DebugContext(ctx, "invalid etching: rune is reserved", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}

@@ -438,7 +419,6 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check rune existence")
}
if ok {
logger.DebugContext(ctx, "invalid etching: rune already exists", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}

@@ -448,7 +428,6 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check tx commits to rune")
}
if !commit {
logger.DebugContext(ctx, "invalid etching: tx does not commit to the rune", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
} else {
@@ -464,7 +443,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru

func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction, rune runes.Rune) (bool, error) {
commitment := rune.Commitment()
for _, txIn := range tx.TxIn {
for i, txIn := range tx.TxIn {
tapscript, ok := extractTapScript(txIn.Witness)
if !ok {
continue
@@ -492,8 +471,7 @@ func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction,
continue
}
if err != nil {
logger.ErrorContext(ctx, "failed to get pk script at out point", err)
continue
return false, errors.Wrapf(err, "can't get previous txout for txin `%v:%v`", tx.TxHash.String(), i)
}
pkScript := prevTx.TxOut[txIn.PreviousOutIndex].PkScript
// input utxo must be P2TR
@@ -576,7 +554,6 @@ func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runest
}
p.newRuneEntries[runeId] = runeEntry
p.newRuneEntryStates[runeId] = runeEntry
logger.DebugContext(ctx, "[RunesProcessor] created RuneEntry", slogx.Any("runeEntry", runeEntry))
return nil
}

@@ -630,7 +607,6 @@ func (p *Processor) incrementBurnedAmount(ctx context.Context, burned map[runes.
continue
}
runeEntry.BurnedAmount = runeEntry.BurnedAmount.Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] burned amount incremented", slogx.Any("runeId", runeId), slogx.Any("amount", amount))
p.newRuneEntryStates[runeId] = runeEntry
}
return nil
@@ -698,7 +674,10 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
}
defer func() {
if err := runesDgTx.Rollback(ctx); err != nil {
logger.ErrorContext(ctx, "[RunesProcessor] failed to rollback runes tx", err)
logger.WarnContext(ctx, "failed to rollback transaction",
slogx.Error(err),
slogx.String("event", "rollback_runes_insertion"),
)
}
}()

@@ -824,6 +803,5 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
return errors.Wrap(err, "failed to submit block report")
}
}
logger.InfoContext(ctx, "[RunesProcessor] block flushed")
return nil
}
@@ -55,7 +55,7 @@ func (r *Repository) Rollback(ctx context.Context) error {
return errors.Wrap(err, "failed to rollback transaction")
}
if err == nil {
logger.InfoContext(ctx, "rolled back transaction")
logger.DebugContext(ctx, "rolled back transaction")
}
r.tx = nil
return nil
@@ -21,7 +21,11 @@ func NewHTTPErrorHandler() func(ctx *fiber.Ctx, err error) error {
return errors.WithStack(ctx.Status(e.Code).SendString(e.Error()))
}

logger.ErrorContext(ctx.UserContext(), "unhandled error", slogx.Error(err))
logger.ErrorContext(ctx.UserContext(), "Something went wrong, unhandled api error",
slogx.String("event", "api_unhandled_error"),
slogx.Error(err),
)

return errors.WithStack(ctx.Status(http.StatusInternalServerError).JSON(map[string]any{
"error": "Internal Server Error",
}))
@@ -10,6 +10,7 @@ import (

"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/valyala/fasthttp"
)
@@ -29,7 +30,7 @@ type Client struct {

func New(baseURL string, config ...Config) (*Client, error) {
if _, err := url.Parse(baseURL); err != nil {
return nil, errors.Wrap(err, "can't parse base url")
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "can't parse base url"))
}
var cf Config
if len(config) > 0 {
@@ -114,7 +115,7 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
)
}

logger.Info("Finished make request")
logger.InfoContext(ctx, "Finished make request", slog.String("package", "httpclient"))
}

fasthttp.ReleaseResponse(resp)
15 pkg/logger/duration.go Normal file
@@ -0,0 +1,15 @@
package logger

import (
"log/slog"
)

func durationToMsAttrReplacer(groups []string, attr slog.Attr) slog.Attr {
if attr.Value.Kind() == slog.KindDuration {
return slog.Attr{
Key: attr.Key,
Value: slog.Int64Value(attr.Value.Duration().Milliseconds()),
}
}
return attr
}
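The effect of this replacer: any slog.Duration attribute is serialized as integer milliseconds instead of the handler's default encoding. Below is a standalone sketch using only the standard library; it mirrors the unexported replacer above rather than calling it.

```go
package main

import (
	"log/slog"
	"os"
	"time"
)

// durationToMs mirrors durationToMsAttrReplacer: durations are rewritten to
// integer milliseconds before the handler serializes them.
func durationToMs(groups []string, attr slog.Attr) slog.Attr {
	if attr.Value.Kind() == slog.KindDuration {
		return slog.Attr{Key: attr.Key, Value: slog.Int64Value(attr.Value.Duration().Milliseconds())}
	}
	return attr
}

func main() {
	log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ReplaceAttr: durationToMs}))
	log.Info("Processed blocks successfully", slog.Duration("duration", 1500*time.Millisecond))
	// => {..., "msg":"Processed blocks successfully", "duration":1500}
}
```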
@@ -7,12 +7,18 @@ import (
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)

// NewGCPHandler returns a new GCP handler.
// The handler writes logs to the os.Stdout and
// replaces the default attribute keys/values with the GCP logging attribute keys/values
//
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
func NewGCPHandler(opts *slog.HandlerOptions) slog.Handler {
return slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: opts.Level,
ReplaceAttr: attrReplacerChain(
GCPAttrReplacer,
durationToMsAttrReplacer,
opts.ReplaceAttr,
),
})
@@ -34,6 +34,9 @@ func New(config Config) (*ReportingClient, error) {
if err != nil {
return nil, errors.Wrap(err, "can't create http client")
}
if config.Name == "" {
return nil, errors.New("reporting.name config is required if reporting is enabled")
}
return &ReportingClient{
httpClient: httpClient,
config: config,
@@ -53,6 +56,8 @@ type SubmitBlockReportPayload struct {
}

func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayload) error {
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))

body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
@@ -64,9 +69,11 @@ func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitB
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
// TODO: unmashal response body and log it
logger.WarnContext(ctx, "Reporting block event failed", slog.Any("resp_body", resp.Body()))
return nil
}
logger.DebugContext(ctx, "block report submitted", slog.Any("payload", payload))
logger.DebugContext(ctx, "Reported block event")
return nil
}

@@ -86,6 +93,9 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
WebsiteURL: r.config.WebsiteURL,
IndexerAPIURL: r.config.IndexerAPIURL,
}

ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))

body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
@@ -97,8 +107,8 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
logger.WarnContext(ctx, "Reporting node info failed", slog.Any("resp_body", resp.Body()))
}
logger.InfoContext(ctx, "node report submitted", slog.Any("payload", payload))
logger.DebugContext(ctx, "Reported node info")
return nil
}