Merge branch 'feature/bitcoin-indexer' into feature/cli-cmd

This commit is contained in:
Gaze
2024-04-15 20:03:28 +07:00
8 changed files with 65 additions and 33 deletions

View File

@@ -43,9 +43,16 @@ func main() {
}
defer client.Shutdown()
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
logger.PanicContext(ctx, "Failed to ping Bitcoin Core RPC Server", slogx.Error(err))
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slogx.String("host", conf.BitcoinNode.Host))
// Validate network
if !conf.Network.IsSupported() {
logger.PanicContext(ctx, "Unsupported network", slogx.String("network", conf.Network.String()))
}
// Initialize Bitcoin Indexer
{
@@ -59,7 +66,7 @@ func main() {
bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
bitcoinIndexer := indexers.NewBitcoinIndexer(bitcoinProcessor, bitcoinNodeDatasource)
// Run Indexers
// Run Indexer
go func() {
if err := bitcoinIndexer.Run(ctx); err != nil {
logger.ErrorContext(ctx, "Failed to run Bitcoin Indexer", slogx.Error(err))

View File

@@ -7,6 +7,16 @@ const (
NetworkTestnet Network = "testnet"
)
// supportedNetworks is the allow-list consulted by Network.IsSupported.
// struct{} values make this a zero-payload set; membership alone matters.
var supportedNetworks = map[Network]struct{}{
	NetworkMainnet: {},
	NetworkTestnet: {},
}
// IsSupported reports whether n is one of the networks this indexer
// recognizes (see supportedNetworks for the authoritative set).
func (n Network) IsSupported() bool {
	_, found := supportedNetworks[n]
	if found {
		return true
	}
	return false
}
// String implements fmt.Stringer, returning the raw network name.
func (n Network) String() string {
	name := string(n)
	return name
}

View File

@@ -8,12 +8,14 @@ bitcoin_node:
pass: "pass"
disable_tls: false
network: mainnet
modules:
bitcoin:
postgresql:
postgres:
host: "localhost"
port: 5432
user: "postgres"
pass: "password"
db: "postgres"
password: "password"
db_name: "postgres"
# url: "postgres://postgres:password@localhost:5432/postgres?sslmode=prefer" # [Optional] This will override other database connection configurations

View File

@@ -3,6 +3,7 @@ package datasources
import (
"context"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/core/types"
@@ -164,6 +165,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
}
}
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))
blocks = append(blocks, types.ParseMsgBlock(block, height))
}
@@ -205,3 +207,13 @@ func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start,
return start, end, false, nil
}
// GetTransaction fetches a single transaction from the Bitcoin node by its hash.
//
// NOTE(review): block height, block hash, and tx index are passed as zero
// values to ParseMsgTx — presumably callers of this path do not need
// confirmation metadata; confirm against ParseMsgTx's contract.
func (d *BitcoinNodeDatasource) GetTransaction(ctx context.Context, txHash chainhash.Hash) (*types.Transaction, error) {
	raw, err := d.btcclient.GetRawTransaction(&txHash)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get raw transaction")
	}
	return types.ParseMsgTx(raw.MsgTx(), 0, chainhash.Hash{}, 0), nil
}

View File

