feat: fix interface

This commit is contained in:
Gaze
2024-04-17 23:55:09 +07:00
parent c7f7d276e6
commit 3f96a762ef
10 changed files with 144 additions and 74 deletions

View File

@@ -164,6 +164,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// Initialize Runes Indexer
if opts.Runes {
var runesDg runesdatagateway.RunesDataGateway
var indexerInfoDg runesdatagateway.IndexerInfoDataGateway
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
@@ -171,7 +172,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
runesDg = runespostgres.NewRepository(pg)
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
default:
logger.PanicContext(ctx, "Unsupported database", slogx.String("database", conf.Modules.Runes.Database))
}
@@ -196,11 +199,11 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
default:
return errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
runesProcessor := runes.NewProcessor(runesDg, bitcoinClient, bitcoinDatasource, conf.Network)
runesProcessor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network)
runesIndexer := indexers.NewBitcoinIndexer(runesProcessor, bitcoinDatasource)
if err := runesProcessor.Init(ctx); err != nil {
logger.PanicContext(ctx, "Failed to initialize Runes Processor", slogx.Error(err))
if err := runesProcessor.VerifyStates(ctx); err != nil {
return errors.WithStack(err)
}
// Run Indexer

View File

@@ -1,5 +1,24 @@
package runes
const (
Version = "v0.0.1"
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/core/types"
)
const (
Version = "v0.0.1"
DBVersion = 1
EventHashVersion = 1
)
var startingBlockHeader = map[common.Network]types.BlockHeader{
// TODO: add starting block header for mainnet after block 840,000 is mined
common.NetworkMainnet: {},
common.NetworkTestnet: {
Height: 2583200,
Hash: *utils.Must(chainhash.NewHashFromStr("000000000006c5f0dfcd9e0e81f27f97a87aef82087ffe69cd3c390325bb6541")),
PrevBlock: *utils.Must(chainhash.NewHashFromStr("00000000000668f3bafac992f53424774515440cb47e1cb9e73af3f496139e28")),
},
}

View File

@@ -4,7 +4,7 @@ SELECT * FROM runes_indexer_state ORDER BY created_at DESC LIMIT 1;
-- name: SetIndexerState :exec
INSERT INTO runes_indexer_state (db_version, event_hash_version) VALUES ($1, $2);
-- name: GetCurrentIndexerStats :one
-- name: GetLatestIndexerStats :one
SELECT "client_version", "network" FROM runes_indexer_stats ORDER BY id DESC LIMIT 1;
-- name: UpdateIndexerStats :exec

View File

@@ -0,0 +1,15 @@
package datagateway
import (
"context"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
)
type IndexerInfoDataGateway interface {
GetLatestIndexerState(ctx context.Context) (entity.IndexerState, error)
GetLatestIndexerStats(ctx context.Context) (version string, network common.Network, err error)
SetIndexerState(ctx context.Context, state entity.IndexerState) error
UpdateIndexerStats(ctx context.Context, clientVersion string, network common.Network) error
}

View File

@@ -16,7 +16,6 @@ type RunesDataGateway interface {
}
type RunesReaderDataGateway interface {
GetLatestIndexerState(ctx context.Context) (entity.IndexerState, error)
GetLatestBlock(ctx context.Context) (types.BlockHeader, error)
GetIndexedBlockByHeight(ctx context.Context, height int64) (*entity.IndexedBlock, error)
GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error)
@@ -50,8 +49,6 @@ type RunesWriterDataGateway interface {
// Rollback() must be safe to call even if no transaction is active. Hence, a defer Rollback() is safe, even if Commit() was called prior with non-error conditions.
Rollback(ctx context.Context) error
SetIndexerState(ctx context.Context, state entity.IndexerState) error
CreateRuneEntry(ctx context.Context, entry *runes.RuneEntry, blockHeight uint64) error
CreateRuneEntryState(ctx context.Context, entry *runes.RuneEntry, blockHeight uint64) error
CreateOutPointBalances(ctx context.Context, outPoint wire.OutPoint, balances map[runes.RuneId]uint128.Uint128, blockHeight uint64) error

View File

@@ -28,7 +28,7 @@ func (p *Processor) calculateEventHash(header types.BlockHeader) (chainhash.Hash
func (p *Processor) getHashPayload(header types.BlockHeader) ([]byte, error) {
var sb strings.Builder
sb.WriteString("payload:v" + strconv.Itoa(eventHashVersion) + ":")
sb.WriteString("payload:v" + strconv.Itoa(EventHashVersion) + ":")
sb.WriteString("blockHash:")
sb.Write(header.Hash[:])

View File

@@ -4,8 +4,6 @@ import (
"context"
"time"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
@@ -24,13 +22,9 @@ import (
var _ indexers.BitcoinProcessor = (*Processor)(nil)
const (
dbVersion = 1
eventHashVersion = 1
)
type Processor struct {
runesDg datagateway.RunesDataGateway
indexerInfoDg datagateway.IndexerInfoDataGateway
bitcoinClient btcclient.Contract
bitcoinDataSource indexers.BitcoinDatasource
network common.Network
@@ -43,9 +37,10 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
bitcoinClient: bitcoinClient,
bitcoinDataSource: bitcoinDataSource,
network: network,
@@ -63,7 +58,7 @@ var (
ErrEventHashVersionMismatch = errors.New("event hash version mismatch: please reset db and reindex")
)
func (p *Processor) Init(ctx context.Context) error {
func (p *Processor) VerifyStates(ctx context.Context) error {
// TODO: ensure db is migrated
if err := p.ensureValidState(ctx); err != nil {
return errors.Wrap(err, "error during ensureValidState")
@@ -77,26 +72,41 @@ func (p *Processor) Init(ctx context.Context) error {
}
func (p *Processor) ensureValidState(ctx context.Context) error {
indexerState, err := p.runesDg.GetLatestIndexerState(ctx)
indexerState, err := p.indexerInfoDg.GetLatestIndexerState(ctx)
if err != nil && !errors.Is(err, errs.NotFound) {
return errors.Wrap(err, "failed to get latest indexer state")
}
// if not found, set indexer state
if errors.Is(err, errs.NotFound) {
if err := p.runesDg.SetIndexerState(ctx, entity.IndexerState{
DBVersion: dbVersion,
EventHashVersion: eventHashVersion,
if err := p.indexerInfoDg.SetIndexerState(ctx, entity.IndexerState{
DBVersion: DBVersion,
EventHashVersion: EventHashVersion,
}); err != nil {
return errors.Wrap(err, "failed to set indexer state")
}
} else {
if indexerState.DBVersion != dbVersion {
return errors.WithStack(ErrDBVersionMismatch)
if indexerState.DBVersion != DBVersion {
return errors.Wrapf(errs.ConflictSetting, "db version mismatch: current version is %d. Please upgrade to version %d", indexerState.DBVersion, DBVersion)
}
if indexerState.EventHashVersion != eventHashVersion {
return errors.WithStack(ErrEventHashVersionMismatch)
if indexerState.EventHashVersion != EventHashVersion {
// TODO: automate reset db instead of returning error
return errors.Wrapf(errs.ConflictSetting, "event version mismatch: current version is %d. Please reset rune's db first.", indexerState.EventHashVersion, EventHashVersion)
}
}
_, network, err := p.indexerInfoDg.GetLatestIndexerStats(ctx)
if err != nil && !errors.Is(err, errs.NotFound) {
return errors.Wrap(err, "failed to get latest indexer stats")
}
// if found, verify indexer stats
if err == nil {
if network != p.network {
return errors.Wrapf(errs.ConflictSetting, "network mismatch: latest indexed network is %d, configured network is %d. If you want to change the network, please reset the database", network, p.network)
}
}
if err := p.indexerInfoDg.UpdateIndexerStats(ctx, p.network.String(), p.network); err != nil {
return errors.Wrap(err, "failed to update indexer stats")
}
return nil
}
@@ -140,16 +150,6 @@ func (p *Processor) Name() string {
return "Runes"
}
var startingBlockHeader = map[common.Network]types.BlockHeader{
// TODO: add starting block header for mainnet after block 840,000 is mined
common.NetworkMainnet: {},
common.NetworkTestnet: {
Height: 2583200,
Hash: *utils.Must(chainhash.NewHashFromStr("000000000006c5f0dfcd9e0e81f27f97a87aef82087ffe69cd3c390325bb6541")),
PrevBlock: *utils.Must(chainhash.NewHashFromStr("00000000000668f3bafac992f53424774515440cb47e1cb9e73af3f496139e28")),
},
}
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
blockHeader, err := p.runesDg.GetLatestBlock(ctx)
if err != nil {

View File

@@ -9,22 +9,6 @@ import (
"context"
)
const getCurrentIndexerStats = `-- name: GetCurrentIndexerStats :one
SELECT "client_version", "network" FROM runes_indexer_stats ORDER BY id DESC LIMIT 1
`
type GetCurrentIndexerStatsRow struct {
ClientVersion string
Network string
}
func (q *Queries) GetCurrentIndexerStats(ctx context.Context) (GetCurrentIndexerStatsRow, error) {
row := q.db.QueryRow(ctx, getCurrentIndexerStats)
var i GetCurrentIndexerStatsRow
err := row.Scan(&i.ClientVersion, &i.Network)
return i, err
}
const getLatestIndexerState = `-- name: GetLatestIndexerState :one
SELECT id, db_version, event_hash_version, created_at FROM runes_indexer_state ORDER BY created_at DESC LIMIT 1
`
@@ -41,6 +25,22 @@ func (q *Queries) GetLatestIndexerState(ctx context.Context) (RunesIndexerState,
return i, err
}
const getLatestIndexerStats = `-- name: GetLatestIndexerStats :one
SELECT "client_version", "network" FROM runes_indexer_stats ORDER BY id DESC LIMIT 1
`
type GetLatestIndexerStatsRow struct {
ClientVersion string
Network string
}
func (q *Queries) GetLatestIndexerStats(ctx context.Context) (GetLatestIndexerStatsRow, error) {
row := q.db.QueryRow(ctx, getLatestIndexerStats)
var i GetLatestIndexerStatsRow
err := row.Scan(&i.ClientVersion, &i.Network)
return i, err
}
const setIndexerState = `-- name: SetIndexerState :exec
INSERT INTO runes_indexer_state (db_version, event_hash_version) VALUES ($1, $2)
`

View File

@@ -0,0 +1,56 @@
package postgres
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/modules/runes/datagateway"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/repository/postgres/gen"
"github.com/jackc/pgx/v5"
)
var _ datagateway.IndexerInfoDataGateway = (*Repository)(nil)
func (r *Repository) GetLatestIndexerState(ctx context.Context) (entity.IndexerState, error) {
indexerStateModel, err := r.getQueries().GetLatestIndexerState(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return entity.IndexerState{}, errors.WithStack(errs.NotFound)
}
return entity.IndexerState{}, errors.Wrap(err, "error during query")
}
indexerState := mapIndexerStateModelToType(indexerStateModel)
return indexerState, nil
}
func (r *Repository) GetLatestIndexerStats(ctx context.Context) (string, common.Network, error) {
stats, err := r.getQueries().GetLatestIndexerStats(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", "", errors.WithStack(errs.NotFound)
}
return "", "", errors.Wrap(err, "error during query")
}
return stats.ClientVersion, common.Network(stats.Network), nil
}
func (r *Repository) SetIndexerState(ctx context.Context, state entity.IndexerState) error {
params := mapIndexerStateTypeToParams(state)
if err := r.getQueries().SetIndexerState(ctx, params); err != nil {
return errors.Wrap(err, "error during exec")
}
return nil
}
func (r *Repository) UpdateIndexerStats(ctx context.Context, clientVersion string, network common.Network) error {
if err := r.getQueries().UpdateIndexerStats(ctx, gen.UpdateIndexerStatsParams{
ClientVersion: clientVersion,
Network: string(network),
}); err != nil {
return errors.Wrap(err, "error during exec")
}
return nil
}

View File

@@ -21,18 +21,6 @@ import (
var _ datagateway.RunesDataGateway = (*Repository)(nil)
func (r *Repository) GetLatestIndexerState(ctx context.Context) (entity.IndexerState, error) {
indexerStateModel, err := r.getQueries().GetLatestIndexerState(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return entity.IndexerState{}, errors.WithStack(errs.NotFound)
}
return entity.IndexerState{}, errors.Wrap(err, "error during query")
}
indexerState := mapIndexerStateModelToType(indexerStateModel)
return indexerState, nil
}
// warning: GetLatestBlock currently returns a types.BlockHeader with only Height, Hash, and PrevBlock fields populated.
// This is because it is known that all usage of this function only requires these fields. In the future, we may want to populate all fields for type safety.
func (r *Repository) GetLatestBlock(ctx context.Context) (types.BlockHeader, error) {
@@ -247,14 +235,6 @@ func (r *Repository) GetBalanceByPkScriptAndRuneId(ctx context.Context, pkScript
return result, nil
}
func (r *Repository) SetIndexerState(ctx context.Context, state entity.IndexerState) error {
params := mapIndexerStateTypeToParams(state)
if err := r.getQueries().SetIndexerState(ctx, params); err != nil {
return errors.Wrap(err, "error during exec")
}
return nil
}
func (r *Repository) CreateRuneTransaction(ctx context.Context, tx *entity.RuneTransaction) error {
if tx == nil {
return nil