Merge remote-tracking branch 'origin/feature/bitcoin-indexer' into feat/runes-module

This commit is contained in:
Gaze
2024-04-16 22:33:27 +07:00
9 changed files with 131 additions and 43 deletions

View File

@@ -7,6 +7,8 @@ import (
"syscall"
"github.com/btcsuite/btcd/rpcclient"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/datasources"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/internal/config"
@@ -23,15 +25,8 @@ import (
)
type runCmdOptions struct {
ProtocolDatasource string // Datasource to fetch bitcoin data for Meta-Protocol e.g. `bitcoin-node` | `database`
Bitcoin struct {
Enabled bool
Database string // DB to store bitcoin data e.g. `postgres` | `bigtable` | `leveldb`
}
Runes struct {
Enabled bool
Database string // DB to store bitcoin data e.g. `postgres` | `bigtable` | `leveldb`
}
Bitcoin bool
Runes bool
}
func NewRunCommand() *cobra.Command {
@@ -41,23 +36,30 @@ func NewRunCommand() *cobra.Command {
runCmd := &cobra.Command{
Use: "run",
Short: "Start indexer-network service",
Run: func(cmd *cobra.Command, args []string) {
runHandler(opts, cmd, args)
RunE: func(cmd *cobra.Command, args []string) error {
return runHandler(opts, cmd, args)
},
}
// TODO: separate flags and bind flags to each module cmd package.
// Add local flags
flags := runCmd.Flags()
flags.StringVar(&opts.ProtocolDatasource, "protocol-datasource", "bitcoin-node", `Datasource to fetch bitcoin data for Meta-Protocol. current supported datasources: "bitcoin-node" | "database"`)
flags.BoolVar(&opts.Bitcoin.Enabled, "bitcoin", false, "Enable Bitcoin indexer module")
flags.StringVar(&opts.Bitcoin.Database, "bitcoin-db", "postgres", `Database to store bitcoin data. current supported databases: "postgres"`)
flags.BoolVar(&opts.Runes.Enabled, "runes", false, "Enable Runes indexer module")
flags.StringVar(&opts.Runes.Database, "runes-db", "postgres", `Database to store runes data. current supported databases: "postgres"`)
flags.BoolVar(&opts.Bitcoin, "bitcoin", false, "Enable Bitcoin indexer module")
flags.String("bitcoin-db", "postgres", `Database to store bitcoin data. current supported databases: "postgres"`)
flags.BoolVar(&opts.Runes, "runes", false, "Enable Runes indexer module")
flags.String("runes-db", "postgres", `Database to store runes data. current supported databases: "postgres"`)
flags.String("runes-datasource", "bitcoin-node", `Datasource to fetch bitcoin data for processing Meta-Protocol data. current supported datasources: "bitcoin-node" | "database"`)
// Bind flags to configuration
config.BindPFlag("modules.bitcoin.database", flags.Lookup("bitcoin-db"))
config.BindPFlag("modules.runes.database", flags.Lookup("runes-db"))
config.BindPFlag("modules.runes.datasource", flags.Lookup("runes-datasource"))
return runCmd
}
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) {
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
conf := config.Load()
// Initialize context
@@ -88,25 +90,25 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) {
// Validate network
if !conf.Network.IsSupported() {
logger.PanicContext(ctx, "Unsupported network", slogx.String("network", conf.Network.String()))
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
}
// TODO: create module command package.
// each module should have its own command package and main package will routing the command to the module command package.
// Initialize Bitcoin Indexer
if opts.Bitcoin.Enabled {
if opts.Bitcoin {
var db btcdatagateway.BitcoinDataGateway
switch strings.ToLower(opts.Bitcoin.Database) {
switch strings.ToLower(conf.Modules.Bitcoin.Database) {
case "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules["bitcoin"].Postgres)
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
if err != nil {
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
db = btcpostgres.NewRepository(pg)
default:
logger.PanicContext(ctx, "Unsupported database", slogx.String("database", opts.Bitcoin.Database))
return errors.Wrapf(errs.Unsupported, "%q database is not supported", conf.Modules.Bitcoin.Database)
}
bitcoinProcessor := bitcoin.NewProcessor(db)
bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
@@ -155,4 +157,5 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) {
// Wait for interrupt signal to gracefully stop the server with
<-ctx.Done()
logger.InfoContext(ctx, "Shutting down server")
return nil
}

View File

@@ -12,6 +12,7 @@ network: mainnet
modules:
bitcoin:
database: "postgres" # Database to store bitcoin data. current supported databases: "postgres"
postgres:
host: "localhost"
port: 5432

View File

@@ -214,6 +214,22 @@ func (d *BitcoinNodeDatasource) GetTransaction(ctx context.Context, txHash chain
if err != nil {
return nil, errors.Wrap(err, "failed to get raw transaction")
}
msgTx := rawTx.MsgTx()
return types.ParseMsgTx(msgTx, 0, chainhash.Hash{}, 0), nil
}
// GetBlockHeader fetch block header from Bitcoin node
func (d *BitcoinNodeDatasource) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
hash, err := d.btcclient.GetBlockHash(height)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to get block hash")
}
block, err := d.btcclient.GetBlockHeader(hash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to get block header")
}
return types.ParseMsgBlockHeader(*block, height), nil
}

View File

@@ -2,10 +2,14 @@ package datasources
import (
"context"
"github.com/gaze-network/indexer-network/core/types"
)
// Datasource is an interface for indexer data sources.
type Datasource[T any] interface {
Name() string
Fetch(ctx context.Context, from, to int64) (T, error)
FetchAsync(ctx context.Context, from, to int64, ch chan<- T) (*ClientSubscription[T], error)
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
}

View File

@@ -13,6 +13,10 @@ import (
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)
const (
maxReorgLookBack = 100
)
type (
BitcoinProcessor Processor[[]*types.Block]
BitcoinDatasource datasources.Datasource[[]*types.Block]
@@ -88,19 +92,55 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
// validate reorg from first block
{
currentBlockHeader := blocks[0].Header
if !currentBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
remoteBlockHeader := blocks[0].Header
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "reorg detected",
slogx.Stringer("current_hash", i.currentBlock.Hash),
slogx.Stringer("expected_hash", currentBlockHeader.PrevBlock),
slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
)
// TODO: find start reorg block
startReorgHeight := currentBlockHeader.Height // TODO: use start reorg block height
if err := i.Processor.RevertData(ctx, startReorgHeight); err != nil {
var (
targetHeight = i.currentBlock.Height - 1
beforeReorgBlockHeader = types.BlockHeader{
Height: -1,
}
)
for n := 0; n < maxReorgLookBack; n++ {
// TODO: concurrent fetch
indexedHeader, err := i.Processor.GetIndexedBlock(ctx, targetHeight)
if err != nil {
return errors.Wrapf(err, "failed to get indexed block, height: %d", targetHeight)
}
remoteHeader, err := i.Datasource.GetBlockHeader(ctx, targetHeight)
if err != nil {
return errors.Wrapf(err, "failed to get remote block header, height: %d", targetHeight)
}
// Found no reorg block
if indexedHeader.Hash.IsEqual(&remoteHeader.Hash) {
beforeReorgBlockHeader = remoteHeader
break
}
// Walk back to find fork point
targetHeight -= 1
}
// Reorg look back limit reached
if beforeReorgBlockHeader.Height < 0 {
return errors.Wrap(errs.SomethingWentWrong, "reorg look back limit reached")
}
// Revert all data since the reorg block
logger.WarnContext(ctx, "reverting reorg data", slogx.Int64("from", beforeReorgBlockHeader.Height+1))
if err := i.Processor.RevertData(ctx, beforeReorgBlockHeader.Height+1); err != nil {
return errors.Wrap(err, "failed to revert data")
}
i.currentBlock = currentBlockHeader
// Set current block to before reorg block and
// end current round to fetch again
i.currentBlock = beforeReorgBlockHeader
return nil
}
}

View File

@@ -19,6 +19,20 @@ type BlockHeader struct {
Nonce uint32
}
func ParseMsgBlockHeader(src wire.BlockHeader, height int64) BlockHeader {
hash := src.BlockHash()
return BlockHeader{
Hash: hash,
Height: height,
Version: src.Version,
PrevBlock: src.PrevBlock,
MerkleRoot: src.MerkleRoot,
Timestamp: src.Timestamp,
Bits: src.Bits,
Nonce: src.Nonce,
}
}
type Block struct {
Header BlockHeader
Transactions []*Transaction
@@ -27,16 +41,7 @@ type Block struct {
func ParseMsgBlock(src *wire.MsgBlock, height int64) *Block {
hash := src.Header.BlockHash()
return &Block{
Header: BlockHeader{
Hash: hash,
Height: height,
Version: src.Header.Version,
PrevBlock: src.Header.PrevBlock,
MerkleRoot: src.Header.MerkleRoot,
Timestamp: src.Header.Timestamp,
Bits: src.Header.Bits,
Nonce: src.Header.Nonce,
},
Header: ParseMsgBlockHeader(src.Header, height),
Transactions: lo.Map(src.Transactions, func(item *wire.MsgTx, index int) *Transaction { return ParseMsgTx(item, height, hash, uint32(index)) }),
}
}

View File

@@ -8,7 +8,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/internal/postgres"
btcconfig "github.com/gaze-network/indexer-network/modules/bitcoin/config"
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/spf13/pflag"
@@ -33,7 +34,7 @@ type Config struct {
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
Modules map[string]Module `mapstructure:"modules"`
Modules Modules `mapstructure:"modules"`
}
type BitcoinNodeClient struct {
@@ -43,8 +44,9 @@ type BitcoinNodeClient struct {
DisableTLS bool `mapstructure:"disable_tls"`
}
type Module struct {
Postgres postgres.Config `mapstructure:"postgres"`
type Modules struct {
Bitcoin btcconfig.Config `mapstructure:"bitcoin"`
Runes runesconfig.Config `mapstructure:"runes"`
}
// Parse parse the configuration from environment variables

View File

@@ -0,0 +1,8 @@
package config
import "github.com/gaze-network/indexer-network/internal/postgres"
type Config struct {
Database string `mapstructure:"database"` // Database to store bitcoin data.
Postgres postgres.Config `mapstructure:"postgresql"`
}

View File

@@ -0,0 +1,9 @@
package config
import "github.com/gaze-network/indexer-network/internal/postgres"
type Config struct {
Datasource string `mapstructure:"datasource"` // Datasource to fetch bitcoin data for Meta-Protocol e.g. `bitcoin-node` | `database`
Database string `mapstructure:"database"` // Database to store runes data.
Postgres postgres.Config `mapstructure:"postgresql"`
}