@@ -33,13 +33,17 @@ type ClientSubscription[T any] struct {
}
func newClientSubscription[T any](channel chan<- T) *ClientSubscription[T] {
return &ClientSubscription[T]{
subscription := &ClientSubscription[T]{
channel: channel,
in: make(chan T, ClientSubscriptionBufferSize),
err: make(chan error, ClientSubscriptionBufferSize),
quit: make(chan struct{}),
quitDone: make(chan struct{}),
}
go func() {
subscription.run()
}()
return subscription
}
func (c *ClientSubscription[T]) Unsubscribe() {

View File

@@ -59,7 +59,7 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
case <-ticker.C:
ctx = logger.WithContext(ctx, slog.Int64("current_block_height", i.currentBlock.Height))
ctx := logger.WithContext(ctx, slog.Int64("current_block_height", i.currentBlock.Height))
if err := i.process(ctx); err != nil {
logger.ErrorContext(ctx, "failed to process", slogx.Error(err))
@@ -71,7 +71,8 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
ch := make(chan []*types.Block)
subscription, err := i.Datasource.FetchAsync(ctx, i.currentBlock.Height, -1, ch)
logger.InfoContext(ctx, "[BitcoinIndexer] fetching blocks", slog.Int64("from", i.currentBlock.Height+1), slog.Int64("to", -1))
subscription, err := i.Datasource.FetchAsync(ctx, i.currentBlock.Height+1, -1, ch)
if err != nil {
return errors.Wrap(err, "failed to call fetch async")
}
@@ -85,13 +86,13 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
continue
}
// validate reorg from current block
if blocks[0].Header.Height == i.currentBlock.Height {
// validate reorg from first block
{
currentBlockHeader := blocks[0].Header
if !currentBlockHeader.Hash.IsEqual(&i.currentBlock.Hash) {
if !currentBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "reorg detected",
slogx.Stringer("current_hash", i.currentBlock.Hash),
slogx.Stringer("expected_hash", currentBlockHeader.Hash),
slogx.Stringer("expected_hash", currentBlockHeader.PrevBlock),
)
// TODO: find start reorg block
@@ -104,19 +105,13 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
}
}
// remove old block
newBlocks := blocks[1:]
if len(newBlocks) == 0 {
continue
}
// validate that blocks are continuous and no reorg occurred
for i := 1; i < len(newBlocks); i++ {
if newBlocks[i].Header.Height != newBlocks[i-1].Header.Height+1 {
return errors.Wrapf(errs.InternalError, "block is not continuous, block[%d] height: %d, block[%d] height: %d", i-1, newBlocks[i-1].Header.Height, i, newBlocks[i].Header.Height)
for i := 1; i < len(blocks); i++ {
if blocks[i].Header.Height != blocks[i-1].Header.Height+1 {
return errors.Wrapf(errs.InternalError, "block is not continuous, block[%d] height: %d, block[%d] height: %d", i-1, blocks[i-1].Header.Height, i, blocks[i].Header.Height)
}
if !newBlocks[i].Header.PrevBlock.IsEqual(&newBlocks[i-1].Header.Hash) {
if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
logger.WarnContext(ctx, "reorg occurred while batch fetching blocks, need to try to fetch again")
// end current round
return nil
@@ -124,12 +119,12 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
}
// Start processing blocks
if err := i.Processor.Process(ctx, newBlocks); err != nil {
if err := i.Processor.Process(ctx, blocks); err != nil {
return errors.WithStack(err)
}
// Update current state
i.currentBlock = newBlocks[len(newBlocks)-1].Header
i.currentBlock = blocks[len(blocks)-1].Header
case <-subscription.Done():
// end current round
if err := ctx.Err(); err != nil {

View File

@@ -7,6 +7,7 @@ import (
"sync"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/internal/postgres"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
@@ -29,6 +30,7 @@ var (
// Config is the top-level application configuration, decoded via
// mapstructure tags (keys correspond to the YAML config file / env bindings).
type Config struct {
	Logger logger.Config `mapstructure:"logger"` // logging setup
	BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"` // Bitcoin Core RPC connection settings
	Network common.Network `mapstructure:"network"` // target network (validated via Network.IsSupported at startup)
	Modules map[string]Module `mapstructure:"modules"` // per-module configuration, keyed by module name
}

View File

@@ -20,18 +20,18 @@ const (
)
type Config struct {
Host string `env:"HOST"` // Default is 127.0.0.1
Port string `env:"PORT"` // Default is 5432
User string `env:"USER"` // Default is empty
Password string `env:"PASSWORD"` // Default is empty
DBName string `env:"DBNAME"` // Default is postgres
SSLMode string `env:"SSLMODE"` // Default is prefer
URL string `env:"URL"` // If URL is provided, other fields are ignored
Host string `mapstructure:"host"` // Default is 127.0.0.1
Port string `mapstructure:"port"` // Default is 5432
User string `mapstructure:"user"` // Default is empty
Password string `mapstructure:"password"` // Default is empty
DBName string `mapstructure:"db_name"` // Default is postgres
SSLMode string `mapstructure:"ssl_mode"` // Default is prefer
URL string `mapstructure:"url"` // If URL is provided, other fields are ignored
MaxConns int32 `env:"MAX_CONNS"` // Default is 16
MinConns int32 `env:"MIN_CONNS"` // Default is 0
MaxConns int32 `mapstructure:"max_conns"` // Default is 16
MinConns int32 `mapstructure:"min_conns"` // Default is 0
Debug bool `env:"DEBUG"`
Debug bool `mapstructure:"debug"`
}
// New creates a new connection to the database