mirror of
https://github.com/alexgo-io/gaze-indexer.git
synced 2026-04-29 20:25:24 +08:00
Merge remote-tracking branch 'origin/feature/bitcoin-indexer' into feat/runes-module
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/gaze-network/indexer-network/internal/subscription"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
cstream "github.com/planxnx/concurrent-stream"
|
||||
@@ -75,18 +76,18 @@ func (d *BitcoinNodeDatasource) Fetch(ctx context.Context, from, to int64) ([]*t
|
||||
//
|
||||
// - from: block height to start fetching, if -1, it will start from genesis block
|
||||
// - to: block height to stop fetching, if -1, it will fetch until the latest block
|
||||
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*ClientSubscription[[]*types.Block], error) {
|
||||
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
|
||||
from, to, skip, err := d.prepareRange(from, to)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to prepare fetch range")
|
||||
}
|
||||
|
||||
subscription := newClientSubscription(ch)
|
||||
subscription := subscription.NewSubscription(ch)
|
||||
if skip {
|
||||
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unsubscribe")
|
||||
}
|
||||
return subscription, nil
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
// Create parallel stream
|
||||
@@ -122,7 +123,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
}
|
||||
|
||||
// send blocks to subscription channel
|
||||
if err := subscription.send(ctx, data); err != nil {
|
||||
if err := subscription.Send(ctx, data); err != nil {
|
||||
logger.ErrorContext(ctx, "failed while dispatch block",
|
||||
slogx.Error(err),
|
||||
slogx.Int64("start", data[0].Header.Height),
|
||||
@@ -156,17 +157,19 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
hash, err := d.btcclient.GetBlockHash(height)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "failed to get block hash", slogx.Error(err), slogx.Int64("height", height))
|
||||
if err := subscription.sendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
|
||||
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
|
||||
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
block, err := d.btcclient.GetBlock(hash)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "failed to get block", slogx.Error(err), slogx.Int64("height", height))
|
||||
if err := subscription.sendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
|
||||
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
|
||||
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))
|
||||
|
||||
@@ -178,7 +181,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
}
|
||||
}()
|
||||
|
||||
return subscription, nil
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
|
||||
|
||||
@@ -4,12 +4,13 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/gaze-network/indexer-network/internal/subscription"
|
||||
)
|
||||
|
||||
// Datasource is an interface for indexer data sources.
|
||||
type Datasource[T any] interface {
|
||||
Name() string
|
||||
Fetch(ctx context.Context, from, to int64) (T, error)
|
||||
FetchAsync(ctx context.Context, from, to int64, ch chan<- T) (*ClientSubscription[T], error)
|
||||
FetchAsync(ctx context.Context, from, to int64, ch chan<- T) (*subscription.ClientSubscription[T], error)
|
||||
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
|
||||
}
|
||||
|
||||
31
internal/subscription/client_subscription.go
Normal file
31
internal/subscription/client_subscription.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package subscription
|
||||
|
||||
import "context"
|
||||
|
||||
// ClientSubscription is a subscription that can be used by the client to unsubscribe from the subscription.
|
||||
type ClientSubscription[T any] struct {
|
||||
subscription *Subscription[T]
|
||||
}
|
||||
|
||||
func (c *ClientSubscription[T]) Unsubscribe() {
|
||||
c.subscription.Unsubscribe()
|
||||
}
|
||||
|
||||
func (c *ClientSubscription[T]) UnsubscribeWithContext(ctx context.Context) (err error) {
|
||||
return c.subscription.UnsubscribeWithContext(ctx)
|
||||
}
|
||||
|
||||
// Err returns the error channel of the subscription.
|
||||
func (c *ClientSubscription[T]) Err() <-chan error {
|
||||
return c.subscription.Err()
|
||||
}
|
||||
|
||||
// Done returns the done channel of the subscription
|
||||
func (c *ClientSubscription[T]) Done() <-chan struct{} {
|
||||
return c.subscription.Done()
|
||||
}
|
||||
|
||||
// IsClosed returns status of the subscription
|
||||
func (c *ClientSubscription[T]) IsClosed() bool {
|
||||
return c.subscription.IsClosed()
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package datasources
|
||||
package subscription
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -8,13 +8,13 @@ import (
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
)
|
||||
|
||||
// ClientSubscriptionBufferSize is the buffer size of the subscription channel.
|
||||
// SubscriptionBufferSize is the buffer size of the subscription channel.
|
||||
// It is used to prevent blocking the client dispatcher when the client is slow to consume values.
|
||||
var ClientSubscriptionBufferSize = 8
|
||||
var SubscriptionBufferSize = 8
|
||||
|
||||
// ClientSubscription is a subscription to a stream of values from the client dispatcher.
|
||||
// Subscription is a subscription to a stream of values from the client dispatcher.
|
||||
// It has two channels: one for values, and one for errors.
|
||||
type ClientSubscription[T any] struct {
|
||||
type Subscription[T any] struct {
|
||||
// The channel which the subscription sends values.
|
||||
channel chan<- T
|
||||
|
||||
@@ -32,11 +32,11 @@ type ClientSubscription[T any] struct {
|
||||
quitDone chan struct{}
|
||||
}
|
||||
|
||||
func newClientSubscription[T any](channel chan<- T) *ClientSubscription[T] {
|
||||
subscription := &ClientSubscription[T]{
|
||||
func NewSubscription[T any](channel chan<- T) *Subscription[T] {
|
||||
subscription := &Subscription[T]{
|
||||
channel: channel,
|
||||
in: make(chan T, ClientSubscriptionBufferSize),
|
||||
err: make(chan error, ClientSubscriptionBufferSize),
|
||||
in: make(chan T, SubscriptionBufferSize),
|
||||
err: make(chan error, SubscriptionBufferSize),
|
||||
quit: make(chan struct{}),
|
||||
quitDone: make(chan struct{}),
|
||||
}
|
||||
@@ -46,15 +46,15 @@ func newClientSubscription[T any](channel chan<- T) *ClientSubscription[T] {
|
||||
return subscription
|
||||
}
|
||||
|
||||
func (c *ClientSubscription[T]) Unsubscribe() {
|
||||
_ = c.UnsubscribeWithContext(context.Background())
|
||||
func (s *Subscription[T]) Unsubscribe() {
|
||||
_ = s.UnsubscribeWithContext(context.Background())
|
||||
}
|
||||
|
||||
func (c *ClientSubscription[T]) UnsubscribeWithContext(ctx context.Context) (err error) {
|
||||
c.quiteOnce.Do(func() {
|
||||
func (s *Subscription[T]) UnsubscribeWithContext(ctx context.Context) (err error) {
|
||||
s.quiteOnce.Do(func() {
|
||||
select {
|
||||
case c.quit <- struct{}{}:
|
||||
<-c.quitDone
|
||||
case s.quit <- struct{}{}:
|
||||
<-s.quitDone
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
}
|
||||
@@ -62,31 +62,38 @@ func (c *ClientSubscription[T]) UnsubscribeWithContext(ctx context.Context) (err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Client returns a client subscription for this subscription.
|
||||
func (s *Subscription[T]) Client() *ClientSubscription[T] {
|
||||
return &ClientSubscription[T]{
|
||||
subscription: s,
|
||||
}
|
||||
}
|
||||
|
||||
// Err returns the error channel of the subscription.
|
||||
func (c *ClientSubscription[T]) Err() <-chan error {
|
||||
return c.err
|
||||
func (s *Subscription[T]) Err() <-chan error {
|
||||
return s.err
|
||||
}
|
||||
|
||||
// Done returns the done channel of the subscription
|
||||
func (c *ClientSubscription[T]) Done() <-chan struct{} {
|
||||
return c.quitDone
|
||||
func (s *Subscription[T]) Done() <-chan struct{} {
|
||||
return s.quitDone
|
||||
}
|
||||
|
||||
// IsClosed returns status of the subscription
|
||||
func (c *ClientSubscription[T]) IsClosed() bool {
|
||||
func (s *Subscription[T]) IsClosed() bool {
|
||||
select {
|
||||
case <-c.quitDone:
|
||||
case <-s.quitDone:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// send sends a value to the subscription channel. If the subscription is closed, it returns an error.
|
||||
func (c *ClientSubscription[T]) send(ctx context.Context, value T) error {
|
||||
// Send sends a value to the subscription channel. If the subscription is closed, it returns an error.
|
||||
func (s *Subscription[T]) Send(ctx context.Context, value T) error {
|
||||
select {
|
||||
case c.in <- value:
|
||||
case <-c.quitDone:
|
||||
case s.in <- value:
|
||||
case <-s.quitDone:
|
||||
return errors.Wrap(errs.InternalError, "subscription is closed")
|
||||
case <-ctx.Done():
|
||||
return errors.WithStack(ctx.Err())
|
||||
@@ -94,11 +101,11 @@ func (c *ClientSubscription[T]) send(ctx context.Context, value T) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendError sends an error to the subscription error channel. If the subscription is closed, it returns an error.
|
||||
func (c *ClientSubscription[T]) sendError(ctx context.Context, err error) error {
|
||||
// SendError sends an error to the subscription error channel. If the subscription is closed, it returns an error.
|
||||
func (s *Subscription[T]) SendError(ctx context.Context, err error) error {
|
||||
select {
|
||||
case c.err <- err:
|
||||
case <-c.quitDone:
|
||||
case s.err <- err:
|
||||
case <-s.quitDone:
|
||||
return errors.Wrap(errs.InternalError, "subscription is closed")
|
||||
case <-ctx.Done():
|
||||
return errors.WithStack(ctx.Err())
|
||||
@@ -107,17 +114,17 @@ func (c *ClientSubscription[T]) sendError(ctx context.Context, err error) error
|
||||
}
|
||||
|
||||
// run starts the forwarding loop for the subscription.
|
||||
func (c *ClientSubscription[T]) run() {
|
||||
defer close(c.quitDone)
|
||||
func (s *Subscription[T]) run() {
|
||||
defer close(s.quitDone)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.quit:
|
||||
case <-s.quit:
|
||||
return
|
||||
case value := <-c.in:
|
||||
case value := <-s.in:
|
||||
select {
|
||||
case c.channel <- value:
|
||||
case <-c.quit:
|
||||
case s.channel <- value:
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
205
modules/bitcoin/btcclient/client_db.go
Normal file
205
modules/bitcoin/btcclient/client_db.go
Normal file
@@ -0,0 +1,205 @@
|
||||
package btcclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/gaze-network/indexer-network/core/datasources"
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/gaze-network/indexer-network/internal/subscription"
|
||||
"github.com/gaze-network/indexer-network/modules/bitcoin/datagateway"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/pkg/errors"
|
||||
cstream "github.com/planxnx/concurrent-stream"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
// TODO: Refactor this, datasources.BitcoinNode and This package is the same.
|
||||
|
||||
// Make sure to implement the BitcoinDatasource interface
|
||||
var _ datasources.Datasource[[]*types.Block] = (*ClientDatabase)(nil)
|
||||
|
||||
// ClientDatabase is a client to connect to the bitcoin database.
|
||||
type ClientDatabase struct {
|
||||
bitcoinDg datagateway.BitcoinDataGateway
|
||||
}
|
||||
|
||||
func NewClientDatabase(bitcoinDg datagateway.BitcoinDataGateway) *ClientDatabase {
|
||||
return &ClientDatabase{
|
||||
bitcoinDg: bitcoinDg,
|
||||
}
|
||||
}
|
||||
|
||||
func (c ClientDatabase) Name() string {
|
||||
return "BitcoinDatabase"
|
||||
}
|
||||
|
||||
func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
|
||||
ch := make(chan []*types.Block)
|
||||
subscription, err := c.FetchAsync(ctx, from, to, ch)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
blocks := make([]*types.Block, 0)
|
||||
for {
|
||||
select {
|
||||
case b, ok := <-ch:
|
||||
if !ok {
|
||||
return blocks, nil
|
||||
}
|
||||
blocks = append(blocks, b...)
|
||||
case <-subscription.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, errors.Wrap(err, "context done")
|
||||
}
|
||||
return blocks, nil
|
||||
case err := <-subscription.Err():
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "got error while fetch async")
|
||||
}
|
||||
return blocks, nil
|
||||
case <-ctx.Done():
|
||||
return nil, errors.Wrap(ctx.Err(), "context done")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
|
||||
from, to, skip, err := c.prepareRange(ctx, from, to)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to prepare fetch range")
|
||||
}
|
||||
|
||||
subscription := subscription.NewSubscription(ch)
|
||||
if skip {
|
||||
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unsubscribe")
|
||||
}
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
// Create parallel stream
|
||||
out := make(chan []*types.Block)
|
||||
stream := cstream.NewStream(ctx, 8, out)
|
||||
|
||||
// create slice of block height to fetch
|
||||
blockHeights := make([]int64, 0, to-from+1)
|
||||
for i := from; i <= to; i++ {
|
||||
blockHeights = append(blockHeights, i)
|
||||
}
|
||||
|
||||
// Wait for stream to finish and close out channel
|
||||
go func() {
|
||||
defer close(out)
|
||||
_ = stream.Wait()
|
||||
}()
|
||||
|
||||
// Fan-out blocks to subscription channel
|
||||
go func() {
|
||||
defer subscription.Unsubscribe()
|
||||
for {
|
||||
select {
|
||||
case data, ok := <-out:
|
||||
// stream closed
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// empty blocks
|
||||
if len(data) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// send blocks to subscription channel
|
||||
if err := subscription.Send(ctx, data); err != nil {
|
||||
logger.ErrorContext(ctx, "failed while dispatch block",
|
||||
slogx.Error(err),
|
||||
slogx.Int64("start", data[0].Header.Height),
|
||||
slogx.Int64("end", data[len(data)-1].Header.Height),
|
||||
)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Parallel fetch blocks from Bitcoin node until complete all block heights
|
||||
// or subscription is done.
|
||||
go func() {
|
||||
defer stream.Close()
|
||||
done := subscription.Done()
|
||||
chunks := lo.Chunk(blockHeights, 100)
|
||||
for _, chunk := range chunks {
|
||||
chunk := chunk
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
if len(chunk) == 0 {
|
||||
continue
|
||||
}
|
||||
stream.Go(func() []*types.Block {
|
||||
fromHeight, toHeight := chunk[0], chunk[len(chunk)-1]
|
||||
blocks, err := c.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "failed to get blocks",
|
||||
slogx.Error(err),
|
||||
slogx.Int64("from_height", fromHeight),
|
||||
slogx.Int64("to_height", toHeight),
|
||||
)
|
||||
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get blocks: from_height: %d, to_height: %d", fromHeight, toHeight)); err != nil {
|
||||
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return blocks
|
||||
})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
func (c *ClientDatabase) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
|
||||
header, err := c.bitcoinDg.GetBlockHeaderByHeight(ctx, height)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.WithStack(err)
|
||||
}
|
||||
return header, nil
|
||||
}
|
||||
|
||||
func (c *ClientDatabase) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
|
||||
start = fromHeight
|
||||
end = toHeight
|
||||
|
||||
// get current bitcoin block height
|
||||
latestBlock, err := c.bitcoinDg.GetLatestBlockHeader(ctx)
|
||||
if err != nil {
|
||||
return -1, -1, false, errors.Wrap(err, "failed to get block count")
|
||||
}
|
||||
|
||||
// set start to genesis block height
|
||||
if start < 0 {
|
||||
start = 0
|
||||
}
|
||||
|
||||
// set end to current bitcoin block height if
|
||||
// - end is -1
|
||||
// - end is greater that current bitcoin block height
|
||||
if end < 0 || end > latestBlock.Height {
|
||||
end = latestBlock.Height
|
||||
}
|
||||
|
||||
// if start is greater than end, skip this round
|
||||
if start > end {
|
||||
return -1, -1, true, nil
|
||||
}
|
||||
|
||||
return start, end, false, nil
|
||||
}
|
||||
@@ -1,10 +1,6 @@
|
||||
-- name: GetLatestBlockHeader :one
|
||||
SELECT * FROM bitcoin_blocks ORDER BY block_height DESC LIMIT 1;
|
||||
|
||||
-- TODO: GetBlockHeaderByRange
|
||||
|
||||
-- TODO: GetBlockByHeight/Hash (Join block with transactions, txins, txouts)
|
||||
|
||||
-- name: InsertBlock :exec
|
||||
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce") VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
|
||||
|
||||
@@ -24,7 +20,6 @@ WITH update_txout AS (
|
||||
INSERT INTO bitcoin_transaction_txins ("tx_hash","tx_idx","prevout_tx_hash","prevout_tx_idx","prevout_pkscript","scriptsig","witness","sequence")
|
||||
VALUES ($1, $2, $3, $4, (SELECT "pkscript" FROM update_txout), $5, $6, $7);
|
||||
|
||||
|
||||
-- name: RevertData :exec
|
||||
WITH delete_tx AS (
|
||||
DELETE FROM "bitcoin_transactions" WHERE "block_height" >= @from_height
|
||||
@@ -42,3 +37,18 @@ WITH delete_tx AS (
|
||||
RETURNING NULL
|
||||
)
|
||||
DELETE FROM "bitcoin_blocks" WHERE "bitcoin_blocks"."block_height" >= @from_height;
|
||||
|
||||
-- name: GetBlockByHeight :one
|
||||
SELECT * FROM bitcoin_blocks WHERE block_height = $1;
|
||||
|
||||
-- name: GetBlocksByHeightRange :many
|
||||
SELECT * FROM bitcoin_blocks WHERE block_height >= @from_height AND block_height <= @to_height ORDER BY block_height ASC;
|
||||
|
||||
-- name: GetTransactionsByHeightRange :many
|
||||
SELECT * FROM bitcoin_transactions WHERE block_height >= @from_height AND block_height <= @to_height;
|
||||
|
||||
-- name: GetTransactionTxOutsByTxHashes :many
|
||||
SELECT * FROM bitcoin_transaction_txouts WHERE tx_hash = ANY(@tx_hashes::TEXT[]);
|
||||
|
||||
-- name: GetTransactionTxInsByTxHashes :many
|
||||
SELECT * FROM bitcoin_transaction_txins WHERE tx_hash = ANY(@tx_hashes::TEXT[]);
|
||||
|
||||
@@ -18,4 +18,6 @@ type BitcoinWriterDataDataGateway interface {
|
||||
|
||||
type BitcoinReaderDataDataGateway interface {
|
||||
GetLatestBlockHeader(context.Context) (types.BlockHeader, error)
|
||||
GetBlockHeaderByHeight(ctx context.Context, blockHeight int64) (types.BlockHeader, error)
|
||||
GetBlocksByHeightRange(ctx context.Context, from int64, to int64) ([]*types.Block, error)
|
||||
}
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"slices"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/gaze-network/indexer-network/modules/bitcoin/repository/postgres/gen"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
func (r *Repository) GetLatestBlockHeader(ctx context.Context) (types.BlockHeader, error) {
|
||||
@@ -74,3 +80,135 @@ func (r *Repository) RevertBlocks(ctx context.Context, from int64) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repository) GetBlockHeaderByHeight(ctx context.Context, blockHeight int64) (types.BlockHeader, error) {
|
||||
blockModel, err := r.queries.GetBlockByHeight(ctx, int32(blockHeight))
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.Wrap(err, "failed to get block by height")
|
||||
}
|
||||
|
||||
data, err := mapBlockHeaderModelToType(blockModel)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.Wrap(err, "failed to map block header model to type")
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (r *Repository) GetBlocksByHeightRange(ctx context.Context, from int64, to int64) ([]*types.Block, error) {
|
||||
blocks, err := r.queries.GetBlocksByHeightRange(ctx, gen.GetBlocksByHeightRangeParams{
|
||||
FromHeight: int32(from),
|
||||
ToHeight: int32(to),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get blocks by height range")
|
||||
}
|
||||
|
||||
txs, err := r.queries.GetTransactionsByHeightRange(ctx, gen.GetTransactionsByHeightRangeParams{
|
||||
FromHeight: int32(from),
|
||||
ToHeight: int32(to),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get transactions by height range")
|
||||
}
|
||||
|
||||
txHashes := lo.Map(txs, func(tx gen.BitcoinTransaction, _ int) string { return tx.TxHash })
|
||||
|
||||
txOuts, err := r.queries.GetTransactionTxOutsByTxHashes(ctx, txHashes)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get transaction txouts by tx hashes")
|
||||
}
|
||||
|
||||
txIns, err := r.queries.GetTransactionTxInsByTxHashes(ctx, txHashes)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get transaction txins by tx hashes")
|
||||
}
|
||||
|
||||
// Grouping result by block height and tx hash
|
||||
groupedTxs := lo.GroupBy(txs, func(tx gen.BitcoinTransaction) int32 { return tx.BlockHeight })
|
||||
groupedTxOuts := lo.GroupBy(txOuts, func(txOut gen.BitcoinTransactionTxout) string { return txOut.TxHash })
|
||||
groupedTxIns := lo.GroupBy(txIns, func(txIn gen.BitcoinTransactionTxin) string { return txIn.TxHash })
|
||||
|
||||
// TODO: Extract to mapper functions
|
||||
var errs []error
|
||||
result := lo.Map(blocks, func(blockModel gen.BitcoinBlock, _ int) *types.Block {
|
||||
header, err := mapBlockHeaderModelToType(blockModel)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to map block header model to type"))
|
||||
return nil
|
||||
}
|
||||
|
||||
txsModel := groupedTxs[blockModel.BlockHeight]
|
||||
return &types.Block{
|
||||
Header: header,
|
||||
Transactions: lo.Map(txsModel, func(txModel gen.BitcoinTransaction, _ int) *types.Transaction {
|
||||
blockHash, err := chainhash.NewHashFromStr(txModel.BlockHash)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to parse block hash"))
|
||||
return nil
|
||||
}
|
||||
|
||||
txHash, err := chainhash.NewHashFromStr(txModel.TxHash)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to parse tx hash"))
|
||||
return nil
|
||||
}
|
||||
|
||||
txOutsModel := groupedTxOuts[txModel.TxHash]
|
||||
txInsModel := groupedTxIns[txModel.TxHash]
|
||||
|
||||
// Sort txins and txouts by index (Asc)
|
||||
slices.SortFunc(txOutsModel, func(i, j gen.BitcoinTransactionTxout) int {
|
||||
return cmp.Compare(i.TxIdx, j.TxIdx)
|
||||
})
|
||||
slices.SortFunc(txInsModel, func(i, j gen.BitcoinTransactionTxin) int {
|
||||
return cmp.Compare(i.TxIdx, j.TxIdx)
|
||||
})
|
||||
|
||||
return &types.Transaction{
|
||||
BlockHeight: int64(txModel.BlockHeight),
|
||||
BlockHash: *blockHash,
|
||||
Index: uint32(txModel.Idx),
|
||||
TxHash: *txHash,
|
||||
Version: txModel.Version,
|
||||
LockTime: uint32(txModel.Locktime),
|
||||
TxIn: lo.Map(txInsModel, func(src gen.BitcoinTransactionTxin, _ int) *types.TxIn {
|
||||
scriptsig, err := hex.DecodeString(src.Scriptsig)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to decode scriptsig"))
|
||||
return nil
|
||||
}
|
||||
|
||||
prevoutTxHash, err := chainhash.NewHashFromStr(src.PrevoutTxHash)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to parse prevout tx hash"))
|
||||
return nil
|
||||
}
|
||||
|
||||
return &types.TxIn{
|
||||
SignatureScript: scriptsig,
|
||||
Witness: [][]byte{}, // TODO: implement witness
|
||||
Sequence: uint32(src.Sequence),
|
||||
PreviousOutIndex: uint32(src.PrevoutTxIdx),
|
||||
PreviousOutTxHash: *prevoutTxHash,
|
||||
}
|
||||
}),
|
||||
TxOut: lo.Map(txOutsModel, func(src gen.BitcoinTransactionTxout, _ int) *types.TxOut {
|
||||
pkscript, err := hex.DecodeString(src.Pkscript)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "failed to decode pkscript"))
|
||||
return nil
|
||||
}
|
||||
return &types.TxOut{
|
||||
PkScript: pkscript,
|
||||
Value: src.Value,
|
||||
}
|
||||
}),
|
||||
}
|
||||
}),
|
||||
}
|
||||
})
|
||||
if len(errs) > 0 {
|
||||
return nil, errors.Wrap(errors.Join(errs...), "failed while mapping result")
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -11,6 +11,64 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const getBlockByHeight = `-- name: GetBlockByHeight :one
|
||||
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks WHERE block_height = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetBlockByHeight(ctx context.Context, blockHeight int32) (BitcoinBlock, error) {
|
||||
row := q.db.QueryRow(ctx, getBlockByHeight, blockHeight)
|
||||
var i BitcoinBlock
|
||||
err := row.Scan(
|
||||
&i.BlockHeight,
|
||||
&i.BlockHash,
|
||||
&i.Version,
|
||||
&i.MerkleRoot,
|
||||
&i.PrevBlockHash,
|
||||
&i.Timestamp,
|
||||
&i.Bits,
|
||||
&i.Nonce,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getBlocksByHeightRange = `-- name: GetBlocksByHeightRange :many
|
||||
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks WHERE block_height >= $1 AND block_height <= $2 ORDER BY block_height ASC
|
||||
`
|
||||
|
||||
type GetBlocksByHeightRangeParams struct {
|
||||
FromHeight int32
|
||||
ToHeight int32
|
||||
}
|
||||
|
||||
func (q *Queries) GetBlocksByHeightRange(ctx context.Context, arg GetBlocksByHeightRangeParams) ([]BitcoinBlock, error) {
|
||||
rows, err := q.db.Query(ctx, getBlocksByHeightRange, arg.FromHeight, arg.ToHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []BitcoinBlock
|
||||
for rows.Next() {
|
||||
var i BitcoinBlock
|
||||
if err := rows.Scan(
|
||||
&i.BlockHeight,
|
||||
&i.BlockHash,
|
||||
&i.Version,
|
||||
&i.MerkleRoot,
|
||||
&i.PrevBlockHash,
|
||||
&i.Timestamp,
|
||||
&i.Bits,
|
||||
&i.Nonce,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getLatestBlockHeader = `-- name: GetLatestBlockHeader :one
|
||||
SELECT block_height, block_hash, version, merkle_root, prev_block_hash, timestamp, bits, nonce FROM bitcoin_blocks ORDER BY block_height DESC LIMIT 1
|
||||
`
|
||||
@@ -31,9 +89,106 @@ func (q *Queries) GetLatestBlockHeader(ctx context.Context) (BitcoinBlock, error
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getTransactionTxInsByTxHashes = `-- name: GetTransactionTxInsByTxHashes :many
|
||||
SELECT tx_hash, tx_idx, prevout_tx_hash, prevout_tx_idx, prevout_pkscript, scriptsig, witness, sequence FROM bitcoin_transaction_txins WHERE tx_hash = ANY($1::TEXT[])
|
||||
`
|
||||
|
||||
func (q *Queries) GetTransactionTxInsByTxHashes(ctx context.Context, txHashes []string) ([]BitcoinTransactionTxin, error) {
|
||||
rows, err := q.db.Query(ctx, getTransactionTxInsByTxHashes, txHashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []BitcoinTransactionTxin
|
||||
for rows.Next() {
|
||||
var i BitcoinTransactionTxin
|
||||
if err := rows.Scan(
|
||||
&i.TxHash,
|
||||
&i.TxIdx,
|
||||
&i.PrevoutTxHash,
|
||||
&i.PrevoutTxIdx,
|
||||
&i.PrevoutPkscript,
|
||||
&i.Scriptsig,
|
||||
&i.Witness,
|
||||
&i.Sequence,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getTransactionTxOutsByTxHashes = `-- name: GetTransactionTxOutsByTxHashes :many
|
||||
SELECT tx_hash, tx_idx, pkscript, value, is_spent FROM bitcoin_transaction_txouts WHERE tx_hash = ANY($1::TEXT[])
|
||||
`
|
||||
|
||||
func (q *Queries) GetTransactionTxOutsByTxHashes(ctx context.Context, txHashes []string) ([]BitcoinTransactionTxout, error) {
|
||||
rows, err := q.db.Query(ctx, getTransactionTxOutsByTxHashes, txHashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []BitcoinTransactionTxout
|
||||
for rows.Next() {
|
||||
var i BitcoinTransactionTxout
|
||||
if err := rows.Scan(
|
||||
&i.TxHash,
|
||||
&i.TxIdx,
|
||||
&i.Pkscript,
|
||||
&i.Value,
|
||||
&i.IsSpent,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getTransactionsByHeightRange = `-- name: GetTransactionsByHeightRange :many
|
||||
SELECT tx_hash, version, locktime, block_height, block_hash, idx FROM bitcoin_transactions WHERE block_height >= $1 AND block_height <= $2
|
||||
`
|
||||
|
||||
type GetTransactionsByHeightRangeParams struct {
|
||||
FromHeight int32
|
||||
ToHeight int32
|
||||
}
|
||||
|
||||
func (q *Queries) GetTransactionsByHeightRange(ctx context.Context, arg GetTransactionsByHeightRangeParams) ([]BitcoinTransaction, error) {
|
||||
rows, err := q.db.Query(ctx, getTransactionsByHeightRange, arg.FromHeight, arg.ToHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []BitcoinTransaction
|
||||
for rows.Next() {
|
||||
var i BitcoinTransaction
|
||||
if err := rows.Scan(
|
||||
&i.TxHash,
|
||||
&i.Version,
|
||||
&i.Locktime,
|
||||
&i.BlockHeight,
|
||||
&i.BlockHash,
|
||||
&i.Idx,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const insertBlock = `-- name: InsertBlock :exec
|
||||
|
||||
|
||||
INSERT INTO bitcoin_blocks ("block_height","block_hash","version","merkle_root","prev_block_hash","timestamp","bits","nonce") VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`
|
||||
|
||||
@@ -48,8 +203,6 @@ type InsertBlockParams struct {
|
||||
Nonce int32
|
||||
}
|
||||
|
||||
// TODO: GetBlockHeaderByRange
|
||||
// TODO: GetBlockByHeight/Hash (Join block with transactions, txins, txouts)
|
||||
func (q *Queries) InsertBlock(ctx context.Context, arg InsertBlockParams) error {
|
||||
_, err := q.db.Exec(ctx, insertBlock,
|
||||
arg.BlockHeight,
|
||||
|
||||
Reference in New Issue
Block a user