Merge remote-tracking branch 'origin/feature/bitcoin-indexer' into feat/runes-module

This commit is contained in:
Gaze
2024-04-17 22:59:33 +07:00
8 changed files with 131 additions and 16 deletions

View File

@@ -122,7 +122,10 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// Initialize Bitcoin Indexer
if opts.Bitcoin {
var db btcdatagateway.BitcoinDataGateway
var (
btcDB btcdatagateway.BitcoinDataGateway
indexerInfoDB btcdatagateway.IndexerInformationDataGateway
)
switch strings.ToLower(conf.Modules.Bitcoin.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
@@ -130,14 +133,21 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
db = btcpostgres.NewRepository(pg)
repo := btcpostgres.NewRepository(pg)
btcDB = repo
indexerInfoDB = repo
default:
return errors.Wrapf(errs.Unsupported, "%q database is not supported", conf.Modules.Bitcoin.Database)
}
bitcoinProcessor := bitcoin.NewProcessor(db)
bitcoinProcessor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
bitcoinNodeDatasource := datasources.NewBitcoinNode(client)
bitcoinIndexer := indexers.NewBitcoinIndexer(bitcoinProcessor, bitcoinNodeDatasource)
// Verify states before running Indexer
if err := bitcoinProcessor.VerifyStates(ctx); err != nil {
return errors.WithStack(err)
}
// Run Indexer
go func() {
logger.InfoContext(ctx, "Starting Bitcoin Indexer")

View File

@@ -87,4 +87,10 @@ var (
// inherited error from Overflow,
// so errors.Is(err, Overflow) == true
OverflowUint128 = errors.WrapWithDepth(depth, Overflow, "overflow uint128")
// InvalidState is returned when a state is invalid
InvalidState = errors.NewWithDepth(depth, "invalid state")
// ConflictSetting is returned when an indexer setting is conflicted
ConflictSetting = errors.NewWithDepth(depth, "conflict setting")
)

View File

@@ -14,16 +14,16 @@ var supportedNetworks = map[Network]struct{}{
NetworkTestnet: {},
}
func (n Network) IsSupported() bool {
_, ok := supportedNetworks[n]
return ok
}
var chainParams = map[Network]*chaincfg.Params{
NetworkMainnet: &chaincfg.MainNetParams,
NetworkTestnet: &chaincfg.TestNet3Params,
}
func (n Network) IsSupported() bool {
_, ok := supportedNetworks[n]
return ok
}
func (n Network) ChainParams() *chaincfg.Params {
return chainParams[n]
}

View File

@@ -17,6 +17,10 @@ import (
"github.com/samber/lo"
)
const (
blockStreamChunkSize = 2
)
// Make sure to implement the BitcoinDatasource interface
var _ Datasource[[]*types.Block] = (*BitcoinNodeDatasource)(nil)
@@ -141,7 +145,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
go func() {
defer stream.Close()
done := subscription.Done()
chunks := lo.Chunk(blockHeights, 10)
chunks := lo.Chunk(blockHeights, blockStreamChunkSize)
for _, chunk := range chunks {
chunk := chunk
select {

View File

@@ -24,4 +24,8 @@ type Processor[T any] interface {
// RevertData revert synced data to the specified block height for re-indexing.
RevertData(ctx context.Context, from int64) error
// VerifyStates verifies the states of the indexed data and the indexer
// to ensure the last shutdown was graceful and no missing data.
VerifyStates(ctx context.Context) error
}

View File

@@ -1,9 +1,13 @@
package datagateway
import "context"
import (
"context"
"github.com/gaze-network/indexer-network/common"
)
type IndexerInformationDataGateway interface {
GetCurrentDBVersion(ctx context.Context) (int, error)
GetCurrentIndexerStats(ctx context.Context) (clientVersion string, network string, err error)
UpdateIndexerStats(ctx context.Context, clientVersion string, network string) error
GetCurrentDBVersion(ctx context.Context) (int32, error)
GetLatestIndexerStats(ctx context.Context) (version string, network common.Network, err error)
UpdateIndexerStats(ctx context.Context, clientVersion string, network common.Network) error
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
@@ -19,12 +20,16 @@ import (
var _ indexers.BitcoinProcessor = (*Processor)(nil)
type Processor struct {
bitcoinDg datagateway.BitcoinDataGateway
config config.Config
bitcoinDg datagateway.BitcoinDataGateway
indexerInfoDg datagateway.IndexerInformationDataGateway
}
func NewProcessor(bitcoinDg datagateway.BitcoinDataGateway) *Processor {
func NewProcessor(config config.Config, bitcoinDg datagateway.BitcoinDataGateway, indexerInfoDg datagateway.IndexerInformationDataGateway) *Processor {
return &Processor{
bitcoinDg: bitcoinDg,
config: config,
bitcoinDg: bitcoinDg,
indexerInfoDg: indexerInfoDg,
}
}
@@ -97,3 +102,41 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
return nil
}
func (p *Processor) VerifyStates(ctx context.Context) error {
// Check current db version with the required db version
{
dbVersion, err := p.indexerInfoDg.GetCurrentDBVersion(ctx)
if err != nil {
return errors.Wrap(err, "can't get current db version")
}
if dbVersion != DBVersion {
return errors.Wrapf(errs.ConflictSetting, "db version mismatch, please upgrade to version %d", DBVersion)
}
}
// Check if the latest indexed network is mismatched with configured network
{
_, network, err := p.indexerInfoDg.GetLatestIndexerStats(ctx)
if err != nil {
if errors.Is(err, errs.NotFound) {
goto end
}
return errors.Wrap(err, "can't get latest indexer stats")
}
if network != p.config.Network {
return errors.Wrapf(errs.ConflictSetting, "network mismatch, latest indexed network: %q, configured network: %q. If you want to change the network, please reset the database", network, p.config.Network)
}
}
// TODO: Verify the states of the indexed data to ensure the last shutdown was graceful and no missing data.
end:
if err := p.indexerInfoDg.UpdateIndexerStats(ctx, Version, p.config.Network); err != nil {
return errors.Wrap(err, "can't update indexer stats")
}
return nil
}

View File

@@ -0,0 +1,44 @@
package postgres
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
"github.com/jackc/pgx/v5"
)
// Make sure Repository implements the IndexerInformationDataGateway interface
var _ datagateway.IndexerInformationDataGateway = (*Repository)(nil)
func (r *Repository) GetCurrentDBVersion(ctx context.Context) (int32, error) {
version, err := r.queries.GetCurrentDBVersion(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
return version, nil
}
func (r *Repository) GetLatestIndexerStats(ctx context.Context) (string, common.Network, error) {
stats, err := r.queries.GetCurrentIndexerStats(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", "", errors.Join(errs.NotFound, err)
}
return "", "", errors.WithStack(err)
}
return stats.ClientVersion, common.Network(stats.Network), nil
}
func (r *Repository) UpdateIndexerStats(ctx context.Context, clientVersion string, network common.Network) error {
if err := r.queries.UpdateIndexerStats(ctx, gen.UpdateIndexerStatsParams{
ClientVersion: clientVersion,
Network: network.String(),
}); err != nil {
return errors.WithStack(err)
}
return nil
}