Compare commits

..

14 Commits

Author SHA1 Message Date
gazenw
6d4f1d0e87 Release v0.2.0
Release v0.2.0
2024-05-16 14:50:03 +07:00
Gaze
b9fac74026 Merge remote-tracking branch 'origin/main' into develop
# Conflicts:
#	README.md
#	cmd/cmd_run.go
2024-05-16 14:37:37 +07:00
Nut Pinyo
62ecd7ea49 fix: runes tag parsing (#19) 2024-05-16 13:54:13 +07:00
gazenw
66ea2766a0 refactor: Separated modules initiator (#17)
* refactor: separated modules initiator

* fix: able to run with empty modules

* doc: update cmd desc

* refactor: sorting code flow

* fix: invalid apionly flow

* refactor: remove unnecessary flags

* feat: add default value for runes config

* refactor: use config instead bind flag to opts struct

* chore: remove unused comment

* refactor(runes): invoke only when in case

* feat: add http server default port

* feat: add logger context

* doc: update readme

---------

Co-authored-by: Gaze <gazenw@users.noreply.github.com>
2024-05-15 16:14:29 +07:00
gazenw
575c144428 fix: invalid pgx version (#18)
Co-authored-by: Planxnx <planxnx@users.noreply.github.com>
2024-05-15 03:19:10 +07:00
gazenw
f8fbd67bd8 fix: invalid pgx version (#18)
Co-authored-by: Planxnx <planxnx@users.noreply.github.com>
2024-05-15 03:17:54 +07:00
gazenw
c75b62bdf9 Remove bitcoin indexer (#16)
* doc: update README.md

* fix: remove bitcoin module

* fix: remove more config
2024-05-14 19:29:43 +07:00
gazenw
cc2649dd64 Update README.md: fix datasource 2024-05-13 14:19:32 +07:00
gazenw
d96370454b Remove bitcoin module (#15)
* fix: remove bitcoin module

* fix: remove more config
2024-05-13 14:18:45 +07:00
gazenw
c9a5c6d217 refactor: Generic Indexer (#14)
* refactor(indexer): init generic indexer

* refactor(btc): updatet datasource

* refactor: remove old indexers pkg

* doc: update comment

---------

Co-authored-by: Gaze <gazenw@users.noreply.github.com>
Co-authored-by: Planxnx <thanee@cleverse.com>
2024-05-08 16:15:33 +07:00
Gaze
86716c1915 feat: auto maxproc only for run command 2024-05-08 14:28:06 +07:00
gazenw
371d1fe008 doc: update README.md 2024-05-08 14:13:21 +07:00
Gaze
c6057d9511 perf(btc): remove unnecessary db index 2024-05-07 21:29:12 +07:00
Gaze
d37be5997b fix: remove parts of README.md 2024-04-30 10:44:53 +07:00
47 changed files with 444 additions and 2302 deletions

View File

@@ -2,18 +2,15 @@
# Gaze Indexer
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols. It has support for Bitcoin and Runes out of the box, with **Unified Consistent APIs** across fungible token protocols.
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols. It has support for Runes out of the box, with **Unified Consistent APIs** across fungible token protocols.
Gaze Indexer is built with **modularity** in mind, allowing users to run all modules in one monolithic instance with a single command, or as a distributed cluster of micro-services.
Gaze Indexer serves as a foundation for building ANY meta-protocol indexers, with efficient data fetching, reorg detection, and database migration tool.
This allows developers to focus on what **truly** matters: Meta-protocol indexing logic. New meta-protocols can be easily added by implementing new modules.
Gaze Indexer also comes with a block reporting system for verifying data integrity of indexers. Visit the [Gaze Network dashboard](https://dash.gaze.network) to see the status of other indexers.
- [Modules](#modules)
- [1. Bitcoin](#1-bitcoin)
- [2. Runes](#2-runes)
- [1. Runes](#1-runes)
- [Installation](#installation)
- [Prerequisites](#prerequisites)
- [1. Hardware Requirements](#1-hardware-requirements)
@@ -25,12 +22,7 @@ Gaze Indexer also comes with a block reporting system for verifying data integri
## Modules
### 1. Bitcoin
The Bitcoin Indexer, the heart of every meta-protocol, is responsible for indexing **Bitcoin transactions, blocks, and UTXOs**. It requires a Bitcoin Core RPC as source of Bitcoin transactions,
and stores the indexed data in database to be used by other modules.
### 2. Runes
### 1. Runes
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
@@ -42,10 +34,9 @@ It comes with a set of APIs for querying historical Runes data. See our [API Ref
#### 1. Hardware Requirements
Each module requires different hardware requirements.
| Module | CPU | RAM |
| ------- | ---------- | ------ |
| Bitcoin | 0.25 cores | 256 MB |
| Runes | 0.5 cores | 1 GB |
| Module | CPU | RAM |
| ------ | --------- | ---- |
| Runes | 0.5 cores | 1 GB |
#### 2. Prepare Bitcoin Core RPC server.
@@ -56,10 +47,11 @@ To self host a Bitcoin Core, see https://bitcoin.org/en/full-node.
Gaze Indexer has first-class support for PostgreSQL. If you wish to use other databases, you can implement your own database repository that satisfies each module's Data Gateway interface.
Here is our minimum database disk space requirement for each module.
| Module | Database Storage |
| ------- | ---------------- |
| Bitcoin | 240 GB |
| Runes | 150 GB |
| Module | Database Storage (current) | Database Storage (in 1 year) |
| ------ | -------------------------- | ---------------------------- |
| Runes | 10 GB | 150 GB |
Here is our minimum database disk space requirement for each module.
#### 4. Prepare `config.yaml` file.
@@ -93,21 +85,10 @@ http_server:
# Meta-protocol modules configuration options.
modules:
# Configuration options for Bitcoin module. Can be removed if not used.
bitcoin:
database: "postgres" # Database to store bitcoin data. current supported databases: "postgres"
postgres:
host: "localhost"
port: 5432
user: "postgres"
password: "password"
db_name: "postgres"
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
# Configuration options for Runes module. Can be removed if not used.
runes:
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node" | "database". If "database" is used, it will use the database config in bitcoin module as datasource.
datasource: "bitcoin-node" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
api_handlers: # API handlers to enable. current supported handlers: "http"
- http
postgres:
@@ -134,7 +115,7 @@ services:
- 8080:8080 # Expose HTTP server port to host
volumes:
- "./config.yaml:/app/config.yaml" # mount config.yaml file to the container as "/app/config.yaml"
command: ["/app/main", "run", "--bitcoin", "--runes"] # Put module flags after "run" commands to select which modules to run.
command: ["/app/main", "run", "--runes"] # Put module flags after "run" commands to select which modules to run.
```
### Install from source
@@ -160,17 +141,17 @@ go build -o gaze main.go
4. Run database migrations with the `migrate` command and module flags.
```bash
./gaze migrate up --bitcoin --runes --database postgres://postgres:password@localhost:5432/postgres
./gaze migrate up --runes --database postgres://postgres:password@localhost:5432/postgres
```
5. Start the indexer with the `run` command and module flags.
```bash
./gaze run --bitcoin --runes
./gaze run --modules runes
```
If `config.yaml` is not located at `./app/config.yaml`, use the `--config` flag to specify the path to the `config.yaml` file.
```bash
./gaze run --bitcoin --runes --config /path/to/config.yaml
./gaze run --modules runes --config /path/to/config.yaml
```

View File

@@ -15,19 +15,10 @@ import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/datasources"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/core/indexer"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/internal/postgres"
"github.com/gaze-network/indexer-network/modules/bitcoin"
"github.com/gaze-network/indexer-network/modules/bitcoin/btcclient"
btcdatagateway "github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
btcpostgres "github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres"
"github.com/gaze-network/indexer-network/modules/runes"
runesapi "github.com/gaze-network/indexer-network/modules/runes/api"
runesdatagateway "github.com/gaze-network/indexer-network/modules/runes/datagateway"
runespostgres "github.com/gaze-network/indexer-network/modules/runes/repository/postgres"
runesusecase "github.com/gaze-network/indexer-network/modules/runes/usecase"
"github.com/gaze-network/indexer-network/pkg/automaxprocs"
"github.com/gaze-network/indexer-network/pkg/errorhandler"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
@@ -35,59 +26,49 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/compress"
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
"github.com/samber/do/v2"
"github.com/samber/lo"
"github.com/spf13/cobra"
)
const (
shutdownTimeout = 60 * time.Second
// Register Modules
var Modules = do.Package(
do.LazyNamed("runes", runes.New),
)
type runCmdOptions struct {
APIOnly bool
Bitcoin bool
Runes bool
}
func NewRunCommand() *cobra.Command {
opts := &runCmdOptions{}
// Create command
runCmd := &cobra.Command{
Use: "run",
Short: "Start indexer-network service",
RunE: func(cmd *cobra.Command, args []string) error {
return runHandler(opts, cmd, args)
if err := automaxprocs.Init(); err != nil {
logger.Error("Failed to set GOMAXPROCS", slogx.Error(err))
}
return runHandler(cmd, args)
},
}
// TODO: separate flags and bind flags to each module cmd package.
// Add local flags
flags := runCmd.Flags()
flags.BoolVar(&opts.APIOnly, "api-only", false, "Run only API server")
flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Enable Bitcoin indexer module")
flags.String("bitcoin-db", "postgres", `Database to store bitcoin data. current supported databases: "postgres"`)
flags.BoolVar(&opts.Runes, "runes", false, "Enable Runes indexer module")
flags.String("runes-db", "postgres", `Database to store runes data. current supported databases: "postgres"`)
flags.String("runes-datasource", "bitcoin-node", `Datasource to fetch bitcoin data for processing Meta-Protocol data. current supported datasources: "bitcoin-node" | "database"`)
flags.Bool("api-only", false, "Run only API server")
flags.String("modules", "", "Enable specific modules to run. E.g. `runes,brc20`")
// Bind flags to configuration
config.BindPFlag("modules.bitcoin.database", flags.Lookup("bitcoin-db"))
config.BindPFlag("modules.runes.database", flags.Lookup("runes-db"))
config.BindPFlag("modules.runes.datasource", flags.Lookup("runes-datasource"))
config.BindPFlag("api_only", flags.Lookup("api-only"))
config.BindPFlag("enable_modules", flags.Lookup("modules"))
return runCmd
}
type HttpHandler interface {
Mount(router fiber.Router) error
}
const (
shutdownTimeout = 60 * time.Second
)
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
func runHandler(cmd *cobra.Command, _ []string) error {
conf := config.Load()
// Validate inputs
// Validate inputs and configurations
{
if !conf.Network.IsSupported() {
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
@@ -98,197 +79,57 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Initialize worker context to separate worker's lifecycle from main process
ctxWorker, stopWorker := context.WithCancel(context.Background())
defer stopWorker()
injector := do.New(Modules)
do.ProvideValue(injector, conf)
do.ProvideValue(injector, ctx)
// Add logger context
ctxWorker = logger.WithContext(ctxWorker, slogx.Stringer("network", conf.Network))
// Initialize Bitcoin RPC client
do.Provide(injector, func(i do.Injector) (*rpcclient.Client, error) {
conf := do.MustInvoke[config.Config](i)
// Initialize Bitcoin Core RPC Client
client, err := rpcclient.New(&rpcclient.ConnConfig{
Host: conf.BitcoinNode.Host,
User: conf.BitcoinNode.User,
Pass: conf.BitcoinNode.Pass,
DisableTLS: conf.BitcoinNode.DisableTLS,
HTTPPostMode: true,
}, nil)
if err != nil {
logger.PanicContext(ctx, "Invalid Bitcoin node configuration", slogx.Error(err))
}
defer client.Shutdown()
// Check Bitcoin RPC connection
{
start := time.Now()
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
logger.PanicContext(ctx, "Can't connect to Bitcoin Core RPC Server", slogx.String("host", conf.BitcoinNode.Host), slogx.Error(err))
client, err := rpcclient.New(&rpcclient.ConnConfig{
Host: conf.BitcoinNode.Host,
User: conf.BitcoinNode.User,
Pass: conf.BitcoinNode.Pass,
DisableTLS: conf.BitcoinNode.DisableTLS,
HTTPPostMode: true,
}, nil)
if err != nil {
return nil, errors.Wrap(err, "invalid Bitcoin node configuration")
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slog.Duration("latency", time.Since(start)))
}
// TODO: create module command package.
// each module should have its own command package and main package will routing the command to the module command package.
// Check Bitcoin RPC connection
{
start := time.Now()
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
return nil, errors.Wrapf(err, "can't connect to Bitcoin Core RPC Server %q", conf.BitcoinNode.Host)
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slog.Duration("latency", time.Since(start)))
}
// TODO: refactor module name to specific type instead of string?
httpHandlers := make(map[string]HttpHandler, 0)
return client, nil
})
var reportingClient *reportingclient.ReportingClient
if !conf.Reporting.Disabled {
reportingClient, err = reportingclient.New(conf.Reporting)
// Initialize reporting client
do.Provide(injector, func(i do.Injector) (*reportingclient.ReportingClient, error) {
conf := do.MustInvoke[config.Config](i)
if conf.Reporting.Disabled {
return nil, nil
}
reportingClient, err := reportingclient.New(conf.Reporting)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid reporting configuration", slogx.Error(err))
return nil, errors.Wrap(err, "invalid reporting configuration")
}
logger.PanicContext(ctx, "Something went wrong, can't create reporting client", slogx.Error(err))
return nil, errors.Wrap(err, "can't create reporting client")
}
}
return reportingClient, nil
})
// Initialize Bitcoin Indexer
if opts.Bitcoin {
ctx := logger.WithContext(ctx, slogx.String("module", "bitcoin"))
var (
btcDB btcdatagateway.BitcoinDataGateway
indexerInfoDB btcdatagateway.IndexerInformationDataGateway
)
switch strings.ToLower(conf.Modules.Bitcoin.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
repo := btcpostgres.NewRepository(pg)
btcDB = repo
indexerInfoDB = repo
default:
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Bitcoin.Database)
}
if !opts.APIOnly {
processor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
datasource := datasources.NewBitcoinNode(client)
indexer := indexers.NewBitcoinIndexer(processor, datasource)
defer func() {
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
return
}
logger.InfoContext(ctx, "Indexer stopped gracefully")
}()
// Verify states before running Indexer
if err := processor.VerifyStates(ctx); err != nil {
return errors.WithStack(err)
}
// Run Indexer
go func() {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctxWorker); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
}
}()
}
}
// Initialize Runes Indexer
if opts.Runes {
ctx := logger.WithContext(ctx, slogx.String("module", "runes"))
var (
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
default:
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Runes.Database)
}
var bitcoinDatasource indexers.BitcoinDatasource
var bitcoinClient btcclient.Contract
switch strings.ToLower(conf.Modules.Runes.Datasource) {
case "bitcoin-node":
bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
bitcoinDatasource = bitcoinNodeDatasource
bitcoinClient = bitcoinNodeDatasource
case "database":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for datasource", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
btcRepo := btcpostgres.NewRepository(pg)
btcClientDB := btcclient.NewClientDatabase(btcRepo)
bitcoinDatasource = btcClientDB
bitcoinClient = btcClientDB
default:
return errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
if !opts.APIOnly {
processor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
indexer := indexers.NewBitcoinIndexer(processor, bitcoinDatasource)
defer func() {
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
return
}
logger.InfoContext(ctx, "Indexer stopped gracefully")
}()
if err := processor.VerifyStates(ctx); err != nil {
return errors.WithStack(err)
}
// Run Indexer
go func() {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctxWorker); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
}
}()
}
// Mount API
apiHandlers := lo.Uniq(conf.Modules.Runes.APIHandlers)
for _, handler := range apiHandlers {
switch handler { // TODO: support more handlers (e.g. gRPC)
case "http":
runesUsecase := runesusecase.New(runesDg, bitcoinClient)
runesHTTPHandler := runesapi.NewHTTPHandler(conf.Network, runesUsecase)
httpHandlers["runes"] = runesHTTPHandler
default:
logger.PanicContext(ctx, "Something went wrong, unsupported API handler", slogx.String("handler", handler))
}
}
}
// Wait for interrupt signal to gracefully stop the server with
// Setup HTTP server if there are any HTTP handlers
if len(httpHandlers) > 0 {
// Initialize HTTP server
do.Provide(injector, func(i do.Injector) (*fiber.App, error) {
app := fiber.New(fiber.Config{
AppName: "Gaze Indexer",
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
@@ -306,38 +147,64 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
Level: compress.LevelDefault,
}))
defer func() {
if err := app.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown HTTP server", slogx.Error(err))
return
}
logger.InfoContext(ctx, "HTTP server stopped gracefully")
}()
// Health check
app.Get("/", func(c *fiber.Ctx) error {
return errors.WithStack(c.SendStatus(http.StatusOK))
})
// mount http handlers from each http-enabled module
for module, handler := range httpHandlers {
if err := handler.Mount(app); err != nil {
logger.PanicContext(ctx, "Something went wrong, can't mount HTTP handler", slogx.Error(err), slogx.String("module", module))
return app, nil
})
// Initialize worker context to separate worker's lifecycle from main process
ctxWorker, stopWorker := context.WithCancel(context.Background())
defer stopWorker()
// Add logger context
ctxWorker = logger.WithContext(ctxWorker, slogx.Stringer("network", conf.Network))
// Run modules
{
modules := lo.Uniq(conf.EnableModules)
modules = lo.Map(modules, func(item string, _ int) string { return strings.TrimSpace(item) })
modules = lo.Filter(modules, func(item string, _ int) bool { return item != "" })
for _, module := range modules {
ctx := logger.WithContext(ctxWorker, slogx.String("module", module))
indexer, err := do.InvokeNamed[indexer.IndexerWorker](injector, module)
if err != nil {
if errors.Is(err, do.ErrServiceNotFound) {
return errors.Errorf("Module %q is not supported", module)
}
return errors.Wrapf(err, "can't init module %q", module)
}
// Run Indexer
if !conf.APIOnly {
go func() {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctx); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
}
}()
}
logger.InfoContext(ctx, "Mounted HTTP handler", slogx.String("module", module))
}
go func() {
// stop main process if API stopped
defer stop()
logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))
if err := app.Listen(fmt.Sprintf(":%d", conf.HTTPServer.Port)); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running HTTP server", slogx.Error(err))
}
}()
}
// Run API server
httpServer := do.MustInvoke[*fiber.App](injector)
go func() {
// stop main process if API stopped
defer stop()
logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))
if err := httpServer.Listen(fmt.Sprintf(":%d", conf.HTTPServer.Port)); err != nil {
logger.PanicContext(ctx, "Something went wrong, error during running HTTP server", slogx.Error(err))
}
}()
// Stop application if worker context is done
go func() {
<-ctxWorker.Done()
@@ -366,5 +233,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
}
}()
if err := injector.Shutdown(); err != nil {
logger.PanicContext(ctx, "Failed while gracefully shutting down", slogx.Error(err))
}
return nil
}

View File

@@ -6,15 +6,13 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/constants"
"github.com/gaze-network/indexer-network/modules/bitcoin"
"github.com/gaze-network/indexer-network/modules/runes"
"github.com/spf13/cobra"
)
var versions = map[string]string{
"": constants.Version,
"bitcoin": bitcoin.Version,
"runes": runes.Version,
"": constants.Version,
"runes": runes.Version,
}
type versionCmdOptions struct {
@@ -33,7 +31,7 @@ func NewVersionCommand() *cobra.Command {
}
flags := cmd.Flags()
flags.StringVar(&opts.Modules, "module", "", `Show version of a specific module. E.g. "bitcoin" | "runes"`)
flags.StringVar(&opts.Modules, "module", "", `Show version of a specific module. E.g. "runes"`)
return cmd
}

View File

@@ -17,7 +17,6 @@ import (
type migrateDownCmdOptions struct {
DatabaseURL string
Bitcoin bool
Runes bool
All bool
}
@@ -60,7 +59,6 @@ func NewMigrateDownCommand() *cobra.Command {
}
flags := cmd.Flags()
flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin down migrations")
flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes down migrations")
flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")
flags.BoolVar(&opts.All, "all", false, "Confirm apply ALL down migrations without prompt")
@@ -118,11 +116,6 @@ func migrateDownHandler(opts *migrateDownCmdOptions, _ *cobra.Command, args migr
return nil
}
if opts.Bitcoin {
if err := applyDownMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
return errors.WithStack(err)
}
}
if opts.Runes {
if err := applyDownMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
return errors.WithStack(err)

View File

@@ -16,7 +16,6 @@ import (
type migrateUpCmdOptions struct {
DatabaseURL string
Bitcoin bool
Runes bool
}
@@ -55,7 +54,6 @@ func NewMigrateUpCommand() *cobra.Command {
}
flags := cmd.Flags()
flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Apply Bitcoin up migrations")
flags.BoolVar(&opts.Runes, "runes", false, "Apply Runes up migrations")
flags.StringVar(&opts.DatabaseURL, "database", "", "Database url to run migration on")
@@ -103,11 +101,6 @@ func migrateUpHandler(opts *migrateUpCmdOptions, _ *cobra.Command, args migrateU
return nil
}
if opts.Bitcoin {
if err := applyUpMigrations("Bitcoin", bitcoinMigrationSource, "bitcoin_schema_migrations"); err != nil {
return errors.WithStack(err)
}
}
if opts.Runes {
if err := applyUpMigrations("Runes", runesMigrationSource, "runes_schema_migrations"); err != nil {
return errors.WithStack(err)

View File

@@ -3,8 +3,7 @@ package migrate
import "net/url"
const (
bitcoinMigrationSource = "modules/bitcoin/database/postgresql/migrations"
runesMigrationSource = "modules/runes/database/postgresql/migrations"
runesMigrationSource = "modules/runes/database/postgresql/migrations"
)
func cloneURLWithQuery(u *url.URL, newQuery url.Values) *url.URL {

View File

@@ -26,21 +26,10 @@ http_server:
# Meta-protocol modules configuration options.
modules:
# Configuration options for Bitcoin module. Can be removed if not used.
bitcoin:
database: "postgres" # Database to store bitcoin data. current supported databases: "postgres"
postgres:
host: "localhost"
port: 5432
user: "postgres"
password: "password"
db_name: "postgres"
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database credentials above.
# Configuration options for Runes module. Can be removed if not used.
runes:
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node" | "database". If "database" is used, it will use the database config in bitcoin module as datasource.
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
api_handlers: # API handlers to enable. current supported handlers: "http"
- http
postgres:

View File

@@ -24,7 +24,7 @@ const (
)
// Make sure to implement the BitcoinDatasource interface
var _ Datasource[[]*types.Block] = (*BitcoinNodeDatasource)(nil)
var _ Datasource[*types.Block] = (*BitcoinNodeDatasource)(nil)
// BitcoinNodeDatasource fetch data from Bitcoin node for Bitcoin Indexer
type BitcoinNodeDatasource struct {

View File

@@ -10,7 +10,7 @@ import (
// Datasource is an interface for indexer data sources.
type Datasource[T any] interface {
Name() string
Fetch(ctx context.Context, from, to int64) (T, error)
FetchAsync(ctx context.Context, from, to int64, ch chan<- T) (*subscription.ClientSubscription[T], error)
Fetch(ctx context.Context, from, to int64) ([]T, error)
FetchAsync(ctx context.Context, from, to int64, ch chan<- []T) (*subscription.ClientSubscription[[]T], error)
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
}

View File

@@ -1,4 +1,4 @@
package indexers
package indexer
import (
"context"
@@ -16,20 +16,15 @@ import (
const (
maxReorgLookBack = 1000
// pollingInterval is the default polling interval for the indexer polling worker
pollingInterval = 15 * time.Second
)
type (
BitcoinProcessor Processor[[]*types.Block]
BitcoinDatasource datasources.Datasource[[]*types.Block]
)
// Make sure to implement the IndexerWorker interface
var _ IndexerWorker = (*BitcoinIndexer)(nil)
// BitcoinIndexer is the polling indexer for sync Bitcoin data to the database.
type BitcoinIndexer struct {
Processor BitcoinProcessor
Datasource BitcoinDatasource
// Indexer generic indexer for fetching and processing data
type Indexer[T Input] struct {
Processor Processor[T]
Datasource datasources.Datasource[T]
currentBlock types.BlockHeader
quitOnce sync.Once
@@ -37,9 +32,9 @@ type BitcoinIndexer struct {
done chan struct{}
}
// NewBitcoinIndexer create new BitcoinIndexer
func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource) *BitcoinIndexer {
return &BitcoinIndexer{
// New create new generic indexer
func New[T Input](processor Processor[T], datasource datasources.Datasource[T]) *Indexer[T] {
return &Indexer[T]{
Processor: processor,
Datasource: datasource,
@@ -48,21 +43,17 @@ func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource)
}
}
func (*BitcoinIndexer) Type() string {
return "bitcoin"
}
func (i *BitcoinIndexer) Shutdown() error {
func (i *Indexer[T]) Shutdown() error {
return i.ShutdownWithContext(context.Background())
}
func (i *BitcoinIndexer) ShutdownWithTimeout(timeout time.Duration) error {
func (i *Indexer[T]) ShutdownWithTimeout(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return i.ShutdownWithContext(ctx)
}
func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
func (i *Indexer[T]) ShutdownWithContext(ctx context.Context) (err error) {
i.quitOnce.Do(func() {
close(i.quit)
select {
@@ -76,12 +67,11 @@ func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
return
}
func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
func (i *Indexer[T]) Run(ctx context.Context) (err error) {
defer close(i.done)
ctx = logger.WithContext(ctx,
slog.String("package", "indexers"),
slog.String("indexer", i.Type()),
slog.String("processor", i.Processor.Name()),
slog.String("datasource", i.Datasource.Name()),
)
@@ -114,15 +104,15 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
}
}
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
func (i *Indexer[T]) process(ctx context.Context) (err error) {
// height range to fetch data
from, to := i.currentBlock.Height+1, int64(-1)
logger.InfoContext(ctx, "Start fetching bitcoin blocks", slog.Int64("from", from))
ch := make(chan []*types.Block)
logger.InfoContext(ctx, "Start fetching input data", slog.Int64("from", from))
ch := make(chan []T)
subscription, err := i.Datasource.FetchAsync(ctx, from, to, ch)
if err != nil {
return errors.Wrap(err, "failed to fetch data")
return errors.Wrap(err, "failed to fetch input data")
}
defer subscription.Unsubscribe()
@@ -130,21 +120,24 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
select {
case <-i.quit:
return nil
case blocks := <-ch:
// empty blocks
if len(blocks) == 0 {
case inputs := <-ch:
// empty inputs
if len(inputs) == 0 {
continue
}
firstInput := inputs[0]
firstInputHeader := firstInput.BlockHeader()
startAt := time.Now()
ctx := logger.WithContext(ctx,
slogx.Int64("from", blocks[0].Header.Height),
slogx.Int64("to", blocks[len(blocks)-1].Header.Height),
slogx.Int64("from", firstInputHeader.Height),
slogx.Int64("to", inputs[len(inputs)-1].BlockHeader().Height),
)
// validate reorg from first block
// validate reorg from first input
{
remoteBlockHeader := blocks[0].Header
remoteBlockHeader := firstInputHeader
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "Detected chain reorganization. Searching for fork point...",
slogx.String("event", "reorg_detected"),
@@ -210,33 +203,35 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
}
}
// validate is block is continuous and no reorg
for i := 1; i < len(blocks); i++ {
if blocks[i].Header.Height != blocks[i-1].Header.Height+1 {
return errors.Wrapf(errs.InternalError, "block is not continuous, block[%d] height: %d, block[%d] height: %d", i-1, blocks[i-1].Header.Height, i, blocks[i].Header.Height)
// validate is input is continuous and no reorg
for i := 1; i < len(inputs); i++ {
header := inputs[i].BlockHeader()
prevHeader := inputs[i-1].BlockHeader()
if header.Height != prevHeader.Height+1 {
return errors.Wrapf(errs.InternalError, "input is not continuous, input[%d] height: %d, input[%d] height: %d", i-1, prevHeader.Height, i, header.Height)
}
if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching blocks, need to try to fetch again")
if !header.PrevBlock.IsEqual(&prevHeader.Hash) {
logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching inputs, need to try to fetch again")
// end current round
return nil
}
}
ctx = logger.WithContext(ctx, slog.Int("total_blocks", len(blocks)))
ctx = logger.WithContext(ctx, slog.Int("total_inputs", len(inputs)))
// Start processing blocks
logger.InfoContext(ctx, "Processing blocks")
if err := i.Processor.Process(ctx, blocks); err != nil {
// Start processing input
logger.InfoContext(ctx, "Processing inputs")
if err := i.Processor.Process(ctx, inputs); err != nil {
return errors.WithStack(err)
}
// Update current state
i.currentBlock = blocks[len(blocks)-1].Header
i.currentBlock = inputs[len(inputs)-1].BlockHeader()
logger.InfoContext(ctx, "Processed blocks successfully",
slogx.String("event", "processed_blocks"),
logger.InfoContext(ctx, "Processed inputs successfully",
slogx.String("event", "processed_inputs"),
slogx.Int64("current_block", i.currentBlock.Height),
slogx.Duration("duration", time.Since(startAt)),
)

View File

@@ -1,4 +1,4 @@
package indexers
package indexer
import (
"context"
@@ -7,24 +7,15 @@ import (
"github.com/gaze-network/indexer-network/core/types"
)
const (
// pollingInterval is the default polling interval for the indexer polling worker
pollingInterval = 15 * time.Second
)
type IndexerWorker interface {
Type() string
Run(ctx context.Context) error
Shutdown() error
ShutdownWithTimeout(timeout time.Duration) error
ShutdownWithContext(ctx context.Context) error
type Input interface {
BlockHeader() types.BlockHeader
}
type Processor[T any] interface {
type Processor[T Input] interface {
Name() string
// Process processes the input data and indexes it.
Process(ctx context.Context, inputs T) error
Process(ctx context.Context, inputs []T) error
// CurrentBlock returns the latest indexed block header.
CurrentBlock(ctx context.Context) (types.BlockHeader, error)
@@ -39,3 +30,10 @@ type Processor[T any] interface {
// to ensure the last shutdown was graceful and no missing data.
VerifyStates(ctx context.Context) error
}
type IndexerWorker interface {
Shutdown() error
ShutdownWithTimeout(timeout time.Duration) error
ShutdownWithContext(ctx context.Context) (err error)
Run(ctx context.Context) (err error)
}

View File

@@ -38,6 +38,10 @@ type Block struct {
Transactions []*Transaction
}
func (b *Block) BlockHeader() BlockHeader {
return b.Header
}
func ParseMsgBlock(src *wire.MsgBlock, height int64) *Block {
hash := src.Header.BlockHash()
return &Block{

4
go.mod
View File

@@ -11,10 +11,10 @@ require (
github.com/gaze-network/uint128 v1.3.0
github.com/gofiber/fiber/v2 v2.52.4
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v5 v5.5.5
github.com/mcosta74/pgx-slog v0.3.0
github.com/planxnx/concurrent-stream v0.1.5
github.com/samber/do/v2 v2.0.0-beta.7
github.com/samber/lo v1.39.0
github.com/shopspring/decimal v1.3.1
github.com/spf13/cobra v1.8.0
@@ -47,6 +47,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx v3.6.2+incompatible // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
@@ -64,6 +65,7 @@ require (
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/samber/go-type-to-string v1.4.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect

6
go.sum
View File

@@ -191,6 +191,10 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/do/v2 v2.0.0-beta.7 h1:tmdLOVSCbTA6uGWLU5poi/nZvMRh5QxXFJ9vHytU+Jk=
github.com/samber/do/v2 v2.0.0-beta.7/go.mod h1:+LpV3vu4L81Q1JMZNSkMvSkW9lt4e5eJoXoZHkeBS4c=
github.com/samber/go-type-to-string v1.4.0 h1:KXphToZgiFdnJQxryU25brhlh/CqY/cwJVeX2rfmow0=
github.com/samber/go-type-to-string v1.4.0/go.mod h1:jpU77vIDoIxkahknKDoEx9C8bQ1ADnh2sotZ8I4QqBU=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
@@ -231,6 +235,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View File

@@ -8,7 +8,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
btcconfig "github.com/gaze-network/indexer-network/modules/bitcoin/config"
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
@@ -25,20 +24,31 @@ var (
Output: "TEXT",
},
Network: common.NetworkMainnet,
HTTPServer: HTTPServerConfig{
Port: 8080,
},
BitcoinNode: BitcoinNodeClient{
User: "user",
Pass: "pass",
},
Modules: Modules{
Runes: runesconfig.Config{
Datasource: "bitcoin-node",
Database: "postgres",
},
},
}
)
type Config struct {
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
Modules Modules `mapstructure:"modules"`
Reporting reportingclient.Config `mapstructure:"reporting"`
EnableModules []string `mapstructure:"enable_modules"`
APIOnly bool `mapstructure:"api_only"`
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
Modules Modules `mapstructure:"modules"`
Reporting reportingclient.Config `mapstructure:"reporting"`
}
type BitcoinNodeClient struct {
@@ -49,8 +59,7 @@ type BitcoinNodeClient struct {
}
type Modules struct {
Bitcoin btcconfig.Config `mapstructure:"bitcoin"`
Runes runesconfig.Config `mapstructure:"runes"`
Runes runesconfig.Config `mapstructure:"runes"`
}
type HTTPServerConfig struct {

View File

@@ -7,7 +7,6 @@ import (
"syscall"
"github.com/gaze-network/indexer-network/cmd"
_ "go.uber.org/automaxprocs"
)
func main() {

View File

@@ -1,244 +0,0 @@
package btcclient
import (
"context"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/datasources"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/subscription"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
cstream "github.com/planxnx/concurrent-stream"
"github.com/samber/lo"
)
// TODO: Refactor this, datasources.BitcoinNode and This package is the same.
const (
blockStreamChunkSize = 100
)
// Make sure to implement the BitcoinDatasource interface
var _ datasources.Datasource[[]*types.Block] = (*ClientDatabase)(nil)
// ClientDatabase is a client to connect to the bitcoin database.
type ClientDatabase struct {
bitcoinDg datagateway.BitcoinDataGateway
}
func NewClientDatabase(bitcoinDg datagateway.BitcoinDataGateway) *ClientDatabase {
return &ClientDatabase{
bitcoinDg: bitcoinDg,
}
}
func (d ClientDatabase) Name() string {
return "bitcoin_database"
}
func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
ch := make(chan []*types.Block)
subscription, err := d.FetchAsync(ctx, from, to, ch)
if err != nil {
return nil, errors.WithStack(err)
}
defer subscription.Unsubscribe()
blocks := make([]*types.Block, 0)
for {
select {
case b, ok := <-ch:
if !ok {
return blocks, nil
}
blocks = append(blocks, b...)
case <-subscription.Done():
if err := ctx.Err(); err != nil {
return nil, errors.Wrap(err, "context done")
}
return blocks, nil
case err := <-subscription.Err():
if err != nil {
return nil, errors.Wrap(err, "got error while fetch async")
}
return blocks, nil
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "context done")
}
}
}
func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
subscription := subscription.NewSubscription(ch)
if skip {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
return subscription.Client(), nil
}
// Create parallel stream
out := make(chan []*types.Block)
stream := cstream.NewStream(ctx, 8, out)
// create slice of block height to fetch
blockHeights := make([]int64, 0, to-from+1)
for i := from; i <= to; i++ {
blockHeights = append(blockHeights, i)
}
// Wait for stream to finish and close out channel
go func() {
defer close(out)
_ = stream.Wait()
}()
// Fan-out blocks to subscription channel
go func() {
defer func() {
// add a bit delay to prevent shutdown before client receive all blocks
time.Sleep(100 * time.Millisecond)
subscription.Unsubscribe()
}()
for {
select {
case data, ok := <-out:
// stream closed
if !ok {
return
}
// empty blocks
if len(data) == 0 {
continue
}
// send blocks to subscription channel
if err := subscription.Send(ctx, data); err != nil {
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
slogx.Error(err),
)
}
case <-ctx.Done():
return
}
}
}()
// Parallel fetch blocks from Bitcoin node until complete all block heights
// or subscription is done.
go func() {
defer stream.Close()
done := subscription.Done()
chunks := lo.Chunk(blockHeights, blockStreamChunkSize)
for _, chunk := range chunks {
chunk := chunk
select {
case <-done:
return
case <-ctx.Done():
return
default:
if len(chunk) == 0 {
continue
}
stream.Go(func() []*types.Block {
startAt := time.Now()
defer func() {
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
slogx.Int("total_blocks", len(chunk)),
slogx.Int64("from", chunk[0]),
slogx.Int64("to", chunk[len(chunk)-1]),
slogx.Duration("duration", time.Since(startAt)),
)
}()
fromHeight, toHeight := chunk[0], chunk[len(chunk)-1]
blocks, err := d.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
if err != nil {
logger.ErrorContext(ctx, "Can't get block data from Bitcoin database",
slogx.Error(err),
slogx.Int64("from", fromHeight),
slogx.Int64("to", toHeight),
)
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get blocks: from_height: %d, to_height: %d", fromHeight, toHeight)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return nil
}
return blocks
})
}
}
}()
return subscription.Client(), nil
}
func (c *ClientDatabase) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
header, err := c.bitcoinDg.GetBlockHeaderByHeight(ctx, height)
if err != nil {
return types.BlockHeader{}, errors.WithStack(err)
}
return header, nil
}
func (c *ClientDatabase) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
start = fromHeight
end = toHeight
// get current bitcoin block height
latestBlock, err := c.bitcoinDg.GetLatestBlockHeader(ctx)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get block count")
}
// set start to genesis block height
if start < 0 {
start = 0
}
// set end to current bitcoin block height if
// - end is -1
// - end is greater that current bitcoin block height
if end < 0 || end > latestBlock.Height {
end = latestBlock.Height
}
// if start is greater than end, skip this round
if start > end {
return -1, -1, true, nil
}
return start, end, false, nil
}
// GetTransactionByHash returns a transaction with the given hash. Returns errs.NotFound if transaction does not exist.
func (c *ClientDatabase) GetTransactionByHash(ctx context.Context, txHash chainhash.Hash) (*types.Transaction, error) {
tx, err := c.bitcoinDg.GetTransactionByHash(ctx, txHash)
if err != nil {
return nil, errors.Wrap(err, "failed to get transaction by hash")
}
return tx, nil
}

View File

@@ -1,8 +0,0 @@
package config
import "github.com/gaze-network/indexer-network/internal/postgres"
type Config struct {
Database string `mapstructure:"database"` // Database to store bitcoin data.
Postgres postgres.Config `mapstructure:"postgres"`
}

View File

@@ -1,26 +0,0 @@
package bitcoin
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/core/types"
)
const (
Version = "v0.0.1"
DBVersion = 1
)
var (
// defaultCurrentBlockHeight is the default value for the current block height for first time indexing
defaultCurrentBlock = types.BlockHeader{
Hash: common.ZeroHash,
Height: -1,
}
lastV1Block = types.BlockHeader{
Hash: *utils.Must(chainhash.NewHashFromStr("00000000000001aa077d7aa84c532a4d69bdbff519609d1da0835261b7a74eb6")),
Height: 227835,
}
)

View File

@@ -1,18 +0,0 @@
BEGIN;
-- DROP INDEX
DROP INDEX IF EXISTS bitcoin_blocks_block_hash_idx;
DROP INDEX IF EXISTS bitcoin_transactions_tx_hash_idx;
DROP INDEX IF EXISTS bitcoin_transactions_block_hash_idx;
DROP INDEX IF EXISTS bitcoin_transaction_txouts_pkscript_idx;
DROP INDEX IF EXISTS bitcoin_transaction_txins_prevout_idx;
-- DROP TABLE
DROP TABLE IF EXISTS "bitcoin_indexer_stats";
DROP TABLE IF EXISTS "bitcoin_indexer_db_version";
DROP TABLE IF EXISTS "bitcoin_transaction_txins";
DROP TABLE IF EXISTS "bitcoin_transaction_txouts";
DROP TABLE IF EXISTS "bitcoin_transactions";
DROP TABLE IF EXISTS "bitcoin_blocks";
COMMIT;

View File

@@ -1,72 +0,0 @@
BEGIN;
-- Indexer Client Information
CREATE TABLE IF NOT EXISTS "bitcoin_indexer_stats" (
"id" BIGSERIAL PRIMARY KEY,
"client_version" TEXT NOT NULL,
"network" TEXT NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS "bitcoin_indexer_db_version" (
"id" BIGSERIAL PRIMARY KEY,
"version" INT NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "bitcoin_indexer_db_version" ("version") VALUES (1);
-- Bitcoin Data
CREATE TABLE IF NOT EXISTS "bitcoin_blocks" (
"block_height" INT NOT NULL PRIMARY KEY,
"block_hash" TEXT NOT NULL,
"version" INT NOT NULL,
"merkle_root" TEXT NOT NULL,
"prev_block_hash" TEXT NOT NULL,
"timestamp" TIMESTAMP WITH TIME ZONE NOT NULL,
"bits" BIGINT NOT NULL,
"nonce" BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS bitcoin_blocks_block_hash_idx ON "bitcoin_blocks" USING HASH ("block_hash");
CREATE TABLE IF NOT EXISTS "bitcoin_transactions" (
"tx_hash" TEXT NOT NULL, -- can't use as primary key because block v1 has duplicate tx hashes (coinbase tx). See: https://github.com/bitcoin/bitcoin/commit/a206b0ea12eb4606b93323268fc81a4f1f952531
"version" INT NOT NULL,
"locktime" BIGINT NOT NULL,
"block_height" INT NOT NULL,
"block_hash" TEXT NOT NULL,
"idx" INT NOT NULL,
PRIMARY KEY ("block_height", "idx")
);
CREATE INDEX IF NOT EXISTS bitcoin_transactions_tx_hash_idx ON "bitcoin_transactions" USING HASH ("tx_hash");
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_hash_idx ON "bitcoin_transactions" USING HASH ("block_hash");
CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txouts" (
"tx_hash" TEXT NOT NULL,
"tx_idx" INT NOT NULL,
"pkscript" TEXT NOT NULL, -- Hex String
"value" BIGINT NOT NULL,
"is_spent" BOOLEAN NOT NULL DEFAULT false,
PRIMARY KEY ("tx_hash", "tx_idx")
);
CREATE INDEX IF NOT EXISTS bitcoin_transaction_txouts_pkscript_idx ON "bitcoin_transaction_txouts" USING HASH ("pkscript");
CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txins" (
"tx_hash" TEXT NOT NULL,
"tx_idx" INT NOT NULL,
"prevout_tx_hash" TEXT NOT NULL,
"prevout_tx_idx" INT NOT NULL,
"prevout_pkscript" TEXT NULL, -- Hex String, Can be NULL if the prevout is a coinbase transaction
"scriptsig" TEXT NOT NULL, -- Hex String
"witness" TEXT NOT NULL DEFAULT '', -- Hex String
"sequence" BIGINT NOT NULL,
PRIMARY KEY ("tx_hash", "tx_idx")
);
CREATE INDEX IF NOT EXISTS bitcoin_transaction_txins_prevout_idx ON "bitcoin_transaction_txins" USING BTREE ("prevout_tx_hash", "prevout_tx_idx");
COMMIT;

View File

@@ -1,99 +0,0 @@
-- name: GetLatestBlockHeader :one
SELECT * FROM bitcoin_blocks ORDER BY block_height DESC LIMIT 1;
-- name: InsertBlock :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce") VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
-- name: BatchInsertBlocks :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce")
VALUES (
unnest(@block_height_arr::INT[]),
unnest(@block_hash_arr::TEXT[]),
unnest(@version_arr::INT[]),
unnest(@merkle_root_arr::TEXT[]),
unnest(@prev_block_hash_arr::TEXT[]),
unnest(@timestamp_arr::TIMESTAMP WITH TIME ZONE[]), -- or use TIMESTAMPTZ
unnest(@bits_arr::BIGINT[]),
unnest(@nonce_arr::BIGINT[])
);
-- name: BatchInsertTransactions :exec
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx")
VALUES (
unnest(@tx_hash_arr::TEXT[]),
unnest(@version_arr::INT[]),
unnest(@locktime_arr::BIGINT[]),
unnest(@block_height_arr::INT[]),
unnest(@block_hash_arr::TEXT[]),
unnest(@idx_arr::INT[])
);
-- name: BatchInsertTransactionTxIns :exec
WITH update_txout AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = true
FROM (SELECT unnest(@prevout_tx_hash_arr::TEXT[]) as tx_hash, unnest(@prevout_tx_idx_arr::INT[]) as tx_idx) as txin
WHERE "bitcoin_transaction_txouts"."tx_hash" = txin.tx_hash AND "bitcoin_transaction_txouts"."tx_idx" = txin.tx_idx AND "is_spent" = false
RETURNING "bitcoin_transaction_txouts"."tx_hash", "bitcoin_transaction_txouts"."tx_idx", "pkscript"
), prepare_insert AS (
SELECT input.tx_hash, input.tx_idx, prevout_tx_hash, prevout_tx_idx, update_txout.pkscript as prevout_pkscript, scriptsig, witness, sequence
FROM (
SELECT
unnest(@tx_hash_arr::TEXT[]) as tx_hash,
unnest(@tx_idx_arr::INT[]) as tx_idx,
unnest(@prevout_tx_hash_arr::TEXT[]) as prevout_tx_hash,
unnest(@prevout_tx_idx_arr::INT[]) as prevout_tx_idx,
unnest(@scriptsig_arr::TEXT[]) as scriptsig,
unnest(@witness_arr::TEXT[]) as witness,
unnest(@sequence_arr::INT[]) as sequence
) input LEFT JOIN update_txout ON "update_txout"."tx_hash" = "input"."prevout_tx_hash" AND "update_txout"."tx_idx" = "input"."prevout_tx_idx"
)
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx", "prevout_pkscript","scriptsig","witness","sequence")
SELECT "tx_hash", "tx_idx", "prevout_tx_hash", "prevout_tx_idx", "prevout_pkscript", "scriptsig", "witness", "sequence" FROM prepare_insert;
-- name: BatchInsertTransactionTxOuts :exec
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value")
VALUES (
unnest(@tx_hash_arr::TEXT[]),
unnest(@tx_idx_arr::INT[]),
unnest(@pkscript_arr::TEXT[]),
unnest(@value_arr::BIGINT[])
);
-- name: RevertData :exec
WITH delete_tx AS (
DELETE FROM "bitcoin_transactions" WHERE "block_height" >= @from_height
RETURNING "tx_hash"
), delete_txin AS (
DELETE FROM "bitcoin_transaction_txins" WHERE "tx_hash" = ANY(SELECT "tx_hash" FROM delete_tx)
RETURNING "prevout_tx_hash", "prevout_tx_idx"
), delete_txout AS (
DELETE FROM "bitcoin_transaction_txouts" WHERE "tx_hash" = ANY(SELECT "tx_hash" FROM delete_tx)
RETURNING "tx_hash", "tx_idx"
), revert_txout_spent AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = false
WHERE
("tx_hash", "tx_idx") IN (SELECT "prevout_tx_hash", "prevout_tx_idx" FROM delete_txin) AND
("tx_hash", "tx_idx") NOT IN (SELECT "tx_hash", "tx_idx" FROM delete_txout) -- avoid to modified same row twice (modified the same row twice in a single statement is not supported)
RETURNING NULL
)
DELETE FROM "bitcoin_blocks" WHERE "bitcoin_blocks"."block_height" >= @from_height;
-- name: GetBlockByHeight :one
SELECT * FROM bitcoin_blocks WHERE block_height = $1;
-- name: GetBlocksByHeightRange :many
SELECT * FROM bitcoin_blocks WHERE block_height >= @from_height AND block_height <= @to_height ORDER BY block_height ASC;
-- name: GetTransactionsByHeightRange :many
SELECT * FROM bitcoin_transactions WHERE block_height >= @from_height AND block_height <= @to_height;
-- name: GetTransactionByHash :one
SELECT * FROM bitcoin_transactions WHERE tx_hash = $1;
-- name: GetTransactionTxOutsByTxHashes :many
SELECT * FROM bitcoin_transaction_txouts WHERE tx_hash = ANY(@tx_hashes::TEXT[]);
-- name: GetTransactionTxInsByTxHashes :many
SELECT * FROM bitcoin_transaction_txins WHERE tx_hash = ANY(@tx_hashes::TEXT[]);

View File

@@ -1,8 +0,0 @@
-- name: GetCurrentDBVersion :one
SELECT "version" FROM bitcoin_indexer_db_version ORDER BY id DESC LIMIT 1;
-- name: GetCurrentIndexerStats :one
SELECT "client_version", "network" FROM bitcoin_indexer_stats ORDER BY id DESC LIMIT 1;
-- name: UpdateIndexerStats :exec
INSERT INTO bitcoin_indexer_stats (client_version, network) VALUES ($1, $2);

View File

@@ -1,25 +0,0 @@
package datagateway
import (
"context"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/core/types"
)
type BitcoinDataGateway interface {
BitcoinWriterDataDataGateway
BitcoinReaderDataDataGateway
}
type BitcoinWriterDataDataGateway interface {
InsertBlocks(ctx context.Context, blocks []*types.Block) error
RevertBlocks(context.Context, int64) error
}
type BitcoinReaderDataDataGateway interface {
GetLatestBlockHeader(context.Context) (types.BlockHeader, error)
GetBlockHeaderByHeight(ctx context.Context, blockHeight int64) (types.BlockHeader, error)
GetBlocksByHeightRange(ctx context.Context, from int64, to int64) ([]*types.Block, error)
GetTransactionByHash(ctx context.Context, txHash chainhash.Hash) (*types.Transaction, error)
}

View File

@@ -1,13 +0,0 @@
package datagateway
import (
"context"
"github.com/gaze-network/indexer-network/common"
)
type IndexerInformationDataGateway interface {
GetCurrentDBVersion(ctx context.Context) (int32, error)
GetLatestIndexerStats(ctx context.Context) (version string, network common.Network, err error)
UpdateIndexerStats(ctx context.Context, clientVersion string, network common.Network) error
}

View File

@@ -1,122 +0,0 @@
package bitcoin
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
)
// Make sure to implement the BitcoinProcessor interface
var _ indexers.BitcoinProcessor = (*Processor)(nil)
type Processor struct {
config config.Config
bitcoinDg datagateway.BitcoinDataGateway
indexerInfoDg datagateway.IndexerInformationDataGateway
}
func NewProcessor(config config.Config, bitcoinDg datagateway.BitcoinDataGateway, indexerInfoDg datagateway.IndexerInformationDataGateway) *Processor {
return &Processor{
config: config,
bitcoinDg: bitcoinDg,
indexerInfoDg: indexerInfoDg,
}
}
func (p Processor) Name() string {
return "bitcoin"
}
func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
if len(inputs) == 0 {
return nil
}
// Process the given blocks before inserting to the database
inputs, err := p.process(ctx, inputs)
if err != nil {
return errors.WithStack(err)
}
// Insert blocks
if err := p.bitcoinDg.InsertBlocks(ctx, inputs); err != nil {
return errors.Wrapf(err, "error during insert blocks, from: %d, to: %d", inputs[0].Header.Height, inputs[len(inputs)-1].Header.Height)
}
return nil
}
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
b, err := p.bitcoinDg.GetLatestBlockHeader(ctx)
if err != nil {
if errors.Is(err, errs.NotFound) {
return defaultCurrentBlock, nil
}
return types.BlockHeader{}, errors.WithStack(err)
}
return b, nil
}
func (p *Processor) GetIndexedBlock(ctx context.Context, height int64) (types.BlockHeader, error) {
header, err := p.bitcoinDg.GetBlockHeaderByHeight(ctx, height)
if err != nil {
return types.BlockHeader{}, errors.WithStack(err)
}
return header, nil
}
func (p *Processor) RevertData(ctx context.Context, from int64) error {
// to prevent remove txin/txout of duplicated coinbase transaction in the blocks 91842 and 91880
// if you really want to revert the data before the block `227835`, you should reset the database and reindex the data instead.
if from <= lastV1Block.Height {
return errors.Wrapf(errs.InvalidArgument, "can't revert data before block version 2, height: %d", lastV1Block.Height)
}
if err := p.bitcoinDg.RevertBlocks(ctx, from); err != nil {
return errors.WithStack(err)
}
return nil
}
func (p *Processor) VerifyStates(ctx context.Context) error {
// Check current db version with the required db version
{
dbVersion, err := p.indexerInfoDg.GetCurrentDBVersion(ctx)
if err != nil {
return errors.Wrap(err, "can't get current db version")
}
if dbVersion != DBVersion {
return errors.Wrapf(errs.ConflictSetting, "db version mismatch, please upgrade to version %d", DBVersion)
}
}
// Check if the latest indexed network is mismatched with configured network
{
_, network, err := p.indexerInfoDg.GetLatestIndexerStats(ctx)
if err != nil {
if errors.Is(err, errs.NotFound) {
goto end
}
return errors.Wrap(err, "can't get latest indexer stats")
}
if network != p.config.Network {
return errors.Wrapf(errs.ConflictSetting, "network mismatch, latest indexed network: %q, configured network: %q. If you want to change the network, please reset the database", network, p.config.Network)
}
}
// TODO: Verify the states of the indexed data to ensure the last shutdown was graceful and no missing data.
end:
if err := p.indexerInfoDg.UpdateIndexerStats(ctx, Version, p.config.Network); err != nil {
return errors.Wrap(err, "can't update indexer stats")
}
return nil
}

View File

@@ -1,91 +0,0 @@
package bitcoin
import (
"cmp"
"context"
"slices"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/core/types"
)
// process is a processing rules for the given blocks before inserting to the database
//
// this function will modify the given data directly.
func (p *Processor) process(ctx context.Context, blocks []*types.Block) ([]*types.Block, error) {
if len(blocks) == 0 {
return blocks, nil
}
// Sort ASC by block height
slices.SortFunc(blocks, func(t1, t2 *types.Block) int {
return cmp.Compare(t1.Header.Height, t2.Header.Height)
})
if !p.isContinueFromLatestIndexedBlock(ctx, blocks[0]) {
return nil, errors.New("given blocks are not continue from the latest indexed block")
}
if !p.isBlocksSequential(blocks) {
return nil, errors.New("given blocks are not in sequence")
}
p.removeDuplicateCoinbaseTxInputsOutputs(blocks)
return blocks, nil
}
// check if the given blocks are continue from the latest indexed block
// to prevent inserting out-of-order blocks or duplicate blocks
func (p *Processor) isBlocksSequential(blocks []*types.Block) bool {
if len(blocks) == 0 {
return true
}
for i, block := range blocks {
if i == 0 {
continue
}
if block.Header.Height != blocks[i-1].Header.Height+1 {
return false
}
}
return true
}
// check if the given blocks are continue from the latest indexed block
// to prevent inserting out-of-order blocks or duplicate blocks
func (p *Processor) isContinueFromLatestIndexedBlock(ctx context.Context, block *types.Block) bool {
latestBlock, err := p.CurrentBlock(ctx)
if err != nil {
return false
}
return block.Header.Height == latestBlock.Height+1
}
// there 2 coinbase transaction that are duplicated in the blocks 91842 and 91880.
// if the given block version is v1 and height is `91842` or `91880`,
// then remove transaction inputs/outputs to prevent duplicate txin/txout error when inserting to the database.
//
// Theses duplicated coinbase transactions are having the same transaction input/output and
// utxo from these 2 duplicated coinbase txs can redeem only once. so, it's safe to remove them and can
// use inputs/outputs from the previous block.
//
// Duplicate Coinbase Transactions:
// - `454279874213763724535987336644243549a273058910332236515429488599` in blocks 91812, 91842
// - `e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468` in blocks 91722, 91880
//
// This function will modify the given data directly.
func (p *Processor) removeDuplicateCoinbaseTxInputsOutputs(blocks []*types.Block) {
for _, block := range blocks {
header := block.Header
if header.Version == 1 && (header.Height == 91842 || header.Height == 91880) {
// remove transaction inputs/outputs from coinbase transaction (first transaction)
block.Transactions[0].TxIn = nil
block.Transactions[0].TxOut = nil
}
}
}

View File

@@ -1,144 +0,0 @@
package bitcoin
import (
"fmt"
"testing"
"github.com/gaze-network/indexer-network/core/types"
"github.com/stretchr/testify/assert"
)
func TestDuplicateCoinbaseTxHashHandling(t *testing.T) {
processor := Processor{}
generator := func() []*types.Block {
return []*types.Block{
{
Header: types.BlockHeader{Height: 91842, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
{
Header: types.BlockHeader{Height: 91880, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
}
}
t.Run("all_duplicated_txs", func(t *testing.T) {
blocks := generator()
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
assert.Len(t, blocks, 2, "should not remove any blocks")
for _, block := range blocks {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Len(t, block.Transactions[0].TxIn, 0, "should remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 0, "should remove tx outputs from coinbase transaction")
}
})
t.Run("not_duplicated_txs", func(t *testing.T) {
blocks := []*types.Block{
{
Header: types.BlockHeader{Height: 91812, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
{
Header: types.BlockHeader{Height: 91722, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
}
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
assert.Len(t, blocks, 2, "should not remove any blocks")
for _, block := range blocks {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Len(t, block.Transactions[0].TxIn, 4, "should not remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 4, "should not remove tx outputs from coinbase transaction")
}
})
t.Run("mixed", func(t *testing.T) {
blocks := []*types.Block{
{
Header: types.BlockHeader{Height: 91812, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
},
}
blocks = append(blocks, generator()...)
blocks = append(blocks, &types.Block{
Header: types.BlockHeader{Height: 91722, Version: 1},
Transactions: []*types.Transaction{
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
{
TxIn: []*types.TxIn{{}, {}, {}, {}},
TxOut: []*types.TxOut{{}, {}, {}, {}},
},
},
})
processor.removeDuplicateCoinbaseTxInputsOutputs(blocks)
assert.Len(t, blocks, 4, "should not remove any blocks")
// only 2nd and 3rd blocks should be modified
for i, block := range blocks {
t.Run(fmt.Sprint(i), func(t *testing.T) {
if i == 1 || i == 2 {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Len(t, block.Transactions[0].TxIn, 0, "should remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 0, "should remove tx outputs from coinbase transaction")
} else {
assert.Len(t, block.Transactions, 2, "should not remove any transactions")
assert.Lenf(t, block.Transactions[0].TxIn, 4, "should not remove tx inputs from coinbase transaction")
assert.Len(t, block.Transactions[0].TxOut, 4, "should not remove tx outputs from coinbase transaction")
}
})
}
})
}

View File

@@ -1,169 +0,0 @@
package postgres
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
"github.com/jackc/pgx/v5"
"github.com/samber/lo"
)
func (r *Repository) GetLatestBlockHeader(ctx context.Context) (types.BlockHeader, error) {
model, err := r.queries.GetLatestBlockHeader(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return types.BlockHeader{}, errors.Join(errs.NotFound, err)
}
return types.BlockHeader{}, errors.Wrap(err, "failed to get latest block header")
}
data, err := mapBlockHeaderModelToType(model)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to map block header model to type")
}
return data, nil
}
func (r *Repository) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
if len(blocks) == 0 {
return nil
}
blockParams, txParams, txoutParams, txinParams := mapBlocksTypeToParams(blocks)
tx, err := r.db.Begin(ctx)
if err != nil {
return errors.Wrap(err, "failed to begin transaction")
}
defer tx.Rollback(ctx)
queries := r.queries.WithTx(tx)
if err := queries.BatchInsertBlocks(ctx, blockParams); err != nil {
return errors.Wrap(err, "failed to batch insert block headers")
}
if err := queries.BatchInsertTransactions(ctx, txParams); err != nil {
return errors.Wrap(err, "failed to batch insert transactions")
}
// Should insert txout first, then txin
// Because txin references txout
if err := queries.BatchInsertTransactionTxOuts(ctx, txoutParams); err != nil {
return errors.Wrap(err, "failed to batch insert transaction txins")
}
if err := queries.BatchInsertTransactionTxIns(ctx, txinParams); err != nil {
return errors.Wrap(err, "failed to batch insert transaction txins")
}
if err := tx.Commit(ctx); err != nil {
return errors.Wrap(err, "failed to commit transaction")
}
return nil
}
func (r *Repository) RevertBlocks(ctx context.Context, from int64) error {
tx, err := r.db.Begin(ctx)
if err != nil {
return errors.Wrap(err, "failed to begin transaction")
}
defer tx.Rollback(ctx)
queries := r.queries.WithTx(tx)
if err := queries.RevertData(ctx, int32(from)); err != nil && !errors.Is(err, pgx.ErrNoRows) {
return errors.Wrap(err, "failed to revert data")
}
if err := tx.Commit(ctx); err != nil {
return errors.Wrap(err, "failed to commit transaction")
}
return nil
}
func (r *Repository) GetBlockHeaderByHeight(ctx context.Context, blockHeight int64) (types.BlockHeader, error) {
blockModel, err := r.queries.GetBlockByHeight(ctx, int32(blockHeight))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return types.BlockHeader{}, errors.Join(errs.NotFound, err)
}
return types.BlockHeader{}, errors.Wrap(err, "failed to get block by height")
}
data, err := mapBlockHeaderModelToType(blockModel)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to map block header model to type")
}
return data, nil
}
func (r *Repository) GetBlocksByHeightRange(ctx context.Context, from int64, to int64) ([]*types.Block, error) {
blocks, err := r.queries.GetBlocksByHeightRange(ctx, gen.GetBlocksByHeightRangeParams{
FromHeight: int32(from),
ToHeight: int32(to),
})
if err != nil {
return nil, errors.Wrap(err, "failed to get blocks by height range")
}
if len(blocks) == 0 {
return []*types.Block{}, nil
}
txs, err := r.queries.GetTransactionsByHeightRange(ctx, gen.GetTransactionsByHeightRangeParams{
FromHeight: int32(from),
ToHeight: int32(to),
})
if err != nil {
return nil, errors.Wrap(err, "failed to get transactions by height range")
}
txHashes := lo.Map(txs, func(tx gen.BitcoinTransaction, _ int) string { return tx.TxHash })
txOuts, err := r.queries.GetTransactionTxOutsByTxHashes(ctx, txHashes)
if err != nil {
return nil, errors.Wrap(err, "failed to get transaction txouts by tx hashes")
}
txIns, err := r.queries.GetTransactionTxInsByTxHashes(ctx, txHashes)
if err != nil {
return nil, errors.Wrap(err, "failed to get transaction txins by tx hashes")
}
// Grouping result by block height and tx hash
groupedTxs := lo.GroupBy(txs, func(tx gen.BitcoinTransaction) int32 { return tx.BlockHeight })
groupedTxOuts := lo.GroupBy(txOuts, func(txOut gen.BitcoinTransactionTxout) string { return txOut.TxHash })
groupedTxIns := lo.GroupBy(txIns, func(txIn gen.BitcoinTransactionTxin) string { return txIn.TxHash })
var errs []error
result := lo.Map(blocks, func(blockModel gen.BitcoinBlock, _ int) *types.Block {
header, err := mapBlockHeaderModelToType(blockModel)
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to map block header model to type"))
return nil
}
txsModel := groupedTxs[blockModel.BlockHeight]
return &types.Block{
Header: header,
Transactions: lo.Map(txsModel, func(txModel gen.BitcoinTransaction, _ int) *types.Transaction {
tx, err := mapTransactionModelToType(txModel, groupedTxIns[txModel.TxHash], groupedTxOuts[txModel.TxHash])
if err != nil {
errs = append(errs, errors.Wrap(err, "failed to map transaction model to type"))
return nil
}
return &tx
}),
}
})
if len(errs) > 0 {
return nil, errors.Wrap(errors.Join(errs...), "failed while mapping result")
}
return result, nil
}

View File

@@ -1,408 +0,0 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.26.0
// source: data.sql
package gen
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const batchInsertBlocks = `-- name: BatchInsertBlocks :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce")
VALUES (
unnest($1::INT[]),
unnest($2::TEXT[]),
unnest($3::INT[]),
unnest($4::TEXT[]),
unnest($5::TEXT[]),
unnest($6::TIMESTAMP WITH TIME ZONE[]), -- or use TIMESTAMPTZ
unnest($7::BIGINT[]),
unnest($8::BIGINT[])
)
`
type BatchInsertBlocksParams struct {
BlockHeightArr []int32
BlockHashArr []string
VersionArr []int32
MerkleRootArr []string
PrevBlockHashArr []string
TimestampArr []pgtype.Timestamptz
BitsArr []int64
NonceArr []int64
}
func (q *Queries) BatchInsertBlocks(ctx context.Context, arg BatchInsertBlocksParams) error {
_, err := q.db.Exec(ctx, batchInsertBlocks,
arg.BlockHeightArr,
arg.BlockHashArr,
arg.VersionArr,
arg.MerkleRootArr,
arg.PrevBlockHashArr,
arg.TimestampArr,
arg.BitsArr,
arg.NonceArr,
)
return err
}
const batchInsertTransactionTxIns = `-- name: BatchInsertTransactionTxIns :exec
WITH update_txout AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = true
FROM (SELECT unnest($1::TEXT[]) as tx_hash, unnest($2::INT[]) as tx_idx) as txin
WHERE "bitcoin_transaction_txouts"."tx_hash" = txin.tx_hash AND "bitcoin_transaction_txouts"."tx_idx" = txin.tx_idx AND "is_spent" = false
RETURNING "bitcoin_transaction_txouts"."tx_hash", "bitcoin_transaction_txouts"."tx_idx", "pkscript"
), prepare_insert AS (
SELECT input.tx_hash, input.tx_idx, prevout_tx_hash, prevout_tx_idx, update_txout.pkscript as prevout_pkscript, scriptsig, witness, sequence
FROM (
SELECT
unnest($3::TEXT[]) as tx_hash,
unnest($4::INT[]) as tx_idx,
unnest($1::TEXT[]) as prevout_tx_hash,
unnest($2::INT[]) as prevout_tx_idx,
unnest($5::TEXT[]) as scriptsig,
unnest($6::TEXT[]) as witness,
unnest($7::INT[]) as sequence
) input LEFT JOIN update_txout ON "update_txout"."tx_hash" = "input"."prevout_tx_hash" AND "update_txout"."tx_idx" = "input"."prevout_tx_idx"
)
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx", "prevout_pkscript","scriptsig","witness","sequence")
SELECT "tx_hash", "tx_idx", "prevout_tx_hash", "prevout_tx_idx", "prevout_pkscript", "scriptsig", "witness", "sequence" FROM prepare_insert
`
type BatchInsertTransactionTxInsParams struct {
PrevoutTxHashArr []string
PrevoutTxIdxArr []int32
TxHashArr []string
TxIdxArr []int32
ScriptsigArr []string
WitnessArr []string
SequenceArr []int32
}
func (q *Queries) BatchInsertTransactionTxIns(ctx context.Context, arg BatchInsertTransactionTxInsParams) error {
_, err := q.db.Exec(ctx, batchInsertTransactionTxIns,
arg.PrevoutTxHashArr,
arg.PrevoutTxIdxArr,
arg.TxHashArr,
arg.TxIdxArr,
arg.ScriptsigArr,
arg.WitnessArr,
arg.SequenceArr,
)
return err
}
const batchInsertTransactionTxOuts = `-- name: BatchInsertTransactionTxOuts :exec
INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value")
VALUES (
unnest($1::TEXT[]),
unnest($2::INT[]),
unnest($3::TEXT[]),
unnest($4::BIGINT[])
)
`
type BatchInsertTransactionTxOutsParams struct {
TxHashArr []string
TxIdxArr []int32
PkscriptArr []string
ValueArr []int64
}
func (q *Queries) BatchInsertTransactionTxOuts(ctx context.Context, arg BatchInsertTransactionTxOutsParams) error {
_, err := q.db.Exec(ctx, batchInsertTransactionTxOuts,
arg.TxHashArr,
arg.TxIdxArr,
arg.PkscriptArr,
arg.ValueArr,
)
return err
}
const batchInsertTransactions = `-- name: BatchInsertTransactions :exec
INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height","block_hash","idx")
VALUES (
unnest($1::TEXT[]),
unnest($2::INT[]),
unnest($3::BIGINT[]),
unnest($4::INT[]),
unnest($5::TEXT[]),
unnest($6::INT[])
)
`
type BatchInsertTransactionsParams struct {
TxHashArr []string
VersionArr []int32
LocktimeArr []int64
BlockHeightArr []int32
BlockHashArr []string
IdxArr []int32
}
func (q *Queries) BatchInsertTransactions(ctx context.Context, arg BatchInsertTransactionsParams) error {
_, err := q.db.Exec(ctx, batchInsertTransactions,
arg.TxHashArr,
arg.VersionArr,
arg.LocktimeArr,
arg.BlockHeightArr,
arg.BlockHashArr,
arg.IdxArr,
)
return err
}
const getBlockByHeight = `-- name: GetBlockByHeight :one
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks WHERE block_height = $1
`
func (q *Queries) GetBlockByHeight(ctx context.Context, blockHeight int32) (BitcoinBlock, error) {
row := q.db.QueryRow(ctx, getBlockByHeight, blockHeight)
var i BitcoinBlock
err := row.Scan(
&i.BlockHeight,
&i.BlockHash,
&i.Version,
&i.MerkleRoot,
&i.PrevBlockHash,
&i.Timestamp,
&i.Bits,
&i.Nonce,
)
return i, err
}
const getBlocksByHeightRange = `-- name: GetBlocksByHeightRange :many
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks WHERE block_height >= $1 AND block_height <= $2 ORDER BY block_height ASC
`
type GetBlocksByHeightRangeParams struct {
FromHeight int32
ToHeight int32
}
func (q *Queries) GetBlocksByHeightRange(ctx context.Context, arg GetBlocksByHeightRangeParams) ([]BitcoinBlock, error) {
rows, err := q.db.Query(ctx, getBlocksByHeightRange, arg.FromHeight, arg.ToHeight)
if err != nil {
return nil, err
}
defer rows.Close()
var items []BitcoinBlock
for rows.Next() {
var i BitcoinBlock
if err := rows.Scan(
&i.BlockHeight,
&i.BlockHash,
&i.Version,
&i.MerkleRoot,
&i.PrevBlockHash,
&i.Timestamp,
&i.Bits,
&i.Nonce,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getLatestBlockHeader = `-- name: GetLatestBlockHeader :one
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks ORDER BY block_height DESC LIMIT 1
`
func (q *Queries) GetLatestBlockHeader(ctx context.Context) (BitcoinBlock, error) {
row := q.db.QueryRow(ctx, getLatestBlockHeader)
var i BitcoinBlock
err := row.Scan(
&i.BlockHeight,
&i.BlockHash,
&i.Version,
&i.MerkleRoot,
&i.PrevBlockHash,
&i.Timestamp,
&i.Bits,
&i.Nonce,
)
return i, err
}
const getTransactionByHash = `-- name: GetTransactionByHash :one
SELECT tx_hash, version, locktime, block_height, block_hash, idx FROM bitcoin_transactions WHERE tx_hash = $1
`
func (q *Queries) GetTransactionByHash(ctx context.Context, txHash string) (BitcoinTransaction, error) {
row := q.db.QueryRow(ctx, getTransactionByHash, txHash)
var i BitcoinTransaction
err := row.Scan(
&i.TxHash,
&i.Version,
&i.Locktime,
&i.BlockHeight,
&i.BlockHash,
&i.Idx,
)
return i, err
}
const getTransactionTxInsByTxHashes = `-- name: GetTransactionTxInsByTxHashes :many
SELECT tx_hash, tx_idx, prevout_tx_hash, prevout_tx_idx, prevout_pkscript, scriptsig, witness, sequence FROM bitcoin_transaction_txins WHERE tx_hash = ANY($1::TEXT[])
`
func (q *Queries) GetTransactionTxInsByTxHashes(ctx context.Context, txHashes []string) ([]BitcoinTransactionTxin, error) {
rows, err := q.db.Query(ctx, getTransactionTxInsByTxHashes, txHashes)
if err != nil {
return nil, err
}
defer rows.Close()
var items []BitcoinTransactionTxin
for rows.Next() {
var i BitcoinTransactionTxin
if err := rows.Scan(
&i.TxHash,
&i.TxIdx,
&i.PrevoutTxHash,
&i.PrevoutTxIdx,
&i.PrevoutPkscript,
&i.Scriptsig,
&i.Witness,
&i.Sequence,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTransactionTxOutsByTxHashes = `-- name: GetTransactionTxOutsByTxHashes :many
SELECT tx_hash, tx_idx, pkscript, value, is_spent FROM bitcoin_transaction_txouts WHERE tx_hash = ANY($1::TEXT[])
`
func (q *Queries) GetTransactionTxOutsByTxHashes(ctx context.Context, txHashes []string) ([]BitcoinTransactionTxout, error) {
rows, err := q.db.Query(ctx, getTransactionTxOutsByTxHashes, txHashes)
if err != nil {
return nil, err
}
defer rows.Close()
var items []BitcoinTransactionTxout
for rows.Next() {
var i BitcoinTransactionTxout
if err := rows.Scan(
&i.TxHash,
&i.TxIdx,
&i.Pkscript,
&i.Value,
&i.IsSpent,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTransactionsByHeightRange = `-- name: GetTransactionsByHeightRange :many
SELECT tx_hash, version, locktime, block_height, block_hash, idx FROM bitcoin_transactions WHERE block_height >= $1 AND block_height <= $2
`
type GetTransactionsByHeightRangeParams struct {
FromHeight int32
ToHeight int32
}
func (q *Queries) GetTransactionsByHeightRange(ctx context.Context, arg GetTransactionsByHeightRangeParams) ([]BitcoinTransaction, error) {
rows, err := q.db.Query(ctx, getTransactionsByHeightRange, arg.FromHeight, arg.ToHeight)
if err != nil {
return nil, err
}
defer rows.Close()
var items []BitcoinTransaction
for rows.Next() {
var i BitcoinTransaction
if err := rows.Scan(
&i.TxHash,
&i.Version,
&i.Locktime,
&i.BlockHeight,
&i.BlockHash,
&i.Idx,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const insertBlock = `-- name: InsertBlock :exec
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce") VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
type InsertBlockParams struct {
BlockHeight int32
BlockHash string
Version int32
MerkleRoot string
PrevBlockHash string
Timestamp pgtype.Timestamptz
Bits int64
Nonce int64
}
func (q *Queries) InsertBlock(ctx context.Context, arg InsertBlockParams) error {
_, err := q.db.Exec(ctx, insertBlock,
arg.BlockHeight,
arg.BlockHash,
arg.Version,
arg.MerkleRoot,
arg.PrevBlockHash,
arg.Timestamp,
arg.Bits,
arg.Nonce,
)
return err
}
const revertData = `-- name: RevertData :exec
WITH delete_tx AS (
DELETE FROM "bitcoin_transactions" WHERE "block_height" >= $1
RETURNING "tx_hash"
), delete_txin AS (
DELETE FROM "bitcoin_transaction_txins" WHERE "tx_hash" = ANY(SELECT "tx_hash" FROM delete_tx)
RETURNING "prevout_tx_hash", "prevout_tx_idx"
), delete_txout AS (
DELETE FROM "bitcoin_transaction_txouts" WHERE "tx_hash" = ANY(SELECT "tx_hash" FROM delete_tx)
RETURNING "tx_hash", "tx_idx"
), revert_txout_spent AS (
UPDATE "bitcoin_transaction_txouts"
SET "is_spent" = false
WHERE
("tx_hash", "tx_idx") IN (SELECT "prevout_tx_hash", "prevout_tx_idx" FROM delete_txin) AND
("tx_hash", "tx_idx") NOT IN (SELECT "tx_hash", "tx_idx" FROM delete_txout) -- avoid to modified same row twice (modified the same row twice in a single statement is not supported)
RETURNING NULL
)
DELETE FROM "bitcoin_blocks" WHERE "bitcoin_blocks"."block_height" >= $1
`
func (q *Queries) RevertData(ctx context.Context, fromHeight int32) error {
_, err := q.db.Exec(ctx, revertData, fromHeight)
return err
}

View File

@@ -1,32 +0,0 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.26.0
package gen
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
return &Queries{
db: tx,
}
}

View File

@@ -1,51 +0,0 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.26.0
// source: info.sql
package gen
import (
"context"
)
const getCurrentDBVersion = `-- name: GetCurrentDBVersion :one
SELECT "version" FROM bitcoin_indexer_db_version ORDER BY id DESC LIMIT 1
`
func (q *Queries) GetCurrentDBVersion(ctx context.Context) (int32, error) {
row := q.db.QueryRow(ctx, getCurrentDBVersion)
var version int32
err := row.Scan(&version)
return version, err
}
const getCurrentIndexerStats = `-- name: GetCurrentIndexerStats :one
SELECT "client_version", "network" FROM bitcoin_indexer_stats ORDER BY id DESC LIMIT 1
`
type GetCurrentIndexerStatsRow struct {
ClientVersion string
Network string
}
func (q *Queries) GetCurrentIndexerStats(ctx context.Context) (GetCurrentIndexerStatsRow, error) {
row := q.db.QueryRow(ctx, getCurrentIndexerStats)
var i GetCurrentIndexerStatsRow
err := row.Scan(&i.ClientVersion, &i.Network)
return i, err
}
const updateIndexerStats = `-- name: UpdateIndexerStats :exec
INSERT INTO bitcoin_indexer_stats (client_version, network) VALUES ($1, $2)
`
type UpdateIndexerStatsParams struct {
ClientVersion string
Network string
}
func (q *Queries) UpdateIndexerStats(ctx context.Context, arg UpdateIndexerStatsParams) error {
_, err := q.db.Exec(ctx, updateIndexerStats, arg.ClientVersion, arg.Network)
return err
}

View File

@@ -1,61 +0,0 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.26.0
package gen
import (
"github.com/jackc/pgx/v5/pgtype"
)
type BitcoinBlock struct {
BlockHeight int32
BlockHash string
Version int32
MerkleRoot string
PrevBlockHash string
Timestamp pgtype.Timestamptz
Bits int64
Nonce int64
}
type BitcoinIndexerDbVersion struct {
Id int64
Version int32
CreatedAt pgtype.Timestamptz
}
type BitcoinIndexerStat struct {
Id int64
ClientVersion string
Network string
CreatedAt pgtype.Timestamptz
}
type BitcoinTransaction struct {
TxHash string
Version int32
Locktime int64
BlockHeight int32
BlockHash string
Idx int32
}
type BitcoinTransactionTxin struct {
TxHash string
TxIdx int32
PrevoutTxHash string
PrevoutTxIdx int32
PrevoutPkscript pgtype.Text
Scriptsig string
Witness string
Sequence int64
}
type BitcoinTransactionTxout struct {
TxHash string
TxIdx int32
Pkscript string
Value int64
IsSpent bool
}

View File

@@ -1,44 +0,0 @@
package postgres
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
"github.com/jackc/pgx/v5"
)
// Make sure Repository implements the IndexerInformationDataGateway interface
var _ datagateway.IndexerInformationDataGateway = (*Repository)(nil)
func (r *Repository) GetCurrentDBVersion(ctx context.Context) (int32, error) {
version, err := r.queries.GetCurrentDBVersion(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
return version, nil
}
func (r *Repository) GetLatestIndexerStats(ctx context.Context) (string, common.Network, error) {
stats, err := r.queries.GetCurrentIndexerStats(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", "", errors.Join(errs.NotFound, err)
}
return "", "", errors.WithStack(err)
}
return stats.ClientVersion, common.Network(stats.Network), nil
}
func (r *Repository) UpdateIndexerStats(ctx context.Context, clientVersion string, network common.Network) error {
if err := r.queries.UpdateIndexerStats(ctx, gen.UpdateIndexerStatsParams{
ClientVersion: clientVersion,
Network: network.String(),
}); err != nil {
return errors.WithStack(err)
}
return nil
}

View File

@@ -1,197 +0,0 @@
package postgres
import (
"cmp"
"encoding/hex"
"slices"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
"github.com/gaze-network/indexer-network/pkg/btcutils"
"github.com/jackc/pgx/v5/pgtype"
)
func mapBlockHeaderModelToType(src gen.BitcoinBlock) (types.BlockHeader, error) {
hash, err := chainhash.NewHashFromStr(src.BlockHash)
if err != nil {
return types.BlockHeader{}, errors.Join(errors.Wrap(err, "failed to parse block hash"), errs.InternalError)
}
prevHash, err := chainhash.NewHashFromStr(src.PrevBlockHash)
if err != nil {
return types.BlockHeader{}, errors.Join(errors.Wrap(err, "failed to parse prev block hash"), errs.InternalError)
}
merkleRoot, err := chainhash.NewHashFromStr(src.MerkleRoot)
if err != nil {
return types.BlockHeader{}, errors.Join(errors.Wrap(err, "failed to parse merkle root"), errs.InternalError)
}
return types.BlockHeader{
Hash: *hash,
Height: int64(src.BlockHeight),
Version: src.Version,
PrevBlock: *prevHash,
MerkleRoot: *merkleRoot,
Timestamp: src.Timestamp.Time,
Bits: uint32(src.Bits),
Nonce: uint32(src.Nonce),
}, nil
}
func mapBlocksTypeToParams(src []*types.Block) (gen.BatchInsertBlocksParams, gen.BatchInsertTransactionsParams, gen.BatchInsertTransactionTxOutsParams, gen.BatchInsertTransactionTxInsParams) {
blocks := gen.BatchInsertBlocksParams{
BlockHeightArr: make([]int32, 0, len(src)),
BlockHashArr: make([]string, 0, len(src)),
VersionArr: make([]int32, 0, len(src)),
MerkleRootArr: make([]string, 0, len(src)),
PrevBlockHashArr: make([]string, 0, len(src)),
TimestampArr: make([]pgtype.Timestamptz, 0, len(src)),
BitsArr: make([]int64, 0, len(src)),
NonceArr: make([]int64, 0, len(src)),
}
txs := gen.BatchInsertTransactionsParams{
TxHashArr: []string{},
VersionArr: []int32{},
LocktimeArr: []int64{},
BlockHeightArr: []int32{},
BlockHashArr: []string{},
IdxArr: []int32{},
}
txouts := gen.BatchInsertTransactionTxOutsParams{
TxHashArr: []string{},
TxIdxArr: []int32{},
PkscriptArr: []string{},
ValueArr: []int64{},
}
txins := gen.BatchInsertTransactionTxInsParams{
PrevoutTxHashArr: []string{},
PrevoutTxIdxArr: []int32{},
TxHashArr: []string{},
TxIdxArr: []int32{},
ScriptsigArr: []string{},
WitnessArr: []string{},
SequenceArr: []int32{},
}
for _, block := range src {
blockHash := block.Header.Hash.String()
// Batch insert blocks
blocks.BlockHeightArr = append(blocks.BlockHeightArr, int32(block.Header.Height))
blocks.BlockHashArr = append(blocks.BlockHashArr, blockHash)
blocks.VersionArr = append(blocks.VersionArr, block.Header.Version)
blocks.MerkleRootArr = append(blocks.MerkleRootArr, block.Header.MerkleRoot.String())
blocks.PrevBlockHashArr = append(blocks.PrevBlockHashArr, block.Header.PrevBlock.String())
blocks.TimestampArr = append(blocks.TimestampArr, pgtype.Timestamptz{
Time: block.Header.Timestamp,
Valid: true,
})
blocks.BitsArr = append(blocks.BitsArr, int64(block.Header.Bits))
blocks.NonceArr = append(blocks.NonceArr, int64(block.Header.Nonce))
for txIdx, srcTx := range block.Transactions {
txHash := srcTx.TxHash.String()
// Batch insert transactions
txs.TxHashArr = append(txs.TxHashArr, txHash)
txs.VersionArr = append(txs.VersionArr, srcTx.Version)
txs.LocktimeArr = append(txs.LocktimeArr, int64(srcTx.LockTime))
txs.BlockHeightArr = append(txs.BlockHeightArr, int32(block.Header.Height))
txs.BlockHashArr = append(txs.BlockHashArr, blockHash)
txs.IdxArr = append(txs.IdxArr, int32(txIdx))
// Batch insert txins
for idx, txin := range srcTx.TxIn {
var witness string
if len(txin.Witness) > 0 {
witness = btcutils.WitnessToString(txin.Witness)
}
txins.TxHashArr = append(txins.TxHashArr, txHash)
txins.TxIdxArr = append(txins.TxIdxArr, int32(idx))
txins.PrevoutTxHashArr = append(txins.PrevoutTxHashArr, txin.PreviousOutTxHash.String())
txins.PrevoutTxIdxArr = append(txins.PrevoutTxIdxArr, int32(txin.PreviousOutIndex))
txins.ScriptsigArr = append(txins.ScriptsigArr, hex.EncodeToString(txin.SignatureScript))
txins.WitnessArr = append(txins.WitnessArr, witness)
txins.SequenceArr = append(txins.SequenceArr, int32(txin.Sequence))
}
// Batch insert txouts
for idx, txout := range srcTx.TxOut {
txouts.TxHashArr = append(txouts.TxHashArr, txHash)
txouts.TxIdxArr = append(txouts.TxIdxArr, int32(idx))
txouts.PkscriptArr = append(txouts.PkscriptArr, hex.EncodeToString(txout.PkScript))
txouts.ValueArr = append(txouts.ValueArr, txout.Value)
}
}
}
return blocks, txs, txouts, txins
}
func mapTransactionModelToType(src gen.BitcoinTransaction, txInModel []gen.BitcoinTransactionTxin, txOutModels []gen.BitcoinTransactionTxout) (types.Transaction, error) {
blockHash, err := chainhash.NewHashFromStr(src.BlockHash)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to parse block hash")
}
txHash, err := chainhash.NewHashFromStr(src.TxHash)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to parse tx hash")
}
// Sort txins and txouts by index (Asc)
slices.SortFunc(txOutModels, func(i, j gen.BitcoinTransactionTxout) int {
return cmp.Compare(i.TxIdx, j.TxIdx)
})
slices.SortFunc(txInModel, func(i, j gen.BitcoinTransactionTxin) int {
return cmp.Compare(i.TxIdx, j.TxIdx)
})
txIns := make([]*types.TxIn, 0, len(txInModel))
txOuts := make([]*types.TxOut, 0, len(txOutModels))
for _, txInModel := range txInModel {
scriptsig, err := hex.DecodeString(txInModel.Scriptsig)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to decode scriptsig")
}
prevoutTxHash, err := chainhash.NewHashFromStr(txInModel.PrevoutTxHash)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to parse prevout tx hash")
}
witness, err := btcutils.WitnessFromString(txInModel.Witness)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to parse witness from hex string")
}
txIns = append(txIns, &types.TxIn{
SignatureScript: scriptsig,
Witness: witness,
Sequence: uint32(txInModel.Sequence),
PreviousOutIndex: uint32(txInModel.PrevoutTxIdx),
PreviousOutTxHash: *prevoutTxHash,
})
}
for _, txOutModel := range txOutModels {
pkscript, err := hex.DecodeString(txOutModel.Pkscript)
if err != nil {
return types.Transaction{}, errors.Wrap(err, "failed to decode pkscript")
}
txOuts = append(txOuts, &types.TxOut{
PkScript: pkscript,
Value: txOutModel.Value,
})
}
return types.Transaction{
BlockHeight: int64(src.BlockHeight),
BlockHash: *blockHash,
Index: uint32(src.Idx),
TxHash: *txHash,
Version: src.Version,
LockTime: uint32(src.Locktime),
TxIn: txIns,
TxOut: txOuts,
}, nil
}

View File

@@ -1,22 +0,0 @@
package postgres
import (
"github.com/gaze-network/indexer-network/internal/postgres"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
)
// Make sure Repository implements the BitcoinDataGateway interface
var _ datagateway.BitcoinDataGateway = (*Repository)(nil)
type Repository struct {
db postgres.DB
queries *gen.Queries
}
func NewRepository(db postgres.DB) *Repository {
return &Repository{
db: db,
queries: gen.New(db),
}
}

View File

@@ -1,35 +0,0 @@
package postgres
import (
"context"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/jackc/pgx/v5"
)
func (r *Repository) GetTransactionByHash(ctx context.Context, txHash chainhash.Hash) (*types.Transaction, error) {
model, err := r.queries.GetTransactionByHash(ctx, txHash.String())
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, errors.Join(errs.NotFound, err)
}
return nil, errors.Wrap(err, "failed to get transaction by hash")
}
txIns, err := r.queries.GetTransactionTxInsByTxHashes(ctx, []string{txHash.String()})
if err != nil {
return nil, errors.Wrap(err, "failed to get transaction txins by tx hashes")
}
txOuts, err := r.queries.GetTransactionTxOutsByTxHashes(ctx, []string{txHash.String()})
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, errors.Wrap(err, "failed to get transaction txouts by tx hashes")
}
tx, err := mapTransactionModelToType(model, txIns, txOuts)
if err != nil {
return nil, errors.Wrap(err, "failed to map transaction model to type")
}
return &tx, nil
}

View File

@@ -1 +0,0 @@
package bitcoin

View File

@@ -3,7 +3,7 @@ package config
import "github.com/gaze-network/indexer-network/internal/postgres"
type Config struct {
Datasource string `mapstructure:"datasource"` // Datasource to fetch bitcoin data for Meta-Protocol e.g. `bitcoin-node` | `database`
Datasource string `mapstructure:"datasource"` // Datasource to fetch bitcoin data for Meta-Protocol e.g. `bitcoin-node`
Database string `mapstructure:"database"` // Database to store runes data.
APIHandlers []string `mapstructure:"api_handlers"` // List of API handlers to enable. (e.g. `http`)
Postgres postgres.Config `mapstructure:"postgres"`

View File

@@ -9,12 +9,12 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/core/indexer"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/bitcoin/btcclient"
"github.com/gaze-network/indexer-network/modules/runes/datagateway"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/btcclient"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
@@ -22,15 +22,15 @@ import (
"github.com/samber/lo"
)
var _ indexers.BitcoinProcessor = (*Processor)(nil)
// Make sure to implement the Bitcoin Processor interface
var _ indexer.Processor[*types.Block] = (*Processor)(nil)
type Processor struct {
runesDg datagateway.RunesDataGateway
indexerInfoDg datagateway.IndexerInfoDataGateway
bitcoinClient btcclient.Contract
bitcoinDataSource indexers.BitcoinDatasource
network common.Network
reportingClient *reportingclient.ReportingClient
runesDg datagateway.RunesDataGateway
indexerInfoDg datagateway.IndexerInfoDataGateway
bitcoinClient btcclient.Contract
network common.Network
reportingClient *reportingclient.ReportingClient
newRuneEntries map[runes.RuneId]*runes.RuneEntry
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
@@ -40,12 +40,11 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
bitcoinClient: bitcoinClient,
bitcoinDataSource: bitcoinDataSource,
network: network,
reportingClient: reportingClient,
newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry),

View File

@@ -6,7 +6,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/modules/runes/datagateway"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/jackc/pgx"
"github.com/jackc/pgx/v5"
)
var ErrTxAlreadyExists = errors.New("Transaction already exists. Call Commit() or Rollback() first.")

89
modules/runes/runes.go Normal file
View File

@@ -0,0 +1,89 @@
package runes
import (
"context"
"strings"
"github.com/btcsuite/btcd/rpcclient"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/datasources"
"github.com/gaze-network/indexer-network/core/indexer"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/internal/postgres"
runesapi "github.com/gaze-network/indexer-network/modules/runes/api"
runesdatagateway "github.com/gaze-network/indexer-network/modules/runes/datagateway"
runespostgres "github.com/gaze-network/indexer-network/modules/runes/repository/postgres"
runesusecase "github.com/gaze-network/indexer-network/modules/runes/usecase"
"github.com/gaze-network/indexer-network/pkg/btcclient"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gofiber/fiber/v2"
"github.com/samber/do/v2"
"github.com/samber/lo"
)
func New(injector do.Injector) (indexer.IndexerWorker, error) {
ctx := do.MustInvoke[context.Context](injector)
conf := do.MustInvoke[config.Config](injector)
reportingClient := do.MustInvoke[*reportingclient.ReportingClient](injector)
var (
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
if err != nil {
if errors.Is(err, errs.InvalidArgument) {
return nil, errors.Wrap(err, "Invalid Postgres configuration for indexer")
}
return nil, errors.Wrap(err, "can't create Postgres connection pool")
}
defer pg.Close()
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
default:
return nil, errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Runes.Database)
}
var bitcoinDatasource datasources.Datasource[*types.Block]
var bitcoinClient btcclient.Contract
switch strings.ToLower(conf.Modules.Runes.Datasource) {
case "bitcoin-node":
btcClient := do.MustInvoke[*rpcclient.Client](injector)
bitcoinNodeDatasource := datasources.NewBitcoinNode(btcClient)
bitcoinDatasource = bitcoinNodeDatasource
bitcoinClient = bitcoinNodeDatasource
default:
return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient)
if err := processor.VerifyStates(ctx); err != nil {
return nil, errors.WithStack(err)
}
// Mount API
apiHandlers := lo.Uniq(conf.Modules.Runes.APIHandlers)
for _, handler := range apiHandlers {
switch handler { // TODO: support more handlers (e.g. gRPC)
case "http":
httpServer := do.MustInvoke[*fiber.App](injector)
runesUsecase := runesusecase.New(runesDg, bitcoinClient)
runesHTTPHandler := runesapi.NewHTTPHandler(conf.Network, runesUsecase)
if err := runesHTTPHandler.Mount(httpServer); err != nil {
return nil, errors.Wrap(err, "can't mount Runes API")
}
logger.InfoContext(ctx, "Mounted HTTP handler")
default:
return nil, errors.Wrapf(errs.Unsupported, "%q API handler is not supported", handler)
}
}
indexer := indexer.New(processor, bitcoinDatasource)
return indexer, nil
}

View File

@@ -69,8 +69,26 @@ func ParseTag(input interface{}) (Tag, error) {
return input, nil
case uint128.Uint128:
return Tag(input), nil
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
return Tag(uint128.From64(input.(uint64))), nil
case int:
return Tag(uint128.From64(uint64(input))), nil
case int8:
return Tag(uint128.From64(uint64(input))), nil
case int16:
return Tag(uint128.From64(uint64(input))), nil
case int32:
return Tag(uint128.From64(uint64(input))), nil
case int64:
return Tag(uint128.From64(uint64(input))), nil
case uint:
return Tag(uint128.From64(uint64(input))), nil
case uint8:
return Tag(uint128.From64(uint64(input))), nil
case uint16:
return Tag(uint128.From64(uint64(input))), nil
case uint32:
return Tag(uint128.From64(uint64(input))), nil
case uint64:
return Tag(uint128.From64(input)), nil
case big.Int:
u128, err := uint128.FromBig(&input)
if err != nil {

View File

@@ -1,8 +1,8 @@
package usecase
import (
"github.com/gaze-network/indexer-network/modules/bitcoin/btcclient"
"github.com/gaze-network/indexer-network/modules/runes/datagateway"
"github.com/gaze-network/indexer-network/pkg/btcclient"
)
type Usecase struct {

View File

@@ -0,0 +1,99 @@
package automaxprocs
import (
"context"
"fmt"
"log/slog"
"os"
"runtime"
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"go.uber.org/automaxprocs/maxprocs"
)
var (
// undo is the undo function returned by maxprocs.Set
undo func()
// autoMaxProcs is the value of GOMAXPROCS set by automaxprocs.
// will be -1 if `automaxprocs` is not initialized.
autoMaxProcs = -1
// initialMaxProcs is the initial value of GOMAXPROCS.
initialMaxProcs = Current()
)
func Init() error {
logger := logger.With(
slogx.String("package", "automaxprocs"),
slogx.String("event", "set_gomaxprocs"),
slogx.Int("prev_maxprocs", initialMaxProcs),
)
// Create a logger function for `maxprocs.Set`.
setMaxProcLogger := func(format string, v ...any) {
fields := make([]slog.Attr, 0, 1)
// `maxprocs.Set` will always pass current GOMAXPROCS value to logger.
// except when calling `undo` function, it will not pass any value.
if val, ok := utils.Optional(v); ok {
// if `GOMAXPROCS` environment variable is set, then `automaxprocs` will honor it.
if _, exists := os.LookupEnv("GOMAXPROCS"); exists {
val = Current()
}
// add logging field for `set_maxprocs` value if it's present in integer value.
if setmaxprocs, ok := val.(int); ok {
fields = append(fields, slogx.Int("set_maxprocs", setmaxprocs))
}
}
logger.LogAttrs(context.Background(), slog.LevelInfo, fmt.Sprintf(format, v...), fields...)
}
// Set GOMAXPROCS to match the Linux container CPU quota (if any), returning
// any error encountered and an undo function.
//
// Set is a no-op on non-Linux systems and in Linux environments without a
// configured CPU quota.
revert, err := maxprocs.Set(maxprocs.Logger(setMaxProcLogger), maxprocs.Min(1))
if err != nil {
return errors.WithStack(err)
}
// set the result of `maxprocs.Set` to global variable.
autoMaxProcs = Current()
undo = revert
return nil
}
// Undo restores GOMAXPROCS to its previous value.
// or revert to initial value if `automaxprocs` is not initialized.
//
// returns the current GOMAXPROCS value.
func Undo() int {
if undo != nil {
undo()
return Current()
}
runtime.GOMAXPROCS(initialMaxProcs)
return initialMaxProcs
}
// Current returns the current value of GOMAXPROCS.
func Current() int {
return runtime.GOMAXPROCS(0)
}
// Value returns the value of GOMAXPROCS set by automaxprocs.
// returns -1 if `automaxprocs` is not initialized.
func Value() int {
if autoMaxProcs <= 0 {
return -1
}
return autoMaxProcs
}

View File

@@ -7,16 +7,6 @@
# https://docs.sqlc.dev/en/stable/howto/ddl.html#golang-migrate
version: "2"
sql:
- schema: "./modules/bitcoin/database/postgresql/migrations"
queries: "./modules/bitcoin/database/postgresql/queries"
engine: "postgresql"
gen:
go:
package: "gen"
out: "./modules/bitcoin/repository/postgres/gen"
sql_package: "pgx/v5"
rename:
id: "Id"
- schema: "./modules/runes/database/postgresql/migrations"
queries: "./modules/runes/database/postgresql/queries"
engine: "postgresql"