Merge branch 'feature/bitcoin-indexer' into feat/runes-module

This commit is contained in:
Gaze
2024-04-17 22:39:50 +07:00
15 changed files with 126 additions and 63 deletions

View File

@@ -68,6 +68,9 @@ linters-settings:
misspell:
locale: US
ignore-words: []
errcheck:
exclude-functions:
- (github.com/jackc/pgx/v5.Tx).Rollback
wrapcheck:
ignoreSigs:
- .Errorf(

4
common/bitcoin.go Normal file
View File

@@ -0,0 +1,4 @@
package common
// HalvingInterval is the number of blocks between each halving event.
const HalvingInterval = 210_000

12
common/hash.go Normal file
View File

@@ -0,0 +1,12 @@
package common
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
)
// Zero value of chainhash.Hash
var (
ZeroHash = *utils.Must(chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000000"))
NullHash = ZeroHash
)

View File

@@ -1,5 +1,7 @@
package common
import "github.com/btcsuite/btcd/chaincfg"
type Network string
const (
@@ -17,9 +19,15 @@ func (n Network) IsSupported() bool {
return ok
}
var chainParams = map[Network]*chaincfg.Params{
NetworkMainnet: &chaincfg.MainNetParams,
NetworkTestnet: &chaincfg.TestNet3Params,
}
func (n Network) ChainParams() *chaincfg.Params {
return chainParams[n]
}
func (n Network) String() string {
return string(n)
}
// HalvingInterval is the number of blocks between each halving event.
const HalvingInterval = 210_000

View File

@@ -50,14 +50,15 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
// set to -1 to start from genesis block
i.currentBlock, err = i.Processor.CurrentBlock(ctx)
if err != nil {
if errors.Is(err, errs.NotFound) {
i.currentBlock.Height = -1
if !errors.Is(err, errs.NotFound) {
return errors.Wrap(err, "can't init state, failed to get indexer current block")
}
return errors.Wrap(err, "can't init state, failed to get indexer current block")
i.currentBlock.Height = -1
}
// TODO:
// - compare db version in constants and database
// - compare current network and local indexed network
// - update indexer stats
ticker := time.NewTicker(10 * time.Second)
@@ -67,8 +68,6 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
case <-ticker.C:
ctx := logger.WithContext(ctx, slog.Int64("current_block_height", i.currentBlock.Height))
if err := i.process(ctx); err != nil {
logger.ErrorContext(ctx, "failed to process", slogx.Error(err))
return errors.Wrap(err, "failed to process")
@@ -79,7 +78,7 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
ch := make(chan []*types.Block)
logger.InfoContext(ctx, "[BitcoinIndexer] fetching blocks", slog.Int64("from", i.currentBlock.Height+1), slog.Int64("to", -1))
logger.InfoContext(ctx, "Fetching blocks", slog.Int64("from", i.currentBlock.Height+1), slog.Int64("to", -1))
subscription, err := i.Datasource.FetchAsync(ctx, i.currentBlock.Height+1, -1, ch)
if err != nil {
return errors.Wrap(err, "failed to call fetch async")
@@ -98,7 +97,7 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
{
remoteBlockHeader := blocks[0].Header
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "reorg detected",
logger.WarnContext(ctx, "Reorg detected",
slogx.Stringer("current_hash", i.currentBlock.Hash),
slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
)

2
go.mod
View File

@@ -12,7 +12,7 @@ require (
github.com/gofiber/fiber/v2 v2.52.4
github.com/jackc/pgx/v5 v5.5.5
github.com/mcosta74/pgx-slog v0.3.0
github.com/planxnx/concurrent-stream v0.1.3
github.com/planxnx/concurrent-stream v0.1.5
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5

2
go.sum
View File

@@ -148,6 +148,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/planxnx/concurrent-stream v0.1.3 h1:zEW7hHUibCjEdp9iWNqn/eb7u9qWwSupsYIg4QvU3Co=
github.com/planxnx/concurrent-stream v0.1.3/go.mod h1:vxnW2qxkCLppMo5+Zns3b5/CiVxYQjXRLVFGJ9xvkXk=
github.com/planxnx/concurrent-stream v0.1.5 h1:qSMM27m7AApvalS0rSmovxOtDCnLy0/HinYJPe3oQfQ=
github.com/planxnx/concurrent-stream v0.1.5/go.mod h1:vxnW2qxkCLppMo5+Zns3b5/CiVxYQjXRLVFGJ9xvkXk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

View File

@@ -1,5 +1,17 @@
package bitcoin
const (
Version = "v0.0.1"
import (
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/core/types"
)
const (
Version = "v0.0.1"
DBVersion = 1
)
// DefaultCurrentBlockHeight is the default value for the current block height for first time indexing
var defaultCurrentBlock = types.BlockHeader{
Hash: common.ZeroHash,
Height: -1,
}

View File

@@ -25,8 +25,8 @@ CREATE TABLE IF NOT EXISTS "bitcoin_blocks" (
"merkle_root" TEXT NOT NULL,
"prev_block_hash" TEXT NOT NULL,
"timestamp" TIMESTAMP WITH TIME ZONE NOT NULL,
"bits" INT NOT NULL,
"nonce" INT NOT NULL
"bits" BIGINT NOT NULL,
"nonce" BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS bitcoin_blocks_block_hash_idx ON "bitcoin_blocks" USING HASH ("block_hash");
@@ -34,10 +34,10 @@ CREATE INDEX IF NOT EXISTS bitcoin_blocks_block_hash_idx ON "bitcoin_blocks" USI
CREATE TABLE IF NOT EXISTS "bitcoin_transactions" (
"tx_hash" TEXT NOT NULL PRIMARY KEY,
"version" INT NOT NULL,
"locktime" INT NOT NULL,
"locktime" BIGINT NOT NULL,
"block_height" INT NOT NULL,
"block_hash" TEXT NOT NULL,
"idx" SMALLINT NOT NULL
"idx" INT NOT NULL
);
CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_height_idx ON "bitcoin_transactions" USING BTREE ("block_height");
@@ -45,7 +45,7 @@ CREATE INDEX IF NOT EXISTS bitcoin_transactions_block_hash_idx ON "bitcoin_trans
CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txouts" (
"tx_hash" TEXT NOT NULL,
"tx_idx" SMALLINT NOT NULL,
"tx_idx" INT NOT NULL,
"pkscript" TEXT NOT NULL, -- Hex String
"value" BIGINT NOT NULL,
"is_spent" BOOLEAN NOT NULL DEFAULT false,
@@ -56,16 +56,16 @@ CREATE INDEX IF NOT EXISTS bitcoin_transaction_txouts_pkscript_idx ON "bitcoin_t
CREATE TABLE IF NOT EXISTS "bitcoin_transaction_txins" (
"tx_hash" TEXT NOT NULL,
"tx_idx" SMALLINT NOT NULL,
"tx_idx" INT NOT NULL,
"prevout_tx_hash" TEXT NOT NULL,
"prevout_tx_idx" SMALLINT NOT NULL,
"prevout_pkscript" TEXT NOT NULL, -- Hex String
"prevout_tx_idx" INT NOT NULL,
"prevout_pkscript" TEXT NULL, -- Hex String, Can be NULL if the prevout is a coinbase transaction
"scriptsig" TEXT NOT NULL, -- Hex String
"witness" TEXT, -- Hex String
"sequence" INT NOT NULL,
"sequence" BIGINT NOT NULL,
PRIMARY KEY ("tx_hash", "tx_idx")
);
CREATE UNIQUE INDEX IF NOT EXISTS bitcoin_transaction_txins_prevout_idx ON "bitcoin_transaction_txins" USING BTREE ("prevout_tx_hash", "prevout_tx_idx");
CREATE INDEX IF NOT EXISTS bitcoin_transaction_txins_prevout_idx ON "bitcoin_transaction_txins" USING BTREE ("prevout_tx_hash", "prevout_tx_idx");
COMMIT;

View File

@@ -7,6 +7,7 @@ import (
"slices"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
@@ -41,7 +42,7 @@ func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
return cmp.Compare(t1.Header.Height, t2.Header.Height)
})
latestBlock, err := p.bitcoinDg.GetLatestBlockHeader(ctx)
latestBlock, err := p.CurrentBlock(ctx)
if err != nil {
return errors.Wrap(err, "failed to get latest indexed block header")
}
@@ -74,13 +75,20 @@ func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
b, err := p.bitcoinDg.GetLatestBlockHeader(ctx)
if err != nil {
if errors.Is(err, errs.NotFound) {
return defaultCurrentBlock, nil
}
return types.BlockHeader{}, errors.WithStack(err)
}
return b, nil
}
func (p *Processor) GetIndexedBlock(ctx context.Context, height int64) (types.BlockHeader, error) {
return types.BlockHeader{}, nil
header, err := p.bitcoinDg.GetBlockHeaderByHeight(ctx, height)
if err != nil {
return types.BlockHeader{}, errors.WithStack(err)
}
return header, nil
}
func (p *Processor) RevertData(ctx context.Context, from int64) error {

View File

@@ -4,14 +4,19 @@ import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
"github.com/jackc/pgx/v5"
"github.com/samber/lo"
)
func (r *Repository) GetLatestBlockHeader(ctx context.Context) (types.BlockHeader, error) {
model, err := r.queries.GetLatestBlockHeader(ctx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return types.BlockHeader{}, errors.Join(errs.NotFound, err)
}
return types.BlockHeader{}, errors.Wrap(err, "failed to get latest block header")
}
@@ -30,23 +35,16 @@ func (r *Repository) InsertBlock(ctx context.Context, block *types.Block) error
if err != nil {
return errors.Wrap(err, "failed to begin transaction")
}
defer func() {
if r := recover(); r != nil {
_ = tx.Rollback(ctx)
panic(r) // re-throw panic after rollback
}
}()
defer tx.Rollback(ctx)
queries := r.queries.WithTx(tx)
if err := queries.InsertBlock(ctx, blockParams); err != nil {
_ = tx.Rollback(ctx)
return errors.Wrapf(err, "failed to insert block, height: %d, hash: %s", blockParams.BlockHeight, blockParams.BlockHash)
}
for _, params := range txParams {
if err := queries.InsertTransaction(ctx, params); err != nil {
_ = tx.Rollback(ctx)
return errors.Wrapf(err, "failed to insert transaction, hash: %s", params.TxHash)
}
}
@@ -55,23 +53,28 @@ func (r *Repository) InsertBlock(ctx context.Context, block *types.Block) error
// Because txin references txout
for _, params := range txoutParams {
if err := queries.InsertTransactionTxOut(ctx, params); err != nil {
_ = tx.Rollback(ctx)
return errors.Wrapf(err, "failed to insert transaction txout, %v:%v", params.TxHash, params.TxIdx)
}
}
for _, params := range txinParams {
if err := queries.InsertTransactionTxIn(ctx, params); err != nil {
_ = tx.Rollback(ctx)
return errors.Wrapf(err, "failed to insert transaction txin, %v:%v", params.TxHash, params.TxIdx)
}
}
if err := tx.Commit(ctx); err != nil {
return errors.Wrap(err, "failed to commit transaction")
}
return nil
}
func (r *Repository) RevertBlocks(ctx context.Context, from int64) error {
if err := r.queries.RevertData(ctx, int32(from)); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil
}
return errors.Wrap(err, "failed to revert data")
}
return nil
@@ -80,6 +83,9 @@ func (r *Repository) RevertBlocks(ctx context.Context, from int64) error {
func (r *Repository) GetBlockHeaderByHeight(ctx context.Context, blockHeight int64) (types.BlockHeader, error) {
blockModel, err := r.queries.GetBlockByHeight(ctx, int32(blockHeight))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return types.BlockHeader{}, errors.Join(errs.NotFound, err)
}
return types.BlockHeader{}, errors.Wrap(err, "failed to get block by height")
}
@@ -99,6 +105,10 @@ func (r *Repository) GetBlocksByHeightRange(ctx context.Context, from int64, to
return nil, errors.Wrap(err, "failed to get blocks by height range")
}
if len(blocks) == 0 {
return []*types.Block{}, nil
}
txs, err := r.queries.GetTransactionsByHeightRange(ctx, gen.GetTransactionsByHeightRangeParams{
FromHeight: int32(from),
ToHeight: int32(to),

View File

@@ -217,8 +217,8 @@ type InsertBlockParams struct {
MerkleRoot string
PrevBlockHash string
Timestamp pgtype.Timestamptz
Bits int32
Nonce int32
Bits int64
Nonce int64
}
func (q *Queries) InsertBlock(ctx context.Context, arg InsertBlockParams) error {
@@ -242,10 +242,10 @@ INSERT INTO bitcoin_transactions ("tx_hash","version","locktime","block_height",
type InsertTransactionParams struct {
TxHash string
Version int32
Locktime int32
Locktime int64
BlockHeight int32
BlockHash string
Idx int16
Idx int32
}
func (q *Queries) InsertTransaction(ctx context.Context, arg InsertTransactionParams) error {
@@ -273,12 +273,12 @@ VALUES ($1, $2, $3, $4, (SELECT "pkscript" FROM update_txout), $5, $6, $7)
type InsertTransactionTxInParams struct {
TxHash string
TxIdx int16
TxIdx int32
PrevoutTxHash string
PrevoutTxIdx int16
PrevoutTxIdx int32
Scriptsig string
Witness pgtype.Text
Sequence int32
Sequence int64
}
func (q *Queries) InsertTransactionTxIn(ctx context.Context, arg InsertTransactionTxInParams) error {
@@ -300,7 +300,7 @@ INSERT INTO bitcoin_transaction_txouts ("tx_hash","tx_idx","pkscript","value") V
type InsertTransactionTxOutParams struct {
TxHash string
TxIdx int16
TxIdx int32
Pkscript string
Value int64
}

View File

@@ -15,8 +15,8 @@ type BitcoinBlock struct {
MerkleRoot string
PrevBlockHash string
Timestamp pgtype.Timestamptz
Bits int32
Nonce int32
Bits int64
Nonce int64
}
type BitcoinIndexerDbVersion struct {
@@ -35,26 +35,26 @@ type BitcoinIndexerStat struct {
type BitcoinTransaction struct {
TxHash string
Version int32
Locktime int32
Locktime int64
BlockHeight int32
BlockHash string
Idx int16
Idx int32
}
type BitcoinTransactionTxin struct {
TxHash string
TxIdx int16
TxIdx int32
PrevoutTxHash string
PrevoutTxIdx int16
PrevoutPkscript string
PrevoutTxIdx int32
PrevoutPkscript pgtype.Text
Scriptsig string
Witness pgtype.Text
Sequence int32
Sequence int64
}
type BitcoinTransactionTxout struct {
TxHash string
TxIdx int16
TxIdx int32
Pkscript string
Value int64
IsSpent bool

View File

@@ -25,8 +25,8 @@ func mapBlockHeaderTypeToModel(src types.BlockHeader) gen.BitcoinBlock {
Time: src.Timestamp,
Valid: true,
},
Bits: int32(src.Bits),
Nonce: int32(src.Nonce),
Bits: int64(src.Bits),
Nonce: int64(src.Nonce),
}
}
@@ -69,17 +69,17 @@ func mapBlockTypeToParams(src *types.Block) (gen.InsertBlockParams, []gen.Insert
Time: src.Header.Timestamp,
Valid: true,
},
Bits: int32(src.Header.Bits),
Nonce: int32(src.Header.Nonce),
Bits: int64(src.Header.Bits),
Nonce: int64(src.Header.Nonce),
}
for txIdx, srcTx := range src.Transactions {
tx := gen.InsertTransactionParams{
TxHash: srcTx.TxHash.String(),
Version: srcTx.Version,
Locktime: int32(srcTx.LockTime),
Locktime: int64(srcTx.LockTime),
BlockHeight: int32(src.Header.Height),
BlockHash: src.Header.Hash.String(),
Idx: int16(txIdx),
Idx: int32(txIdx),
}
txs = append(txs, tx)
@@ -93,19 +93,19 @@ func mapBlockTypeToParams(src *types.Block) (gen.InsertBlockParams, []gen.Insert
}
txins = append(txins, gen.InsertTransactionTxInParams{
TxHash: tx.TxHash,
TxIdx: int16(idx),
TxIdx: int32(idx),
PrevoutTxHash: txin.PreviousOutTxHash.String(),
PrevoutTxIdx: int16(txin.PreviousOutIndex),
PrevoutTxIdx: int32(txin.PreviousOutIndex),
Scriptsig: hex.EncodeToString(txin.SignatureScript),
Witness: witness,
Sequence: int32(txin.Sequence),
Sequence: int64(txin.Sequence),
})
}
for idx, txout := range srcTx.TxOut {
txouts = append(txouts, gen.InsertTransactionTxOutParams{
TxHash: tx.TxHash,
TxIdx: int16(idx),
TxIdx: int32(idx),
Pkscript: hex.EncodeToString(txout.PkScript),
Value: txout.Value,
})

View File

@@ -5,12 +5,17 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/jackc/pgx/v5"
)
func (r *Repository) GetTransactionByHash(ctx context.Context, txHash chainhash.Hash) (*types.Transaction, error) {
model, err := r.queries.GetTransactionByHash(ctx, txHash.String())
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, errors.Join(errs.NotFound, err)
}
return nil, errors.Wrap(err, "failed to get transaction by hash")
}
txIns, err := r.queries.GetTransactionTxInsByTxHashes(ctx, []string{txHash.String()})
@@ -18,7 +23,7 @@ func (r *Repository) GetTransactionByHash(ctx context.Context, txHash chainhash.
return nil, errors.Wrap(err, "failed to get transaction txins by tx hashes")
}
txOuts, err := r.queries.GetTransactionTxOutsByTxHashes(ctx, []string{txHash.String()})
if err != nil {
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, errors.Wrap(err, "failed to get transaction txouts by tx hashes")
}