Compare commits

..

17 Commits

Author SHA1 Message Date
Planxnx
4f2dd80546 feat: support s3 downloading configure 2024-05-30 22:50:21 +07:00
Planxnx
ad26ea0bff feat: increase download connection 2024-05-27 09:48:01 +07:00
Planxnx
23b88c7859 feat(parquet): create buffer for parquet file reader and s3 downloader 2024-05-25 06:25:41 +07:00
Planxnx
d8b8ae42fb feat(parquet): add read all rows function 2024-05-25 06:15:30 +07:00
Planxnx
c1362ae328 doc: remove unnecessary todo 2024-05-25 05:20:26 +07:00
Planxnx
6585da5907 perf: remove unused field to reduce ~1.5GB of memory usage 2024-05-25 05:20:26 +07:00
Planxnx
41cb5de9c0 doc: update note about aws public data 2024-05-25 01:56:05 +07:00
Planxnx
fc8bad75a5 doc: add note about coinbase txinput 2024-05-24 04:44:04 +07:00
Planxnx
7373944c85 doc: add note about nill and empty witness 2024-05-24 03:53:14 +07:00
Planxnx
89a2e58622 fix: invalid blocks to ordered 2024-05-24 03:04:46 +07:00
Planxnx
9ea9ebdb30 fix: filtering block heights and use utc date for key 2024-05-24 03:00:59 +07:00
Planxnx
5e46a87201 wip: no return error when construct aws datasource 2024-05-23 16:58:23 +07:00
Planxnx
e98c3def55 wip: parse aws transaction to type transaction 2024-05-23 02:37:25 +07:00
Planxnx
a3f902f5d5 feat: map parquet data to internal types 2024-05-21 16:29:56 +07:00
Planxnx
e8b4f5a2de wip: implement aws public data datasource 2024-05-21 16:01:52 +07:00
Planxnx
611717706b feat(btc): support get current block height 2024-05-21 15:40:55 +07:00
Planxnx
83b38bc67b wip: create aws public data datasource 2024-05-21 02:48:10 +07:00
52 changed files with 2053 additions and 2222 deletions

View File

@@ -1,18 +0,0 @@
.git
.gitignore
.github
.vscode
**/*.md
**/*.log
.DS_Store
# Docker
Dockerfile
.dockerignore
docker-compose.yml
# Go
.golangci.yaml
cmd.local
config.*.y*ml
config.y*ml

View File

@@ -3,15 +3,15 @@ FROM golang:1.22 as builder
WORKDIR /app
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod/ go mod download
RUN go mod download
COPY ./ ./
ENV GOOS=linux
ENV CGO_ENABLED=0
RUN --mount=type=cache,target=/go/pkg/mod/ \
go build -o main ./main.go
RUN go build \
-o main ./main.go
FROM alpine:latest
@@ -19,10 +19,9 @@ WORKDIR /app
RUN apk --no-cache add ca-certificates tzdata
COPY --from=builder /app/main .
COPY --from=builder /app/modules ./modules
# You can set TZ identifier to change the timezone, See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List
# ENV TZ=US/Central
# You can set `TZ` environment variable to change the timezone
ENTRYPOINT ["/app/main"]
CMD ["/app/main", "run"]

View File

@@ -2,7 +2,7 @@
# Gaze Indexer
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols with **Unified Consistent APIs** across fungible token protocols.
Gaze Indexer is an open-source and modular indexing client for Bitcoin meta-protocols. It has support for Runes out of the box, with **Unified Consistent APIs** across fungible token protocols.
Gaze Indexer is built with **modularity** in mind, allowing users to run all modules in one monolithic instance with a single command, or as a distributed cluster of micro-services.
@@ -25,7 +25,7 @@ This allows developers to focus on what **truly** matters: Meta-protocol indexin
### 1. Runes
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://api-docs.gaze.network) for full details.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
## Installation
@@ -51,6 +51,8 @@ Here is our minimum database disk space requirement for each module.
| ------ | -------------------------- | ---------------------------- |
| Runes | 10 GB | 150 GB |
Here is our minimum database disk space requirement for each module.
#### 4. Prepare `config.yaml` file.
```yaml
@@ -86,7 +88,7 @@ modules:
# Configuration options for Runes module. Can be removed if not used.
runes:
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
datasource: "bitcoin-node" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
api_handlers: # API handlers to enable. current supported handlers: "http"
- http
postgres:
@@ -106,14 +108,14 @@ We will be using `docker-compose` for our installation guide. Make sure the `doc
# docker-compose.yaml
services:
gaze-indexer:
image: ghcr.io/gaze-network/gaze-indexer:v0.2.1
image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
container_name: gaze-indexer
restart: unless-stopped
ports:
- 8080:8080 # Expose HTTP server port to host
volumes:
- "./config.yaml:/app/config.yaml" # mount config.yaml file to the container as "/app/config.yaml"
command: ["/app/main", "run", "--modules", "runes"] # Put module flags after "run" commands to select which modules to run.
command: ["/app/main", "run", "--runes"] # Put module flags after "run" commands to select which modules to run.
```
### Install from source

View File

@@ -19,18 +19,13 @@ import (
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/modules/runes"
"github.com/gaze-network/indexer-network/pkg/automaxprocs"
"github.com/gaze-network/indexer-network/pkg/errorhandler"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/middleware/errorhandler"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/compress"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/favicon"
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
"github.com/gofiber/fiber/v2/middleware/requestid"
"github.com/samber/do/v2"
"github.com/samber/lo"
"github.com/spf13/cobra"
@@ -136,26 +131,10 @@ func runHandler(cmd *cobra.Command, _ []string) error {
// Initialize HTTP server
do.Provide(injector, func(i do.Injector) (*fiber.App, error) {
app := fiber.New(fiber.Config{
AppName: "Gaze Indexer",
ErrorHandler: func(c *fiber.Ctx, err error) error {
logger.ErrorContext(c.UserContext(), "Something went wrong, unhandled api error",
slogx.String("event", "api_unhandled_error"),
slogx.Error(err),
)
return errors.WithStack(c.Status(http.StatusInternalServerError).JSON(fiber.Map{
"error": "Internal Server Error",
}))
},
AppName: "Gaze Indexer",
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
})
app.
Use(favicon.New()).
Use(cors.New()).
Use(requestid.New()).
Use(requestcontext.New(
requestcontext.WithRequestId(),
requestcontext.WithClientIP(conf.HTTPServer.RequestIP),
)).
Use(requestlogger.New(conf.HTTPServer.Logger)).
Use(fiberrecover.New(fiberrecover.Config{
EnableStackTrace: true,
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {
@@ -164,7 +143,6 @@ func runHandler(cmd *cobra.Command, _ []string) error {
logger.ErrorContext(c.UserContext(), "Something went wrong, panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
},
})).
Use(errorhandler.New()).
Use(compress.New(compress.Config{
Level: compress.LevelDefault,
}))

View File

@@ -23,14 +23,6 @@ reporting:
# HTTP server configuration options.
http_server:
port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.
logger:
disable: false # disable logger if logger level is `INFO`
request_header: false
request_query: false
requestip: # Client IP extraction configuration options. This is unnecessary if you don't care about the real client IP or if you're not using a reverse proxy.
trusted_proxies_ip: # Cloudflare, GCP Public LB. See: server/internal/middleware/requestcontext/PROXY-IP.md
trusted_proxies_header: # X-Real-IP, CF-Connecting-IP
enable_reject_malformed_request: false # return 403 if request is malformed (invalid IP)
# Meta-protocol modules configuration options.
modules:

View File

@@ -1,5 +1,5 @@
package constants
const (
Version = "v0.2.1"
Version = "v0.0.1"
)

View File

@@ -0,0 +1,688 @@
// AWS Public Blockchain Datasource
// - https://registry.opendata.aws/aws-public-blockchain
// - https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws
//
// To setup your own data source, see: https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws/blob/main/analytics/producer/README.md
package datasources
import (
"cmp"
"context"
"encoding/hex"
"fmt"
"io"
"log/slog"
"math"
"slices"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/subscription"
"github.com/gaze-network/indexer-network/pkg/btcutils"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/parquetutils"
"github.com/samber/lo"
parquettypes "github.com/xitongsys/parquet-go/types"
"golang.org/x/sync/errgroup"
)
const (
awsPublicDataS3Region = "us-east-2"
awsPublicDataS3Bucket = "aws-public-blockchain"
defaultAWSPublicDataDownloadConcurrency = 8
)
var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)
// Make sure to implement the BitcoinDatasource interface
var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)
type AWSPublicDataDatasourceConfig struct {
// The number of goroutines to spin up in parallel when downloading parts.
// Concurrency of 1 will download the parts sequentially.
// Default is 8.
//
// CAUTION: High concurrency with low part size can reduce the time to download file,
// but it can also increase the memory usage.
DownloadConcurrency int
// The size (in bytes) to request from S3 for each part.
// Default is depend on the concurrency and file size (part size = file size / concurrency).
//
// CAUTION: High concurrency with low part size can reduce the time to download file,
// but it can also increase the memory usage.
DownloadPartSize int64 `mapstructure:"download_part_size"`
}
type AWSPublicDataDatasource struct {
btcDatasource Datasource[*types.Block]
s3Client *s3.Client
s3Bucket string
config AWSPublicDataDatasourceConfig
}
func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
sdkConfig, err := config.LoadDefaultConfig(context.Background())
if err != nil {
logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
}
// TODO: support user defined config (self-hosted s3 bucket)
s3client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) {
o.Region = awsPublicDataS3Region
o.Credentials = aws.AnonymousCredentials{}
})
if conf.DownloadConcurrency <= 0 {
conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
}
return &AWSPublicDataDatasource{
btcDatasource: btcDatasource,
s3Client: s3client,
s3Bucket: awsPublicDataS3Bucket,
config: conf,
}
}
func (d AWSPublicDataDatasource) Name() string {
return fmt.Sprintf("aws_public_data/%s", d.btcDatasource.Name())
}
func (d *AWSPublicDataDatasource) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
ch := make(chan []*types.Block)
subscription, err := d.FetchAsync(ctx, from, to, ch)
if err != nil {
return nil, errors.WithStack(err)
}
defer subscription.Unsubscribe()
blocks := make([]*types.Block, 0)
for {
select {
case b, ok := <-ch:
if !ok {
return blocks, nil
}
blocks = append(blocks, b...)
case <-subscription.Done():
if err := ctx.Err(); err != nil {
return nil, errors.Wrap(err, "context done")
}
return blocks, nil
case err := <-subscription.Err():
if err != nil {
return nil, errors.Wrap(err, "got error while fetch async")
}
return blocks, nil
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "context done")
}
}
}
func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
start, end, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
subscription := subscription.NewSubscription(ch)
if skip {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
return subscription.Client(), nil
}
startFiles, err := d.listBlocksFilesByDate(ctx, start.Timestamp)
if err != nil {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
return nil, errors.Wrap(err, "failed to list files by date")
}
// supported only merged blocks files
startFiles = lo.Filter(startFiles, func(file awsFile, _ int) bool {
return strings.Contains(file.Key, "part-")
})
// use other datasource instead of s3 if there's no supported data
if len(startFiles) == 0 {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
s, err := d.btcDatasource.FetchAsync(ctx, start.Height, end.Height, ch)
return s, errors.WithStack(err)
}
go func() {
defer func() {
// add a bit delay to prevent shutdown before client receive all blocks
time.Sleep(100 * time.Millisecond)
subscription.Unsubscribe()
}()
// loop through each day until reach the end of supported data or within end block date
for ts := start.Timestamp; ts.Before(end.Timestamp.Round(24*time.Hour)) && ts.Before(time.Now()); ts = ts.Add(24 * time.Hour) {
ctx := logger.WithContext(ctx,
slogx.Time("date", ts),
slogx.Int64("date_unix", ts.Unix()),
)
logger.DebugContext(ctx, "Fetching data from AWS S3", slogx.Int64("start", start.Height), slogx.Int64("end", end.Height))
allBlocksFiles, err := d.listBlocksFilesByDate(ctx, ts)
if err != nil {
logger.ErrorContext(ctx, "Failed to list blocks files by date from aws s3", slogx.Error(err))
if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
allTxsFiles, err := d.listTxsFilesByDate(ctx, ts)
if err != nil {
logger.ErrorContext(ctx, "Failed to list txs files by date from aws s3", slogx.Error(err))
if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
blocksFiles := lo.Filter(allBlocksFiles, func(file awsFile, _ int) bool {
return strings.Contains(file.Key, "part-")
})
txsFiles := lo.Filter(allTxsFiles, func(file awsFile, _ int) bool {
return strings.Contains(file.Key, "part-")
})
logger.DebugContext(ctx, "Found files in AWS S3 bucket",
slogx.Int("files_blocks", len(allBlocksFiles)),
slogx.Int("files_blocks_merged", len(blocksFiles)),
slogx.Int("files_txs_all", len(allTxsFiles)),
slogx.Int("files_txs_merged", len(txsFiles)),
)
// Reach the end of supported data,
// stop fetching data from AWS S3
if len(blocksFiles) == 0 || len(txsFiles) == 0 {
logger.DebugContext(ctx, "No blocks files found, stop fetching data from AWS S3")
return
}
// prevent unexpected error
{
if len(blocksFiles) != 1 {
logger.ErrorContext(ctx, "Unexpected blocks files count, should be 1", slogx.Int("count", len(blocksFiles)))
if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected blocks files count")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
if len(txsFiles) != 1 {
logger.ErrorContext(ctx, "Unexpected txs files count, should be 1", slogx.Int("count", len(txsFiles)))
if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected txs files count")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
}
// TODO: use concurrent stream (max 2 goroutine) to download files then sequentially read parquet files
// to improve performance while not consuming too much memory (increase around 500 MB per goroutine)
var (
// TODO: create []byte pool to reduce alloc ops (reduce GC pressure)
// TODO: use FileSystem for default buffer (can choose memory or disk buffer)
blocksBuffer = parquetutils.NewBuffer()
txsBuffer = parquetutils.NewBuffer()
)
startDownload := time.Now()
if err := d.downloadFile(ctx, blocksFiles[0], blocksBuffer); err != nil {
logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Int("count", len(txsFiles)))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't download blocks file")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
}
if err := d.downloadFile(ctx, txsFiles[0], txsBuffer); err != nil {
logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Int("count", len(txsFiles)))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't download blocks file")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
}
logger.DebugContext(ctx, "Downloaded files from AWS S3",
slogx.Duration("duration", time.Since(startDownload)),
slogx.Int("sizes_blocks", len(blocksBuffer.Bytes())),
slogx.Int("sizes_txs", len(txsBuffer.Bytes())),
)
// Read parquet files
startRead := time.Now()
// we can read all blocks data at once because it's small
rawAllBlocks, err := parquetutils.ReadAll[awsBlock](blocksBuffer)
if err != nil {
logger.ErrorContext(ctx, "Failed to read parquet blocks data", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet blocks data")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
}
// NOTE: We shouldn't read all txs data at once because it's very huge (up to ~1.5GB memory usage)
// we should read it by chunk and send it to subscription client to reduce memory usage.
// But AWS Public Dataset are not sorted by block number and index,
// so we can't avoid reading all transactions data by skip unnecessary transactions
// or chunk data by block number to reduce memory usage :(
rawAllTxs, err := parquetutils.ReadAll[awsTransaction](blocksBuffer)
if err != nil {
logger.ErrorContext(ctx, "Failed to read parquet txs data", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet blocks data")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
}
groupRawTxs := lo.GroupBy(rawAllTxs, func(tx awsTransaction) int64 {
return tx.BlockNumber
})
// filter blocks data by height range
rawFilteredBlocks := lo.Filter(rawAllBlocks, func(block awsBlock, _ int) bool {
return block.Number >= start.Height && block.Number <= end.Height
})
slices.SortFunc(rawFilteredBlocks, func(i, j awsBlock) int {
return cmp.Compare(i.Number, j.Number)
})
logger.DebugContext(ctx, "Read parquet files",
slogx.Duration("duration", time.Since(startRead)),
slogx.Int("total_blocks", len(rawAllBlocks)),
slogx.Int("filtered_blocks", len(rawFilteredBlocks)),
slogx.Int("total_txs", len(rawAllTxs)),
slogx.Int("total_txs_grouped", len(groupRawTxs)),
)
blocks := make([]*types.Block, 0, len(rawFilteredBlocks))
for _, rawBlock := range rawFilteredBlocks {
blockHeader, err := rawBlock.ToBlockHeader()
if err != nil {
logger.ErrorContext(ctx, "Failed to convert aws block to type block header", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws block to type block header")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
txs := make([]*types.Transaction, 0, len(groupRawTxs[blockHeader.Height]))
for _, rawTx := range groupRawTxs[rawBlock.Number] {
tx, err := rawTx.ToTransaction(rawBlock)
if err != nil {
logger.ErrorContext(ctx, "Failed to convert aws transaction to type transaction", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws transaction to type transaction")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
txs = append(txs, tx)
}
slices.SortFunc(txs, func(i, j *types.Transaction) int {
return cmp.Compare(i.Index, j.Index)
})
blocks = append(blocks, &types.Block{
Header: blockHeader,
Transactions: txs,
})
}
logger.DebugContext(ctx, "Send blocks to subscription client", slogx.Int("count", len(blocks)))
if err := subscription.Send(ctx, blocks); err != nil {
if errors.Is(err, errs.Closed) {
logger.DebugContext(ctx, "Subscription client closed, can't send", slogx.Error(err))
return
}
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
slogx.Int64("start", blocks[0].Header.Height),
slogx.Int64("end", blocks[len(blocks)-1].Header.Height),
slogx.Error(err),
)
}
}
}()
return subscription.Client(), nil
}
func (d *AWSPublicDataDatasource) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
header, err := d.btcDatasource.GetBlockHeader(ctx, height)
return header, errors.WithStack(err)
}
func (d *AWSPublicDataDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
height, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
return height, errors.WithStack(err)
}
func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (startHeader, endHeader types.BlockHeader, skip bool, err error) {
start := fromHeight
end := toHeight
// get current bitcoin block height
latestBlockHeight, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
if err != nil {
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block count")
}
// set start to genesis block height
if start < 0 {
start = 0
}
// set end to current bitcoin block height if
// - end is -1
// - end is greater that current bitcoin block height
if end < 0 || end > latestBlockHeight {
end = latestBlockHeight
}
// if start is greater than end, skip this round
if start > end {
return types.BlockHeader{}, types.BlockHeader{}, true, nil
}
if err != nil {
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrapf(err, "block %v", end)
}
group, groupctx := errgroup.WithContext(ctx)
group.Go(func() error {
startHeader, err = d.GetBlockHeader(groupctx, start)
return errors.Wrapf(err, "block %v", start)
})
group.Go(func() error {
endHeader, err = d.GetBlockHeader(ctx, end)
return errors.Wrapf(err, "block %v", end)
})
if err := group.Wait(); err != nil {
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block header")
}
return startHeader, endHeader, false, nil
}
func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.s3Bucket),
Prefix: aws.String(prefix),
})
if err != nil {
return nil, errors.Wrapf(err, "can't list s3 bucket objects for bucket %q and prefix %q", d.s3Bucket, prefix)
}
// filter empty keys
objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
return awsFile{
Key: *item.Key,
Size: *item.Size,
LastModified: *item.LastModified,
}
}), nil
}
func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
if date.Before(firstBitcoinTimestamp) {
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
}
prefix := "v1.0/btc/blocks/date=" + date.UTC().Format(time.DateOnly)
files, err := d.listFiles(ctx, prefix)
if err != nil {
return nil, errors.Wrap(err, "failed to list blocks files by date")
}
return files, nil
}
func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
if date.Before(firstBitcoinTimestamp) {
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
}
prefix := "v1.0/btc/transactions/date=" + date.UTC().Format(time.DateOnly)
files, err := d.listFiles(ctx, prefix)
if err != nil {
return nil, errors.Wrap(err, "failed to list txs files by date")
}
return files, nil
}
func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
md.Concurrency = d.config.DownloadConcurrency
md.PartSize = d.config.DownloadPartSize
if md.PartSize <= 0 {
md.PartSize = f.Size / int64(md.Concurrency)
}
})
numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
Bucket: aws.String(d.s3Bucket),
Key: aws.String(f.Key),
})
if err != nil {
return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
}
if numBytes < 1 {
return errors.Wrap(errs.NotFound, "got empty file")
}
return nil
}
// TODO: remove unused fields to reduce memory usage
type (
awsBlock struct {
Hash string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Bits string `parquet:"name=bits, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"` // Hex string format
PreviousBlockHash string `parquet:"name=previousblockhash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
MerkleRoot string `parquet:"name=merkle_root, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
CoinbaseParam string `parquet:"name=coinbase_param, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Timestamp string `parquet:"name=timestamp, type=INT96, repetitiontype=OPTIONAL"`
Number int64 `parquet:"name=number, type=INT64, repetitiontype=OPTIONAL"`
Version int64 `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
Nonce int64 `parquet:"name=nonce, type=INT64, repetitiontype=OPTIONAL"`
// MedianTime string `parquet:"name=mediantime, type=INT96, repetitiontype=OPTIONAL"`
// Difficulty float64 `parquet:"name=difficulty, type=DOUBLE, repetitiontype=OPTIONAL"`
// Chainwork string `parquet:"name=chainwork, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
// Weight int64 `parquet:"name=weight, type=INT64, repetitiontype=OPTIONAL"`
// TransactionCount int64 `parquet:"name=transaction_count, type=INT64, repetitiontype=OPTIONAL"`
// StrippedSize int64 `parquet:"name=stripped_size, type=INT64, repetitiontype=OPTIONAL"`
// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
}
awsTransaction struct {
Hash string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
BlockHash string `parquet:"name=block_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Outputs []*awsTxOutput `parquet:"name=outputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
Inputs []*awsTxInput `parquet:"name=inputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
Version int64 `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
BlockNumber int64 `parquet:"name=block_number, type=INT64, repetitiontype=OPTIONAL"`
Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
LockTime int64 `parquet:"name=lock_time, type=INT64, repetitiontype=OPTIONAL"`
IsCoinbase bool `parquet:"name=is_coinbase, type=BOOLEAN, repetitiontype=OPTIONAL"`
// VirtualSize int64 `parquet:"name=virtual_size, type=INT64, repetitiontype=OPTIONAL"`
// InputCount int64 `parquet:"name=input_count, type=INT64, repetitiontype=OPTIONAL"`
// OutputCount int64 `parquet:"name=output_count, type=INT64, repetitiontype=OPTIONAL"`
// OutputValue float64 `parquet:"name=output_value, type=DOUBLE, repetitiontype=OPTIONAL"`
// BlockTimestamp string `parquet:"name=block_timestamp, type=INT96, repetitiontype=OPTIONAL"`
// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
// Fee float64 `parquet:"name=fee, type=DOUBLE, repetitiontype=OPTIONAL"`
// InputValue float64 `parquet:"name=input_value, type=DOUBLE, repetitiontype=OPTIONAL"`
}
awsTxInput struct {
ScriptHex string `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
SpentTransactionHash string `parquet:"name=spent_transaction_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
TxInWitness []*string `parquet:"name=txinwitness, type=LIST, repetitiontype=OPTIONAL, valuetype=BYTE_ARRAY, convertedtype=UTF8"`
SpentOutputIndex int64 `parquet:"name=spent_output_index, type=INT64, repetitiontype=OPTIONAL"`
Sequence int64 `parquet:"name=sequence, type=INT64, repetitiontype=OPTIONAL"`
// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
// RequiredSignatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
// ScriptAsm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
}
awsTxOutput struct {
Script_hex string `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
// Required_signatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
// Script_asm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}
awsFile struct {
Key string
Size int64
LastModified time.Time
}
)
func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
hash, err := chainhash.NewHashFromStr(a.Hash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert block hash")
}
prevBlockHash, err := chainhash.NewHashFromStr(a.PreviousBlockHash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert previous block hash")
}
merkleRoot, err := chainhash.NewHashFromStr(a.MerkleRoot)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert merkle root")
}
bits, err := strconv.ParseUint(a.Bits, 16, 32)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert bits from hex str to uint32")
}
return types.BlockHeader{
Hash: *hash,
Height: a.Number,
Version: int32(a.Version),
PrevBlock: *prevBlockHash,
MerkleRoot: *merkleRoot,
Timestamp: parquettypes.INT96ToTime(a.Timestamp),
Bits: uint32(bits),
Nonce: uint32(a.Nonce),
}, nil
}
func (a awsTransaction) ToTransaction(block awsBlock) (*types.Transaction, error) {
blockhash, err := chainhash.NewHashFromStr(block.Hash)
if err != nil {
return nil, errors.Wrap(err, "can't convert block hash")
}
msgtx, err := a.MsgTx(block)
if err != nil {
return nil, errors.Wrap(err, "can't convert aws tx to wire.msgtx")
}
return types.ParseMsgTx(msgtx, a.BlockNumber, *blockhash, uint32(a.Index)), nil
}
func (a awsTransaction) MsgTx(block awsBlock) (*wire.MsgTx, error) {
txIn := make([]*wire.TxIn, 0, len(a.Inputs))
txOut := make([]*wire.TxOut, 0, len(a.Outputs))
// NOTE: coinbase tx from AWS S3 has no inputs, so we need to add it manually,
// but we can't guarantee this data is correct especially the sequence number.
if a.IsCoinbase && len(a.Inputs) == 0 {
scriptsig, err := hex.DecodeString(block.CoinbaseParam)
if err != nil {
return nil, errors.Wrap(err, "can't decode script hex")
}
txIn = append(txIn, &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: common.ZeroHash,
Index: math.MaxUint32,
},
SignatureScript: scriptsig,
Witness: btcutils.CoinbaseWitness,
Sequence: math.MaxUint32, // most coinbase tx are using max sequence number
})
}
for _, in := range a.Inputs {
scriptsig, err := hex.DecodeString(in.ScriptHex)
if err != nil {
return nil, errors.Wrap(err, "can't decode script hex")
}
witness, err := btcutils.WitnessFromHex(lo.Map(in.TxInWitness, func(src *string, _ int) string {
if src == nil {
return ""
}
return *src
}))
if err != nil {
return nil, errors.Wrap(err, "can't convert witness")
}
prevOutHash, err := chainhash.NewHashFromStr(in.SpentTransactionHash)
if err != nil {
return nil, errors.Wrap(err, "can't convert prevout hash")
}
txIn = append(txIn, &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: *prevOutHash,
Index: uint32(in.SpentOutputIndex),
},
SignatureScript: scriptsig,
Witness: witness,
Sequence: uint32(in.Sequence),
})
}
for _, out := range a.Outputs {
scriptpubkey, err := hex.DecodeString(out.Script_hex)
if err != nil {
return nil, errors.Wrap(err, "can't decode script hex")
}
txOut = append(txOut, &wire.TxOut{
Value: btcutils.BitcoinToSatoshi(out.Value),
PkScript: scriptpubkey,
})
}
return &wire.MsgTx{
Version: int32(a.Version),
TxIn: txIn,
TxOut: txOut,
LockTime: uint32(a.LockTime),
}, nil
}

View File

@@ -88,7 +88,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(from, to)
from, to, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -212,12 +212,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
return subscription.Client(), nil
}
func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
start = fromHeight
end = toHeight
// get current bitcoin block height
latestBlockHeight, err := d.btcclient.GetBlockCount()
latestBlockHeight, err := d.GetCurrentBlockHeight(ctx)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get block count")
}
@@ -227,7 +227,7 @@ func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start,
start = 0
}
// set end to current bitcoin block height if
// set end to current bitcoin block height if d
// - end is -1
// - end is greater that current bitcoin block height
if end < 0 || end > latestBlockHeight {
@@ -292,3 +292,12 @@ func (d *BitcoinNodeDatasource) GetBlockHeader(ctx context.Context, height int64
return types.ParseMsgBlockHeader(*block, height), nil
}
// GetCurrentBlockHeight fetch current block height from Bitcoin node
func (d *BitcoinNodeDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
height, err := d.btcclient.GetBlockCount()
if err != nil {
return -1, errors.Wrap(err, "failed to get block height")
}
return height, nil
}

View File

@@ -13,4 +13,5 @@ type Datasource[T any] interface {
Fetch(ctx context.Context, from, to int64) ([]T, error)
FetchAsync(ctx context.Context, from, to int64, ch chan<- []T) (*subscription.ClientSubscription[[]T], error)
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
GetCurrentBlockHeight(ctx context.Context) (int64, error)
}

View File

@@ -91,10 +91,6 @@ func (i *Indexer[T]) Run(ctx context.Context) (err error) {
select {
case <-i.quit:
logger.InfoContext(ctx, "Got quit signal, stopping indexer")
if err := i.Processor.Shutdown(ctx); err != nil {
logger.ErrorContext(ctx, "Failed to shutdown processor", slogx.Error(err))
return errors.Wrap(err, "processor shutdown failed")
}
return nil
case <-ctx.Done():
return nil
@@ -208,9 +204,9 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
}
// validate is input is continuous and no reorg
prevHeader := i.currentBlock
for i, input := range inputs {
header := input.BlockHeader()
for i := 1; i < len(inputs); i++ {
header := inputs[i].BlockHeader()
prevHeader := inputs[i-1].BlockHeader()
if header.Height != prevHeader.Height+1 {
return errors.Wrapf(errs.InternalError, "input is not continuous, input[%d] height: %d, input[%d] height: %d", i-1, prevHeader.Height, i, header.Height)
}
@@ -221,7 +217,6 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
// end current round
return nil
}
prevHeader = header
}
ctx = logger.WithContext(ctx, slog.Int("total_inputs", len(inputs)))

View File

@@ -29,9 +29,6 @@ type Processor[T Input] interface {
// VerifyStates verifies the states of the indexed data and the indexer
// to ensure the last shutdown was graceful and no missing data.
VerifyStates(ctx context.Context) error
// Shutdown gracefully stops the processor. Database connections, network calls, leftover states, etc. should be closed and cleaned up here.
Shutdown(ctx context.Context) error
}
type IndexerWorker interface {

49
go.mod
View File

@@ -4,9 +4,12 @@ go 1.22
require (
github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11
github.com/aws/aws-sdk-go-v2 v1.23.0
github.com/aws/aws-sdk-go-v2/config v1.25.3
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/btcutil v1.1.5
github.com/btcsuite/btcd/btcutil/psbt v1.1.9
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/errors v1.11.1
github.com/gaze-network/uint128 v1.3.0
@@ -17,31 +20,50 @@ require (
github.com/planxnx/concurrent-stream v0.1.5
github.com/samber/do/v2 v2.0.0-beta.7
github.com/samber/lo v1.39.0
github.com/shopspring/decimal v1.3.1
github.com/shopspring/decimal v1.4.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.51.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20240122235623-d6294584ab18
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.7.0
golang.org/x/sync v0.5.0
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/bitonicnl/verify-signed-message v0.7.1
github.com/btcsuite/btcd/btcec/v2 v2.3.3 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 // indirect
github.com/aws/smithy-go v1.17.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -51,6 +73,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx v3.6.2+incompatible // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
@@ -61,6 +84,7 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.16 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
@@ -76,10 +100,11 @@ require (
github.com/valyala/tcplisten v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

1051
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -11,8 +11,6 @@ import (
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/spf13/pflag"
"github.com/spf13/viper"
@@ -65,9 +63,7 @@ type Modules struct {
}
type HTTPServerConfig struct {
Port int `mapstructure:"port"`
Logger requestlogger.Config `mapstructure:"logger"`
RequestIP requestcontext.WithClientIPConfig `mapstructure:"requestip"`
Port int `mapstructure:"port"`
}
// Parse parse the configuration from environment variables

View File

@@ -2,7 +2,6 @@ package httphandler
import (
"encoding/hex"
"fmt"
"slices"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -15,11 +14,9 @@ import (
)
type getTransactionsRequest struct {
Wallet string `query:"wallet"`
Id string `query:"id"`
FromBlock int64 `query:"fromBlock"`
ToBlock int64 `query:"toBlock"`
Wallet string `query:"wallet"`
Id string `query:"id"`
BlockHeight uint64 `query:"blockHeight"`
}
func (r getTransactionsRequest) Validate() error {
@@ -27,12 +24,6 @@ func (r getTransactionsRequest) Validate() error {
if r.Id != "" && !isRuneIdOrRuneName(r.Id) {
errList = append(errList, errors.New("'id' is not valid rune id or rune name"))
}
if r.FromBlock < -1 {
errList = append(errList, errors.Errorf("invalid fromBlock range"))
}
if r.ToBlock < -1 {
errList = append(errList, errors.Errorf("invalid toBlock range"))
}
return errs.WithPublicMessage(errors.Join(errList...), "validation error")
}
@@ -134,31 +125,17 @@ func (h *HttpHandler) GetTransactions(ctx *fiber.Ctx) (err error) {
}
}
// default to latest block
if req.ToBlock == 0 {
req.ToBlock = -1
}
// get latest block height if block height is -1
if req.FromBlock == -1 || req.ToBlock == -1 {
blockHeight := req.BlockHeight
// set blockHeight to the latest block height blockHeight, pkScript, and runeId are not provided
if blockHeight == 0 && pkScript == nil && runeId == (runes.RuneId{}) {
blockHeader, err := h.usecase.GetLatestBlock(ctx.UserContext())
if err != nil {
return errors.Wrap(err, "error during GetLatestBlock")
}
if req.FromBlock == -1 {
req.FromBlock = blockHeader.Height
}
if req.ToBlock == -1 {
req.ToBlock = blockHeader.Height
}
blockHeight = uint64(blockHeader.Height)
}
// validate block height range
if req.FromBlock > req.ToBlock {
return errs.NewPublicError(fmt.Sprintf("fromBlock must be less than or equal to toBlock, got fromBlock=%d, toBlock=%d", req.FromBlock, req.ToBlock))
}
txs, err := h.usecase.GetRuneTransactions(ctx.UserContext(), pkScript, runeId, uint64(req.FromBlock), uint64(req.ToBlock))
txs, err := h.usecase.GetRuneTransactions(ctx.UserContext(), pkScript, runeId, blockHeight)
if err != nil {
return errors.Wrap(err, "error during GetRuneTransactions")
}

View File

@@ -55,9 +55,8 @@ SELECT * FROM runes_transactions
OR runes_transactions.burns ? @rune_id
OR (runes_transactions.rune_etched = TRUE AND runes_transactions.block_height = @rune_id_block_height AND runes_transactions.index = @rune_id_tx_index)
) AND (
@from_block <= runes_transactions.block_height AND runes_transactions.block_height <= @to_block
)
ORDER BY runes_transactions.block_height DESC LIMIT 10000;
@block_height::INT = 0 OR runes_transactions.block_height = @block_height::INT -- if @block_height > 0, apply block_height filter
);
-- name: CountRuneEntries :one
SELECT COUNT(*) FROM runes_entries;

View File

@@ -27,7 +27,7 @@ type RunesReaderDataGateway interface {
GetLatestBlock(ctx context.Context) (types.BlockHeader, error)
GetIndexedBlockByHeight(ctx context.Context, height int64) (*entity.IndexedBlock, error)
// GetRuneTransactions returns the runes transactions, filterable by pkScript, runeId and height. If pkScript, runeId or height is zero value, that filter is ignored.
GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, fromBlock, toBlock uint64) ([]*entity.RuneTransaction, error)
GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error)
GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error)
GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error)

View File

@@ -31,7 +31,6 @@ type Processor struct {
bitcoinClient btcclient.Contract
network common.Network
reportingClient *reportingclient.ReportingClient
cleanupFuncs []func(context.Context) error
newRuneEntries map[runes.RuneId]*runes.RuneEntry
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
@@ -41,14 +40,13 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient, cleanupFuncs []func(context.Context) error) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
bitcoinClient: bitcoinClient,
network: network,
reportingClient: reportingClient,
cleanupFuncs: cleanupFuncs,
newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry),
newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry),
newOutPointBalances: make(map[wire.OutPoint][]*entity.OutPointBalance),
@@ -230,13 +228,3 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
return nil
}
func (p *Processor) Shutdown(ctx context.Context) error {
var errs []error
for _, cleanup := range p.cleanupFuncs {
if err := cleanup(ctx); err != nil {
errs = append(errs, err)
}
}
return errors.WithStack(errors.Join(errs...))
}

View File

@@ -646,9 +646,8 @@ SELECT hash, runes_transactions.block_height, index, timestamp, inputs, outputs,
OR runes_transactions.burns ? $5
OR (runes_transactions.rune_etched = TRUE AND runes_transactions.block_height = $6 AND runes_transactions.index = $7)
) AND (
$8 <= runes_transactions.block_height AND runes_transactions.block_height <= $9
$8::INT = 0 OR runes_transactions.block_height = $8::INT -- if @block_height > 0, apply block_height filter
)
ORDER BY runes_transactions.block_height DESC LIMIT 10000
`
type GetRuneTransactionsParams struct {
@@ -659,8 +658,7 @@ type GetRuneTransactionsParams struct {
RuneID []byte
RuneIDBlockHeight int32
RuneIDTxIndex int32
FromBlock int32
ToBlock int32
BlockHeight int32
}
type GetRuneTransactionsRow struct {
@@ -705,8 +703,7 @@ func (q *Queries) GetRuneTransactions(ctx context.Context, arg GetRuneTransactio
arg.RuneID,
arg.RuneIDBlockHeight,
arg.RuneIDTxIndex,
arg.FromBlock,
arg.ToBlock,
arg.BlockHeight,
)
if err != nil {
return nil, err

View File

@@ -62,7 +62,7 @@ func (r *Repository) GetIndexedBlockByHeight(ctx context.Context, height int64)
return indexedBlock, nil
}
func (r *Repository) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, fromBlock, toBlock uint64) ([]*entity.RuneTransaction, error) {
func (r *Repository) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error) {
pkScriptParam := []byte(fmt.Sprintf(`[{"pkScript":"%s"}]`, hex.EncodeToString(pkScript)))
runeIdParam := []byte(fmt.Sprintf(`[{"runeId":"%s"}]`, runeId.String()))
rows, err := r.queries.GetRuneTransactions(ctx, gen.GetRuneTransactionsParams{
@@ -75,8 +75,7 @@ func (r *Repository) GetRuneTransactions(ctx context.Context, pkScript []byte, r
RuneIDBlockHeight: int32(runeId.BlockHeight),
RuneIDTxIndex: int32(runeId.TxIndex),
FromBlock: int32(fromBlock),
ToBlock: int32(toBlock),
BlockHeight: int32(height),
})
if err != nil {
return nil, errors.Wrap(err, "error during query")

View File

@@ -33,7 +33,6 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
var cleanupFuncs []func(context.Context) error
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
@@ -43,10 +42,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
}
return nil, errors.Wrap(err, "can't create Postgres connection pool")
}
cleanupFuncs = append(cleanupFuncs, func(ctx context.Context) error {
pg.Close()
return nil
})
defer pg.Close()
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
@@ -66,7 +62,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient, cleanupFuncs)
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient)
if err := processor.VerifyStates(ctx); err != nil {
return nil, errors.WithStack(err)
}

View File

@@ -69,26 +69,8 @@ func ParseTag(input interface{}) (Tag, error) {
return input, nil
case uint128.Uint128:
return Tag(input), nil
case int:
return Tag(uint128.From64(uint64(input))), nil
case int8:
return Tag(uint128.From64(uint64(input))), nil
case int16:
return Tag(uint128.From64(uint64(input))), nil
case int32:
return Tag(uint128.From64(uint64(input))), nil
case int64:
return Tag(uint128.From64(uint64(input))), nil
case uint:
return Tag(uint128.From64(uint64(input))), nil
case uint8:
return Tag(uint128.From64(uint64(input))), nil
case uint16:
return Tag(uint128.From64(uint64(input))), nil
case uint32:
return Tag(uint128.From64(uint64(input))), nil
case uint64:
return Tag(uint128.From64(input)), nil
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
return Tag(uint128.From64(input.(uint64))), nil
case big.Int:
u128, err := uint128.FromBig(&input)
if err != nil {

View File

@@ -8,8 +8,8 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/runes"
)
func (u *Usecase) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, fromBlock, toBlock uint64) ([]*entity.RuneTransaction, error) {
txs, err := u.runesDg.GetRuneTransactions(ctx, pkScript, runeId, fromBlock, toBlock)
func (u *Usecase) GetRuneTransactions(ctx context.Context, pkScript []byte, runeId runes.RuneId, height uint64) ([]*entity.RuneTransaction, error) {
txs, err := u.runesDg.GetRuneTransactions(ctx, pkScript, runeId, height)
if err != nil {
return nil, errors.Wrap(err, "error during GetTransactionsByHeight")
}

View File

@@ -1,212 +0,0 @@
package btcutils
import (
"encoding/json"
"reflect"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)
const (
// MaxSupportedPkScriptSize is the maximum supported size of a pkScript.
MaxSupportedPkScriptSize = 40
)
// IsAddress returns whether or not the passed string is a valid bitcoin address and valid supported type.
//
// NetParams is optional. If provided, we only check for that network,
// otherwise, we check for all supported networks.
func IsAddress(address string, defaultNet ...*chaincfg.Params) bool {
if len(address) == 0 {
return false
}
// If defaultNet is provided, we only check for that network.
net, ok := utils.Optional(defaultNet)
if ok {
_, _, err := parseAddress(address, net)
return err == nil
}
// Otherwise, we check for all supported networks.
for _, net := range supportedNetworks {
_, _, err := parseAddress(address, net)
if err == nil {
return true
}
}
return false
}
// TODO: create GetAddressNetwork
// check `Bech32HRPSegwit` prefix or netID for P2SH/P2PKH is equal to `PubKeyHashAddrID/ScriptHashAddrID`
// GetAddressType returns the address type of the passed address.
func GetAddressType(address string, net *chaincfg.Params) (AddressType, error) {
_, addrType, err := parseAddress(address, net)
return addrType, errors.WithStack(err)
}
type Address struct {
decoded btcutil.Address
net *chaincfg.Params
encoded string
encodedType AddressType
scriptPubKey [MaxSupportedPkScriptSize]byte
scriptPubKeySize int
}
// NewAddress creates a new address from the given address string.
//
// defaultNet is required if your address is P2SH or P2PKH (legacy or nested segwit)
// If your address is P2WSH, P2WPKH or P2TR, defaultNet is not required.
func NewAddress(address string, defaultNet ...*chaincfg.Params) Address {
addr, err := SafeNewAddress(address, defaultNet...)
if err != nil {
logger.Panic("can't create parse address", slogx.Error(err), slogx.String("package", "btcutils"))
}
return addr
}
// SafeNewAddress creates a new address from the given address string.
// It returns an error if the address is invalid.
//
// defaultNet is required if your address is P2SH or P2PKH (legacy or nested segwit)
// If your address is P2WSH, P2WPKH or P2TR, defaultNet is not required.
func SafeNewAddress(address string, defaultNet ...*chaincfg.Params) (Address, error) {
net := utils.DefaultOptional(defaultNet, &chaincfg.MainNetParams)
decoded, addrType, err := parseAddress(address, net)
if err != nil {
return Address{}, errors.Wrap(err, "can't parse address")
}
scriptPubkey, err := txscript.PayToAddrScript(decoded)
if err != nil {
return Address{}, errors.Wrap(err, "can't get script pubkey")
}
fixedPkScript := [MaxSupportedPkScriptSize]byte{}
copy(fixedPkScript[:], scriptPubkey)
return Address{
decoded: decoded,
net: net,
encoded: decoded.EncodeAddress(),
encodedType: addrType,
scriptPubKey: fixedPkScript,
scriptPubKeySize: len(scriptPubkey),
}, nil
}
// String returns the address string.
func (a Address) String() string {
return a.encoded
}
// Type returns the address type.
func (a Address) Type() AddressType {
return a.encodedType
}
// Decoded returns the btcutil.Address
func (a Address) Decoded() btcutil.Address {
return a.decoded
}
// IsForNet returns whether or not the address is associated with the passed bitcoin network.
func (a Address) IsForNet(net *chaincfg.Params) bool {
return a.decoded.IsForNet(net)
}
// ScriptAddress returns the raw bytes of the address to be used when inserting the address into a txout's script.
func (a Address) ScriptAddress() []byte {
return a.decoded.ScriptAddress()
}
// Net returns the address network params.
func (a Address) Net() *chaincfg.Params {
return a.net
}
// NetworkName
func (a Address) NetworkName() string {
return a.net.Name
}
// ScriptPubKey or pubkey script
func (a Address) ScriptPubKey() []byte {
return a.scriptPubKey[:a.scriptPubKeySize]
}
// Equal return true if addresses are equal
func (a Address) Equal(b Address) bool {
return a.encoded == b.encoded
}
// MarshalText implements the encoding.TextMarshaler interface.
func (a Address) MarshalText() ([]byte, error) {
return []byte(a.encoded), nil
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (a *Address) UnmarshalText(input []byte) error {
address := string(input)
addr, err := SafeNewAddress(address)
if err == nil {
*a = addr
return nil
}
return errors.Wrapf(errs.InvalidArgument, "invalid address `%s`", address)
}
// MarshalJSON implements the json.Marshaler interface.
func (a Address) MarshalJSON() ([]byte, error) {
t, err := a.MarshalText()
if err != nil {
return nil, &json.MarshalerError{Type: reflect.TypeOf(a), Err: err}
}
b := make([]byte, len(t)+2)
b[0], b[len(b)-1] = '"', '"' // add quotes
copy(b[1:], t)
return b, nil
}
// UnmarshalJSON parses a hash in hex syntax.
func (a *Address) UnmarshalJSON(input []byte) error {
if !(len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"') {
return &json.UnmarshalTypeError{Value: "non-string", Type: reflect.TypeOf(Address{})}
}
if err := a.UnmarshalText(input[1 : len(input)-1]); err != nil {
return err
}
return nil
}
func parseAddress(address string, params *chaincfg.Params) (btcutil.Address, AddressType, error) {
decoded, err := btcutil.DecodeAddress(address, params)
if err != nil {
return nil, 0, errors.Wrapf(err, "can't decode address `%s` for network `%s`", address, params.Name)
}
switch decoded.(type) {
case *btcutil.AddressWitnessPubKeyHash:
return decoded, AddressP2WPKH, nil
case *btcutil.AddressTaproot:
return decoded, AddressP2TR, nil
case *btcutil.AddressScriptHash:
return decoded, AddressP2SH, nil
case *btcutil.AddressPubKeyHash:
return decoded, AddressP2PKH, nil
case *btcutil.AddressWitnessScriptHash:
return decoded, AddressP2WSH, nil
default:
return nil, 0, errors.Wrap(errs.Unsupported, "unsupported address type")
}
}

View File

@@ -1,80 +0,0 @@
package btcutils_test
import (
"testing"
"github.com/btcsuite/btcd/chaincfg"
"github.com/gaze-network/indexer-network/pkg/btcutils"
)
/*
NOTE:
# Compare this benchmark to go-ethereum/common.Address utils
- go-ethereum/common.HexToAddress speed: 45 ns/op, 48 B/op, 1 allocs/op
- go-ethereum/common.IsHexAddress speed: 25 ns/op, 0 B/op, 0 allocs/op
It's slower than go-ethereum/common.Address utils because ethereum wallet address is Hex string 20 bytes,
but Bitcoin has many types of address and each type has complex algorithm to solve (can't solve and validate address type directly from address string)
20/Jan/2024 @Planxnx Macbook Air M1 16GB
BenchmarkIsAddress/specific-network/mainnet/P2WPKH-8 1776146 625.6 ns/op 120 B/op 3 allocs/op
BenchmarkIsAddress/specific-network/testnet3/P2WPKH-8 1917876 623.2 ns/op 120 B/op 3 allocs/op
BenchmarkIsAddress/specific-network/mainnet/P2TR-8 1330348 915.4 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/specific-network/testnet3/P2TR-8 1235806 931.1 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/specific-network/mainnet/P2WSH-8 1261730 960.9 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/specific-network/testnet3/P2WSH-8 1307851 916.1 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/specific-network/mainnet/P2SH-8 3081762 402.0 ns/op 192 B/op 8 allocs/op
BenchmarkIsAddress/specific-network/testnet3/P2SH-8 3245838 344.9 ns/op 176 B/op 7 allocs/op
BenchmarkIsAddress/specific-network/mainnet/P2PKH-8 2904252 410.4 ns/op 184 B/op 8 allocs/op
BenchmarkIsAddress/specific-network/testnet3/P2PKH-8 3522332 342.8 ns/op 176 B/op 7 allocs/op
BenchmarkIsAddress/automate-network/mainnet/P2WPKH-8 1882059 637.6 ns/op 120 B/op 3 allocs/op
BenchmarkIsAddress/automate-network/testnet3/P2WPKH-8 1626151 664.8 ns/op 120 B/op 3 allocs/op
BenchmarkIsAddress/automate-network/mainnet/P2TR-8 1250253 952.1 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/automate-network/testnet3/P2TR-8 1257901 993.7 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/automate-network/mainnet/P2WSH-8 1000000 1005 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/automate-network/testnet3/P2WSH-8 1209108 971.2 ns/op 160 B/op 3 allocs/op
BenchmarkIsAddress/automate-network/mainnet/P2SH-8 1869075 625.0 ns/op 268 B/op 9 allocs/op
BenchmarkIsAddress/automate-network/testnet3/P2SH-8 779496 1609 ns/op 694 B/op 17 allocs/op
BenchmarkIsAddress/automate-network/mainnet/P2PKH-8 1924058 650.6 ns/op 259 B/op 9 allocs/op
BenchmarkIsAddress/automate-network/testnet3/P2PKH-8 721510 1690 ns/op 694 B/op 17 allocs/op
*/
func BenchmarkIsAddress(b *testing.B) {
cases := []btcutils.Address{
/* P2WPKH */ btcutils.NewAddress("bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczh", &chaincfg.MainNetParams),
/* P2WPKH */ btcutils.NewAddress("tb1qfpgdxtpl7kz5qdus2pmexyjaza99c28qd6ltey", &chaincfg.TestNet3Params),
/* P2TR */ btcutils.NewAddress("bc1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qvz5d38", &chaincfg.MainNetParams),
/* P2TR */ btcutils.NewAddress("tb1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qm2zztg", &chaincfg.TestNet3Params),
/* P2WSH */ btcutils.NewAddress("bc1qeklep85ntjz4605drds6aww9u0qr46qzrv5xswd35uhjuj8ahfcqgf6hak", &chaincfg.MainNetParams),
/* P2WSH */ btcutils.NewAddress("tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3q0sl5k7", &chaincfg.TestNet3Params),
/* P2SH */ btcutils.NewAddress("3Ccte7SJz71tcssLPZy3TdWz5DTPeNRbPw", &chaincfg.MainNetParams),
/* P2SH */ btcutils.NewAddress("2NCxMvHPTduZcCuUeAiWUpuwHga7Y66y9XJ", &chaincfg.TestNet3Params),
/* P2PKH */ btcutils.NewAddress("1KrRZSShVkdc8J71CtY4wdw46Rx3BRLKyH", &chaincfg.MainNetParams),
/* P2PKH */ btcutils.NewAddress("migbBPcDajPfffrhoLpYFTQNXQFbWbhpz3", &chaincfg.TestNet3Params),
}
b.Run("specific-network", func(b *testing.B) {
for _, c := range cases {
b.Run(c.NetworkName()+"/"+c.Type().String(), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = btcutils.IsAddress(c.String(), c.Net())
}
})
}
})
b.Run("automate-network", func(b *testing.B) {
for _, c := range cases {
b.Run(c.NetworkName()+"/"+c.Type().String(), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ok := btcutils.IsAddress(c.String())
if !ok {
b.Error("IsAddress returned false")
}
}
})
}
})
}

View File

@@ -1,449 +0,0 @@
package btcutils_test
import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"testing"
"github.com/btcsuite/btcd/chaincfg"
"github.com/gaze-network/indexer-network/pkg/btcutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetAddressType(t *testing.T) {
type Spec struct {
Address string
DefaultNet *chaincfg.Params
ExpectedError error
ExpectedAddressType btcutils.AddressType
}
specs := []Spec{
{
Address: "bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczh",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WPKH,
},
{
Address: "tb1qfpgdxtpl7kz5qdus2pmexyjaza99c28qd6ltey",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WPKH,
},
{
Address: "bc1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qvz5d38",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2TR,
},
{
Address: "tb1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qm2zztg",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2TR,
},
{
Address: "3Ccte7SJz71tcssLPZy3TdWz5DTPeNRbPw",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2SH,
},
{
Address: "1KrRZSShVkdc8J71CtY4wdw46Rx3BRLKyH",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2PKH,
},
{
Address: "bc1qeklep85ntjz4605drds6aww9u0qr46qzrv5xswd35uhjuj8ahfcqgf6hak",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WSH,
},
{
Address: "migbBPcDajPfffrhoLpYFTQNXQFbWbhpz3",
DefaultNet: &chaincfg.TestNet3Params,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2PKH,
},
{
Address: "tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3q0sl5k7",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WSH,
},
{
Address: "2NCxMvHPTduZcCuUeAiWUpuwHga7Y66y9XJ",
DefaultNet: &chaincfg.TestNet3Params,
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2SH,
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf("address:%s", spec.Address), func(t *testing.T) {
actualAddressType, actualError := btcutils.GetAddressType(spec.Address, spec.DefaultNet)
if spec.ExpectedError != nil {
assert.ErrorIs(t, actualError, spec.ExpectedError)
} else {
assert.Equal(t, spec.ExpectedAddressType, actualAddressType)
}
})
}
}
func TestNewAddress(t *testing.T) {
type Spec struct {
Address string
DefaultNet *chaincfg.Params
ExpectedAddressType btcutils.AddressType
}
specs := []Spec{
{
Address: "bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczh",
// DefaultNet: &chaincfg.MainNetParams, // Optional
ExpectedAddressType: btcutils.AddressP2WPKH,
},
{
Address: "tb1qfpgdxtpl7kz5qdus2pmexyjaza99c28qd6ltey",
// DefaultNet: &chaincfg.MainNetParams, // Optional
ExpectedAddressType: btcutils.AddressP2WPKH,
},
{
Address: "bc1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qvz5d38",
// DefaultNet: &chaincfg.MainNetParams, // Optional
ExpectedAddressType: btcutils.AddressP2TR,
},
{
Address: "tb1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qm2zztg",
// DefaultNet: &chaincfg.MainNetParams, // Optional
ExpectedAddressType: btcutils.AddressP2TR,
},
{
Address: "bc1qeklep85ntjz4605drds6aww9u0qr46qzrv5xswd35uhjuj8ahfcqgf6hak",
// DefaultNet: &chaincfg.MainNetParams, // Optional
ExpectedAddressType: btcutils.AddressP2WSH,
},
{
Address: "tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3q0sl5k7",
// DefaultNet: &chaincfg.MainNetParams, // Optional
ExpectedAddressType: btcutils.AddressP2WSH,
},
{
Address: "3Ccte7SJz71tcssLPZy3TdWz5DTPeNRbPw",
DefaultNet: &chaincfg.MainNetParams,
ExpectedAddressType: btcutils.AddressP2SH,
},
{
Address: "2NCxMvHPTduZcCuUeAiWUpuwHga7Y66y9XJ",
DefaultNet: &chaincfg.TestNet3Params,
ExpectedAddressType: btcutils.AddressP2SH,
},
{
Address: "1KrRZSShVkdc8J71CtY4wdw46Rx3BRLKyH",
DefaultNet: &chaincfg.MainNetParams,
ExpectedAddressType: btcutils.AddressP2PKH,
},
{
Address: "migbBPcDajPfffrhoLpYFTQNXQFbWbhpz3",
DefaultNet: &chaincfg.TestNet3Params,
ExpectedAddressType: btcutils.AddressP2PKH,
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf("address:%s,type:%s", spec.Address, spec.ExpectedAddressType), func(t *testing.T) {
addr := btcutils.NewAddress(spec.Address, spec.DefaultNet)
assert.Equal(t, spec.ExpectedAddressType, addr.Type())
assert.Equal(t, spec.Address, addr.String())
})
}
}
func TestIsAddress(t *testing.T) {
type Spec struct {
Address string
Expected bool
}
specs := []Spec{
{
Address: "bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczh",
Expected: true,
},
{
Address: "tb1qfpgdxtpl7kz5qdus2pmexyjaza99c28qd6ltey",
Expected: true,
},
{
Address: "bc1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qvz5d38",
Expected: true,
},
{
Address: "tb1p7h87kqsmpzatddzhdhuy9gmxdpvn5kvar6hhqlgau8d2ffa0pa3qm2zztg",
Expected: true,
},
{
Address: "bc1qeklep85ntjz4605drds6aww9u0qr46qzrv5xswd35uhjuj8ahfcqgf6hak",
Expected: true,
},
{
Address: "tb1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3q0sl5k7",
Expected: true,
},
{
Address: "3Ccte7SJz71tcssLPZy3TdWz5DTPeNRbPw",
Expected: true,
},
{
Address: "2NCxMvHPTduZcCuUeAiWUpuwHga7Y66y9XJ",
Expected: true,
},
{
Address: "1KrRZSShVkdc8J71CtY4wdw46Rx3BRLKyH",
Expected: true,
},
{
Address: "migbBPcDajPfffrhoLpYFTQNXQFbWbhpz3",
Expected: true,
},
{
Address: "",
Expected: false,
},
{
Address: "migbBPcDajPfffrhoLpYFTQNXQFbWbhpz2",
Expected: false,
},
{
Address: "bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczz",
Expected: false,
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf("address:%s", spec.Address), func(t *testing.T) {
ok := btcutils.IsAddress(spec.Address)
assert.Equal(t, spec.Expected, ok)
})
}
}
func TestAddressEncoding(t *testing.T) {
rawAddress := "bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczh"
address := btcutils.NewAddress(rawAddress, &chaincfg.MainNetParams)
type Spec struct {
Data interface{}
Expected string
}
specs := []Spec{
{
Data: address,
Expected: fmt.Sprintf(`"%s"`, rawAddress),
},
{
Data: map[string]interface{}{
"address": rawAddress,
},
Expected: fmt.Sprintf(`{"address":"%s"}`, rawAddress),
},
}
for i, spec := range specs {
t.Run(fmt.Sprint(i+1), func(t *testing.T) {
actual, err := json.Marshal(spec.Data)
assert.NoError(t, err)
assert.Equal(t, spec.Expected, string(actual))
})
}
}
func TestAddressDecoding(t *testing.T) {
rawAddress := "bc1qfpgdxtpl7kz5qdus2pmexyjaza99c28q8uyczh"
address := btcutils.NewAddress(rawAddress, &chaincfg.MainNetParams)
// Case #1: address is a string
t.Run("from_string", func(t *testing.T) {
input := fmt.Sprintf(`"%s"`, rawAddress)
expected := address
actual := btcutils.Address{}
err := json.Unmarshal([]byte(input), &actual)
if !assert.NoError(t, err) {
t.FailNow()
}
assert.Equal(t, expected, actual)
})
// Case #2: address is a field of a struct
t.Run("from_field_string", func(t *testing.T) {
type Data struct {
Address btcutils.Address `json:"address"`
}
input := fmt.Sprintf(`{"address":"%s"}`, rawAddress)
expected := Data{Address: address}
actual := Data{}
err := json.Unmarshal([]byte(input), &actual)
if !assert.NoError(t, err) {
t.FailNow()
}
assert.Equal(t, expected, actual)
})
// Case #3: address is an element of an array
t.Run("from_array", func(t *testing.T) {
input := fmt.Sprintf(`["%s"]`, rawAddress)
expected := []btcutils.Address{address}
actual := []btcutils.Address{}
err := json.Unmarshal([]byte(input), &actual)
if !assert.NoError(t, err) {
t.FailNow()
}
assert.Equal(t, expected, actual)
})
// Case #4: not supported address type
t.Run("from_string/not_address", func(t *testing.T) {
input := fmt.Sprintf(`"%s"`, "THIS_IS_NOT_SUPPORTED_ADDRESS")
actual := btcutils.Address{}
err := json.Unmarshal([]byte(input), &actual)
assert.Error(t, err)
})
// Case #5: invalid field type
t.Run("from_number", func(t *testing.T) {
type Data struct {
Address btcutils.Address `json:"address"`
}
input := fmt.Sprintf(`{"address":%d}`, 123)
actual := Data{}
err := json.Unmarshal([]byte(input), &actual)
assert.Error(t, err)
})
}
func TestAddressPkScript(t *testing.T) {
anyErr := errors.New("any error")
type Spec struct {
Address string
DefaultNet *chaincfg.Params
ExpectedError error
ExpectedPkScript string // hex encoded
}
specs := []Spec{
{
Address: "some_invalid_address",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: anyErr,
ExpectedPkScript: "",
},
{
// P2WPKH
Address: "bc1qdx72th7e3z8zc5wdrdxweswfcne974pjneyjln",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "001469bca5dfd9888e2c51cd1b4cecc1c9c4f25f5432",
},
{
// P2WPKH
Address: "bc1q7cj6gz6t3d28qg7kxhrc7h5t3h0re34fqqalga",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "0014f625a40b4b8b547023d635c78f5e8b8dde3cc6a9",
},
{
// P2TR
Address: "bc1pfd0zw2jwlpn4xckpr3dxpt7x0gw6wetuftxvrc4dt2qgn9azjuus65fug6",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "51204b5e272a4ef8675362c11c5a60afc67a1da7657c4accc1e2ad5a808997a29739",
},
{
// P2TR
Address: "bc1pxpumml545tqum5afarzlmnnez2npd35nvf0j0vnrp88nemqsn54qle05sm",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "51203079bdfe95a2c1cdd3a9e8c5fdce7912a616c693625f27b26309cf3cec109d2a",
},
{
// P2SH
Address: "3Ccte7SJz71tcssLPZy3TdWz5DTPeNRbPw",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "a91477e1a3d54f545d83869ae3a6b28b071422801d7b87",
},
{
// P2PKH
Address: "1KrRZSShVkdc8J71CtY4wdw46Rx3BRLKyH",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "76a914cecb25b53809991c7beef2d27bc2be49e78c684388ac",
},
{
// P2WSH
Address: "bc1qeklep85ntjz4605drds6aww9u0qr46qzrv5xswd35uhjuj8ahfcqgf6hak",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "0020cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70",
},
}
for _, spec := range specs {
t.Run(spec.Address, func(t *testing.T) {
addr, err := btcutils.SafeNewAddress(spec.Address, spec.DefaultNet)
if spec.ExpectedError != nil {
if errors.Is(spec.ExpectedError, anyErr) {
require.Error(t, err)
} else {
require.ErrorIs(t, err, spec.ExpectedError)
}
return
}
require.NoError(t, err)
assert.Equal(t, spec.ExpectedPkScript, hex.EncodeToString(addr.ScriptPubKey()))
})
}
}

View File

@@ -1,58 +0,0 @@
package btcutils
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
)
var (
// NullAddress is an address that script address is all zeros.
NullAddress = NewAddress("1111111111111111111114oLvT2", &chaincfg.MainNetParams)
// NullHash is a hash that all bytes are zero.
NullHash = utils.Must(chainhash.NewHashFromStr("0000000000000000000000000000000000000000000000000000000000000000"))
)
// TransactionType is the type of bitcoin transaction
// It's an alias of txscript.ScriptClass
type TransactionType = txscript.ScriptClass
// AddressType is the type of bitcoin address.
// It's an alias of txscript.ScriptClass
type AddressType = txscript.ScriptClass
// Types of bitcoin transaction
const (
TransactionP2WPKH = txscript.WitnessV0PubKeyHashTy
TransactionP2TR = txscript.WitnessV1TaprootTy
TransactionTaproot = TransactionP2TR // Alias of P2TR
TransactionP2SH = txscript.ScriptHashTy
TransactionP2PKH = txscript.PubKeyHashTy
TransactionP2WSH = txscript.WitnessV0ScriptHashTy
)
// Types of bitcoin address
const (
AddressP2WPKH = txscript.WitnessV0PubKeyHashTy
AddressP2TR = txscript.WitnessV1TaprootTy
AddressTaproot = AddressP2TR // Alias of P2TR
AddressP2SH = txscript.ScriptHashTy
AddressP2PKH = txscript.PubKeyHashTy
AddressP2WSH = txscript.WitnessV0ScriptHashTy
)
// IsSupportType returns true if the given tx/address type is supported.
func IsSupportType(t txscript.ScriptClass) bool {
_, ok := supportedTypes[t]
return ok
}
var supportedTypes = map[txscript.ScriptClass]struct{}{
txscript.WitnessV0PubKeyHashTy: {},
txscript.WitnessV1TaprootTy: {},
txscript.ScriptHashTy: {},
txscript.PubKeyHashTy: {},
txscript.WitnessV0ScriptHashTy: {},
}

View File

@@ -1,23 +0,0 @@
package btcutils
import (
"github.com/btcsuite/btcd/chaincfg"
)
var supportedNetworks = map[string]*chaincfg.Params{
"mainnet": &chaincfg.MainNetParams,
"testnet": &chaincfg.TestNet3Params,
}
// IsSupportedNetwork returns true if the given network is supported.
//
// TODO: create enum for network
func IsSupportedNetwork(network string) bool {
_, ok := supportedNetworks[network]
return ok
}
// GetNetParams returns the *chaincfg.Params for the given network.
func GetNetParams(network string) *chaincfg.Params {
return supportedNetworks[network]
}

View File

@@ -1,69 +0,0 @@
package btcutils
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
)
// NewPkScript creates a pubkey script(or witness program) from the given address string
//
// see: https://en.bitcoin.it/wiki/Script
func NewPkScript(address string, defaultNet ...*chaincfg.Params) ([]byte, error) {
net := utils.DefaultOptional(defaultNet, &chaincfg.MainNetParams)
decoded, _, err := parseAddress(address, net)
if err != nil {
return nil, errors.Wrap(err, "can't parse address")
}
scriptPubkey, err := txscript.PayToAddrScript(decoded)
if err != nil {
return nil, errors.Wrap(err, "can't get script pubkey")
}
return scriptPubkey, nil
}
// GetAddressTypeFromPkScript returns the address type from the given pubkey script/script pubkey.
func GetAddressTypeFromPkScript(pkScript []byte, defaultNet ...*chaincfg.Params) (AddressType, error) {
net := utils.DefaultOptional(defaultNet, &chaincfg.MainNetParams)
scriptClass, _, _, err := txscript.ExtractPkScriptAddrs(pkScript, net)
if err != nil {
return txscript.NonStandardTy, errors.Wrap(err, "can't parse pkScript")
}
return scriptClass, nil
}
// ExtractAddressFromPkScript extracts address from the given pubkey script/script pubkey.
// multi-signature script not supported
func ExtractAddressFromPkScript(pkScript []byte, defaultNet ...*chaincfg.Params) (Address, error) {
if len(pkScript) == 0 {
return Address{}, errors.New("empty pkScript")
}
if pkScript[0] == txscript.OP_RETURN {
return Address{}, errors.Wrap(errs.NotSupported, "OP_RETURN script")
}
net := utils.DefaultOptional(defaultNet, &chaincfg.MainNetParams)
addrType, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript, net)
if err != nil {
return Address{}, errors.Wrap(err, "can't parse pkScript")
}
if !IsSupportType(addrType) {
return Address{}, errors.Wrapf(errs.NotSupported, "unsupported pkscript type %s", addrType)
}
if len(addrs) == 0 {
return Address{}, errors.New("can't extract address from pkScript")
}
fixedPkScript := [MaxSupportedPkScriptSize]byte{}
copy(fixedPkScript[:], pkScript)
return Address{
decoded: addrs[0],
net: net,
encoded: addrs[0].EncodeAddress(),
encodedType: addrType,
scriptPubKey: fixedPkScript,
scriptPubKeySize: len(pkScript),
}, nil
}

View File

@@ -1,217 +0,0 @@
package btcutils_test
import (
"encoding/hex"
"fmt"
"testing"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/btcutils"
"github.com/stretchr/testify/assert"
)
func TestNewPkScript(t *testing.T) {
anyError := errors.New("any error")
type Spec struct {
Address string
DefaultNet *chaincfg.Params
ExpectedError error
ExpectedPkScript string // hex encoded
}
specs := []Spec{
{
Address: "some_invalid_address",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: anyError,
ExpectedPkScript: "",
},
{
// P2WPKH
Address: "bc1qdx72th7e3z8zc5wdrdxweswfcne974pjneyjln",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "001469bca5dfd9888e2c51cd1b4cecc1c9c4f25f5432",
},
{
// P2WPKH
Address: "bc1q7cj6gz6t3d28qg7kxhrc7h5t3h0re34fqqalga",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "0014f625a40b4b8b547023d635c78f5e8b8dde3cc6a9",
},
{
// P2TR
Address: "bc1pfd0zw2jwlpn4xckpr3dxpt7x0gw6wetuftxvrc4dt2qgn9azjuus65fug6",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "51204b5e272a4ef8675362c11c5a60afc67a1da7657c4accc1e2ad5a808997a29739",
},
{
// P2TR
Address: "bc1pxpumml545tqum5afarzlmnnez2npd35nvf0j0vnrp88nemqsn54qle05sm",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "51203079bdfe95a2c1cdd3a9e8c5fdce7912a616c693625f27b26309cf3cec109d2a",
},
{
// P2SH
Address: "3Ccte7SJz71tcssLPZy3TdWz5DTPeNRbPw",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "a91477e1a3d54f545d83869ae3a6b28b071422801d7b87",
},
{
// P2PKH
Address: "1KrRZSShVkdc8J71CtY4wdw46Rx3BRLKyH",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "76a914cecb25b53809991c7beef2d27bc2be49e78c684388ac",
},
{
// P2WSH
Address: "bc1qeklep85ntjz4605drds6aww9u0qr46qzrv5xswd35uhjuj8ahfcqgf6hak",
DefaultNet: &chaincfg.MainNetParams,
ExpectedError: nil,
ExpectedPkScript: "0020cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70",
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf("address:%s", spec.Address), func(t *testing.T) {
// Validate Expected PkScript
if spec.ExpectedError == nil {
{
expectedPkScriptRaw, err := hex.DecodeString(spec.ExpectedPkScript)
if err != nil {
t.Fatalf("can't decode expected pkscript %s, Reason: %s", spec.ExpectedPkScript, err)
}
expectedPkScript, err := txscript.ParsePkScript(expectedPkScriptRaw)
if err != nil {
t.Fatalf("invalid expected pkscript %s, Reason: %s", spec.ExpectedPkScript, err)
}
expectedAddress, err := expectedPkScript.Address(spec.DefaultNet)
if err != nil {
t.Fatalf("can't get address from expected pkscript %s, Reason: %s", spec.ExpectedPkScript, err)
}
assert.Equal(t, spec.Address, expectedAddress.EncodeAddress())
}
{
address, err := btcutil.DecodeAddress(spec.Address, spec.DefaultNet)
if err != nil {
t.Fatalf("can't decode address %s(%s),Reason: %s", spec.Address, spec.DefaultNet.Name, err)
}
pkScript, err := txscript.PayToAddrScript(address)
if err != nil {
t.Fatalf("can't get pkscript from address %s(%s),Reason: %s", spec.Address, spec.DefaultNet.Name, err)
}
pkScriptStr := hex.EncodeToString(pkScript)
assert.Equal(t, spec.ExpectedPkScript, pkScriptStr)
}
}
pkScript, err := btcutils.NewPkScript(spec.Address, spec.DefaultNet)
if spec.ExpectedError == anyError {
assert.Error(t, err)
} else if spec.ExpectedError != nil {
assert.ErrorIs(t, err, spec.ExpectedError)
} else {
address, err := btcutils.SafeNewAddress(spec.Address, spec.DefaultNet)
if err != nil {
t.Fatalf("can't create address %s(%s),Reason: %s", spec.Address, spec.DefaultNet.Name, err)
}
// ScriptPubKey from address and from NewPkScript should be the same
assert.Equal(t, address.ScriptPubKey(), pkScript)
// Expected PkScript and New PkScript should be the same
pkScriptStr := hex.EncodeToString(pkScript)
assert.Equal(t, spec.ExpectedPkScript, pkScriptStr)
// Can convert PkScript back to same address
acualPkScript, err := txscript.ParsePkScript(address.ScriptPubKey())
if !assert.NoError(t, err) {
t.Fail()
}
assert.Equal(t, address.Decoded().String(), utils.Must(acualPkScript.Address(spec.DefaultNet)).String())
}
})
}
}
func TestGetAddressTypeFromPkScript(t *testing.T) {
type Spec struct {
PubkeyScript string
ExpectedError error
ExpectedAddressType btcutils.AddressType
}
specs := []Spec{
{
PubkeyScript: "0014602181cc89f7c9f54cb6d7607a3445e3e022895d",
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WPKH,
},
{
PubkeyScript: "5120ef8d59038dd51093fbfff794f658a07a3697b94d9e6d24e45b28abd88f10e33d",
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2TR,
},
{
PubkeyScript: "a91416eef7e84fb9821db1341b6ccef1c4a4e5ec21e487",
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2SH,
},
{
PubkeyScript: "76a914cecb25b53809991c7beef2d27bc2be49e78c684388ac",
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2PKH,
},
{
PubkeyScript: "0020cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70",
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WSH,
},
{
PubkeyScript: "0020cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70",
ExpectedError: nil,
ExpectedAddressType: btcutils.AddressP2WSH,
},
{
PubkeyScript: "6a5d0614c0a2331441",
ExpectedError: nil,
ExpectedAddressType: txscript.NonStandardTy,
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf("PkScript:%s", spec.PubkeyScript), func(t *testing.T) {
pkScript, err := hex.DecodeString(spec.PubkeyScript)
if err != nil {
t.Fail()
}
actualAddressType, actualError := btcutils.GetAddressTypeFromPkScript(pkScript)
if spec.ExpectedError != nil {
assert.ErrorIs(t, actualError, spec.ExpectedError)
} else {
assert.Equal(t, spec.ExpectedAddressType, actualAddressType)
}
})
}
}

View File

@@ -1,92 +0,0 @@
package psbtutils
import (
"bytes"
"encoding/base64"
"encoding/hex"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
)
const (
// default psbt encoding is hex
DefaultEncoding = EncodingHex
)
type Encoding string
const (
EncodingBase64 Encoding = "base64"
EncodingHex Encoding = "hex"
)
// DecodeString decodes a psbt hex/base64 string into a psbt.Packet
//
// encoding is optional, default is EncodingHex
func DecodeString(psbtStr string, encoding ...Encoding) (*psbt.Packet, error) {
pC, err := Decode([]byte(psbtStr), encoding...)
return pC, errors.WithStack(err)
}
// Decode decodes a psbt hex/base64 byte into a psbt.Packet
//
// encoding is optional, default is EncodingHex
func Decode(psbtB []byte, encoding ...Encoding) (*psbt.Packet, error) {
enc, ok := utils.Optional(encoding)
if !ok {
enc = DefaultEncoding
}
var (
psbtBytes []byte
err error
)
switch enc {
case EncodingBase64, "b64":
psbtBytes = make([]byte, base64.StdEncoding.DecodedLen(len(psbtB)))
_, err = base64.StdEncoding.Decode(psbtBytes, psbtB)
case EncodingHex:
psbtBytes = make([]byte, hex.DecodedLen(len(psbtB)))
_, err = hex.Decode(psbtBytes, psbtB)
default:
return nil, errors.Wrap(errs.Unsupported, "invalid encoding")
}
if err != nil {
return nil, errors.Wrap(err, "can't decode psbt string")
}
pC, err := psbt.NewFromRawBytes(bytes.NewReader(psbtBytes), false)
if err != nil {
return nil, errors.Wrap(err, "can't create psbt from given psbt")
}
return pC, nil
}
// EncodeToString encodes a psbt.Packet into a psbt hex/base64 string
//
// encoding is optional, default is EncodingHex
func EncodeToString(pC *psbt.Packet, encoding ...Encoding) (string, error) {
enc, ok := utils.Optional(encoding)
if !ok {
enc = DefaultEncoding
}
var buf bytes.Buffer
if err := pC.Serialize(&buf); err != nil {
return "", errors.Wrap(err, "can't serialize psbt")
}
switch enc {
case EncodingBase64, "b64":
return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
case EncodingHex:
return hex.EncodeToString(buf.Bytes()), nil
default:
return "", errors.Wrap(errs.Unsupported, "invalid encoding")
}
}

View File

@@ -1,110 +0,0 @@
package psbtutils
import (
"math"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/btcutils"
)
// TxFee returns satoshis fee of a transaction given the fee rate (sat/vB)
// and the number of inputs and outputs.
func TxFee(feeRate int64, p *psbt.Packet) (int64, error) {
size, err := PSBTSize(p)
if err != nil {
return 0, errors.Wrap(err, "psbt size")
}
return int64(math.Ceil(size * float64(feeRate))), nil
}
func PredictTxFee(feeRate int64, inputs, outputs int) int64 {
/**
TODO: handle edge cases like:
1. when we predict that we need to use unnecessary UTXOs
2. when we predict that we need to use more value than user have, but user do have enough for the actual transaction
Idea for solving this:
- When trying to find the best UTXOs to use, we:
- Will not reject when user's balance is not enough, instead we will return all UTXOs even if it's not enough.
- Will be okay returning excessive UTXOs (say we predict we need 10K satoshis, but actually we only need 5K satoshis, then we will return UTXOs enough for 10K satoshis)
- And then we:
- Construct the actual PSBT, then select UTXOs to use accordingly,
- If the user's balance is not enough, then we will return an error,
- Or if when we predict we expect to use more UTXOs than the actual transaction, then we will just use what's needed.
*/
size := defaultOverhead + 148*float64(inputs) + 43*float64(outputs)
return int64(math.Ceil(size * float64(feeRate)))
}
type txSize struct {
Overhead float64
Inputs float64
Outputs float64
}
const defaultOverhead = 10.5
// Transaction Virtual Sizes Bytes
//
// Reference: https://bitcoinops.org/en/tools/calc-size/
var txSizes = map[btcutils.TransactionType]txSize{
btcutils.TransactionP2WPKH: {
Inputs: 68,
Outputs: 31,
},
btcutils.TransactionP2TR: {
Inputs: 57.5,
Outputs: 43,
},
btcutils.TransactionP2SH: {
Inputs: 91,
Outputs: 32,
},
btcutils.TransactionP2PKH: {
Inputs: 148,
Outputs: 34,
},
btcutils.TransactionP2WSH: {
Inputs: 104.5,
Outputs: 43,
},
}
func PSBTSize(psbt *psbt.Packet) (float64, error) {
if err := psbt.SanityCheck(); err != nil {
return 0, errors.Wrap(errors.Join(err, errs.InvalidArgument), "psbt sanity check")
}
inputs := map[btcutils.TransactionType]int{}
outputs := map[btcutils.TransactionType]int{}
for _, input := range psbt.Inputs {
addrType, err := btcutils.GetAddressTypeFromPkScript(input.WitnessUtxo.PkScript)
if err != nil {
return 0, errors.Wrap(err, "get address type from pk script")
}
inputs[addrType]++
}
for _, output := range psbt.UnsignedTx.TxOut {
addrType, err := btcutils.GetAddressTypeFromPkScript(output.PkScript)
if err != nil {
return 0, errors.Wrap(err, "get address type from pk script")
}
outputs[addrType]++
}
totalSize := defaultOverhead
for txType, txSizeData := range txSizes {
if inputCount, ok := inputs[txType]; ok {
totalSize += txSizeData.Inputs * float64(inputCount)
}
if outputCount, ok := outputs[txType]; ok {
totalSize += txSizeData.Outputs * float64(outputCount)
}
}
return totalSize, nil
}

View File

@@ -1,131 +0,0 @@
package psbtutils_test
import (
"fmt"
"math"
"testing"
"github.com/gaze-network/indexer-network/pkg/btcutils/psbtutils"
"github.com/stretchr/testify/assert"
)
func TestPSBTSize(t *testing.T) {
type Spec struct {
Name string
PSBTString string
ExpectedError error
ExpectedSize float64
}
specs := []Spec{
{
Name: "3-inputs-3-outputs-taproot",
PSBTString: "70736274ff0100fd06010100000003866c72cfeef533940eaee49b68778e6223914ea671411ec387bdb61f620889910000000000ffffffff866c72cfeef533940eaee49b68778e6223914ea671411ec387bdb61f620889910100000000ffffffff866c72cfeef533940eaee49b68778e6223914ea671411ec387bdb61f620889910200000000ffffffff03b0040000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f22020000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f4d370f00000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f000000000001012b58020000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f0001012b58020000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f0001012bcb3c0f00000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f00000000",
ExpectedError: nil,
ExpectedSize: 312,
},
{
Name: "mixed-segwit-taproot",
PSBTString: "70736274ff0100fd230202000000061f34960fef4e73c3c4c023f303c16e06f0eebb268bc0d3bac99fa78c031a45b90300000000ffffffff1f34960fef4e73c3c4c023f303c16e06f0eebb268bc0d3bac99fa78c031a45b90400000000ffffffff21c8ec368f2aff1a7baf4964e4070f52e7247ae39edfbda3976f8df4da1b72a00000000000ffffffff969e65b705e3d5071f1743a63381b3aa1ec31e1dbbbd63ab594a19ca399a58af0000000000ffffffffcca5cfd28bd6c54a851d97d029560b3047f7c6482fda7b2f2603d56ade8c95890000000000ffffffff1f34960fef4e73c3c4c023f303c16e06f0eebb268bc0d3bac99fa78c031a45b90500000000ffffffff0908070000000000001600144850d32c3ff585403790507793125d174a5c28e022020000000000001600144850d32c3ff585403790507793125d174a5c28e022020000000000001600144850d32c3ff585403790507793125d174a5c28e0b03600000000000016001459805fc1fdb9f05e190db569987c95c4f9deaa532a680000000000002251203a9ddeb6a2a327fed0f50d18778b28168e3ddb7fdfd4b05f4e438c9174d76a8d58020000000000001600144850d32c3ff585403790507793125d174a5c28e058020000000000001600144850d32c3ff585403790507793125d174a5c28e058020000000000001600144850d32c3ff585403790507793125d174a5c28e0b21f1e00000000001600144850d32c3ff585403790507793125d174a5c28e0000000000001011f58020000000000001600144850d32c3ff585403790507793125d174a5c28e00001011f58020000000000001600144850d32c3ff585403790507793125d174a5c28e00001011f58020000000000001600144850d32c3ff585403790507793125d174a5c28e00001011f220200000000000016001459805fc1fdb9f05e190db569987c95c4f9deaa53010304830000000001012b22020000000000002251203a9ddeb6a2a327fed0f50d18778b28168e3ddb7fdfd4b05f4e438c9174d76a8d010304830000000001011f06432000000000001600144850d32c3ff585403790507793125d174a5c28e000000000000000000000",
ExpectedError: nil,
ExpectedSize: 699,
},
{
Name: "segwit-transfer-to-legacy",
PSBTString: "70736274ff010074020000000124ba4becfc732f3b4729784a3dd0cc2494ae890d826377fd98aeb0607feb1ace0100000000ffffffff0210270000000000001976a91422bae94117be666b593916527d55bdaf030d756e88ac25f62e000000000016001476d1e072c9b8a18fa1e4be697c175e0c642026ac000000000001011fc51d2f000000000016001476d1e072c9b8a18fa1e4be697c175e0c642026ac01086b024730440220759df9d109298a1ef69b9faa1786f4118f0d4d63a68cd2061e217b6090573f62022053ffa117fc21e5bf20e7d16bb786de52dc0214c9a21af87b4e92a639ef66e997012103e0cb213a46a68b1f463a4858635ee44694ce4b512788833d629840341b1219c9000000",
ExpectedError: nil,
ExpectedSize: 143.5,
},
}
for _, spec := range specs {
t.Run(spec.Name, func(t *testing.T) {
p, err := psbtutils.DecodeString(spec.PSBTString)
assert.NoError(t, err)
size, err := psbtutils.PSBTSize(p)
if spec.ExpectedError != nil {
assert.ErrorIs(t, err, spec.ExpectedError)
} else {
assert.Equal(t, spec.ExpectedSize, size)
}
})
}
}
func TestPredictTxFee(t *testing.T) {
type Spec struct {
FeeRate int64
InputsCount int
OutputsCount int
ExpectedFee int64
}
specs := []Spec{
{
FeeRate: 100,
InputsCount: 1,
OutputsCount: 1,
ExpectedFee: int64(math.Ceil((10.5 + 148 + 43) * 100)),
},
{
FeeRate: 1,
InputsCount: 99,
OutputsCount: 99,
ExpectedFee: int64(math.Ceil((10.5 + (99 * 148) + (99 * 43)) * 1)),
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf("feeRate=%d:inputs=%d:outputs=%d", spec.FeeRate, spec.InputsCount, spec.OutputsCount), func(t *testing.T) {
fee := psbtutils.PredictTxFee(spec.FeeRate, spec.InputsCount, spec.OutputsCount)
assert.Equal(t, spec.ExpectedFee, fee)
})
}
}
func TestTxFee(t *testing.T) {
type Spec struct {
Name string
FeeRate int64
PSBTString string
ExpectedError error
ExpectedFee int64
}
specs := []Spec{
{
Name: "3-inputs-3-outputs-taproot",
FeeRate: 10,
PSBTString: "70736274ff0100fd06010100000003866c72cfeef533940eaee49b68778e6223914ea671411ec387bdb61f620889910000000000ffffffff866c72cfeef533940eaee49b68778e6223914ea671411ec387bdb61f620889910100000000ffffffff866c72cfeef533940eaee49b68778e6223914ea671411ec387bdb61f620889910200000000ffffffff03b0040000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f22020000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f4d370f00000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f000000000001012b58020000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f0001012b58020000000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f0001012bcb3c0f00000000002251205b954b2f91ded08c553551037bc71265a69a7586855ba4fdcf785a2494f0c37f00000000",
ExpectedError: nil,
ExpectedFee: 312 * 10,
},
{
Name: "mixed-segwit-taproot",
FeeRate: 20,
PSBTString: "70736274ff0100fd230202000000061f34960fef4e73c3c4c023f303c16e06f0eebb268bc0d3bac99fa78c031a45b90300000000ffffffff1f34960fef4e73c3c4c023f303c16e06f0eebb268bc0d3bac99fa78c031a45b90400000000ffffffff21c8ec368f2aff1a7baf4964e4070f52e7247ae39edfbda3976f8df4da1b72a00000000000ffffffff969e65b705e3d5071f1743a63381b3aa1ec31e1dbbbd63ab594a19ca399a58af0000000000ffffffffcca5cfd28bd6c54a851d97d029560b3047f7c6482fda7b2f2603d56ade8c95890000000000ffffffff1f34960fef4e73c3c4c023f303c16e06f0eebb268bc0d3bac99fa78c031a45b90500000000ffffffff0908070000000000001600144850d32c3ff585403790507793125d174a5c28e022020000000000001600144850d32c3ff585403790507793125d174a5c28e022020000000000001600144850d32c3ff585403790507793125d174a5c28e0b03600000000000016001459805fc1fdb9f05e190db569987c95c4f9deaa532a680000000000002251203a9ddeb6a2a327fed0f50d18778b28168e3ddb7fdfd4b05f4e438c9174d76a8d58020000000000001600144850d32c3ff585403790507793125d174a5c28e058020000000000001600144850d32c3ff585403790507793125d174a5c28e058020000000000001600144850d32c3ff585403790507793125d174a5c28e0b21f1e00000000001600144850d32c3ff585403790507793125d174a5c28e0000000000001011f58020000000000001600144850d32c3ff585403790507793125d174a5c28e00001011f58020000000000001600144850d32c3ff585403790507793125d174a5c28e00001011f58020000000000001600144850d32c3ff585403790507793125d174a5c28e00001011f220200000000000016001459805fc1fdb9f05e190db569987c95c4f9deaa53010304830000000001012b22020000000000002251203a9ddeb6a2a327fed0f50d18778b28168e3ddb7fdfd4b05f4e438c9174d76a8d010304830000000001011f06432000000000001600144850d32c3ff585403790507793125d174a5c28e000000000000000000000",
ExpectedError: nil,
ExpectedFee: 699 * 20,
},
{
Name: "segwit-transfer-to-legacy",
FeeRate: 99,
PSBTString: "70736274ff010074020000000124ba4becfc732f3b4729784a3dd0cc2494ae890d826377fd98aeb0607feb1ace0100000000ffffffff0210270000000000001976a91422bae94117be666b593916527d55bdaf030d756e88ac25f62e000000000016001476d1e072c9b8a18fa1e4be697c175e0c642026ac000000000001011fc51d2f000000000016001476d1e072c9b8a18fa1e4be697c175e0c642026ac01086b024730440220759df9d109298a1ef69b9faa1786f4118f0d4d63a68cd2061e217b6090573f62022053ffa117fc21e5bf20e7d16bb786de52dc0214c9a21af87b4e92a639ef66e997012103e0cb213a46a68b1f463a4858635ee44694ce4b512788833d629840341b1219c9000000",
ExpectedError: nil,
ExpectedFee: int64(math.Ceil((143.5) * 99)),
},
}
for _, spec := range specs {
t.Run(spec.Name, func(t *testing.T) {
p, err := psbtutils.DecodeString(spec.PSBTString)
assert.NoError(t, err)
fee, err := psbtutils.TxFee(spec.FeeRate, p)
if spec.ExpectedError != nil {
assert.ErrorIs(t, err, spec.ExpectedError)
} else {
assert.Equal(t, spec.ExpectedFee, fee)
}
})
}
}

View File

@@ -1,35 +0,0 @@
package psbtutils
import (
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
)
func IsReadyPSBT(pC *psbt.Packet, feeRate int64) (bool, error) {
// if input = output + fee then it's ready
// Calculate tx fee
fee, err := TxFee(feeRate, pC)
if err != nil {
return false, errors.Wrap(err, "calculate fee")
}
// sum total input and output
totalInputValue := lo.SumBy(pC.Inputs, func(input psbt.PInput) int64 { return input.WitnessUtxo.Value })
totalOutputValue := lo.SumBy(pC.UnsignedTx.TxOut, func(txout *wire.TxOut) int64 { return txout.Value }) + fee
// it's perfect match
if totalInputValue == totalOutputValue {
return true, nil
}
// if input is more than output + fee but not more than 1000 satoshi,
// then it's ready
if totalInputValue > totalOutputValue && totalInputValue-totalOutputValue < 1000 {
return true, nil
}
return false, nil
}

21
pkg/btcutils/satoshi.go Normal file
View File

@@ -0,0 +1,21 @@
package btcutils
import "github.com/shopspring/decimal"
const (
BitcoinDecimals = 8
)
// satsUnit is 10^8
var satsUnit = decimal.New(1, BitcoinDecimals)
// BitcoinToSatoshi converts a amount in Bitcoin format to Satoshi format.
func BitcoinToSatoshi(v float64) int64 {
amount := decimal.NewFromFloat(v)
return amount.Mul(satsUnit).IntPart()
}
// SatoshiToBitcoin converts a amount in Satoshi format to Bitcoin format.
func SatoshiToBitcoin(v int64) float64 {
return decimal.New(v, -BitcoinDecimals).InexactFloat64()
}

View File

@@ -0,0 +1,39 @@
package btcutils
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSatoshiConversion(t *testing.T) {
testcases := []struct {
sats float64
btc int64
}{
{2.29980951, 229980951},
{1.29609085, 129609085},
{1.2768897, 127688970},
{0.62518296, 62518296},
{0.29998462, 29998462},
{0.1251, 12510000},
{0.02016011, 2016011},
{0.0198473, 1984730},
{0.0051711, 517110},
{0.0012, 120000},
{7e-05, 7000},
{3.835e-05, 3835},
{1.962e-05, 1962},
}
for _, testcase := range testcases {
t.Run(fmt.Sprintf("BtcToSats/%v", testcase.sats), func(t *testing.T) {
require.NotEqual(t, testcase.btc, int64(testcase.sats*1e8), "Testcase value should have precision error")
assert.Equal(t, testcase.btc, BitcoinToSatoshi(testcase.sats))
})
t.Run(fmt.Sprintf("SatsToBtc/%v", testcase.sats), func(t *testing.T) {
assert.Equal(t, testcase.sats, SatoshiToBitcoin(testcase.btc))
})
}
}

View File

@@ -1,21 +0,0 @@
package btcutils
import (
"github.com/Cleverse/go-utilities/utils"
verifier "github.com/bitonicnl/verify-signed-message/pkg"
"github.com/btcsuite/btcd/chaincfg"
"github.com/cockroachdb/errors"
)
func VerifySignature(address string, message string, sigBase64 string, defaultNet ...*chaincfg.Params) error {
net := utils.DefaultOptional(defaultNet, &chaincfg.MainNetParams)
_, err := verifier.VerifyWithChain(verifier.SignedMessage{
Address: address,
Message: message,
Signature: sigBase64,
}, net)
if err != nil {
return errors.WithStack(err)
}
return nil
}

View File

@@ -1,69 +0,0 @@
package btcutils
import (
"testing"
"github.com/btcsuite/btcd/chaincfg"
"github.com/stretchr/testify/assert"
)
func TestVerifySignature(t *testing.T) {
{
message := "Test123"
address := "18J72YSM9pKLvyXX1XAjFXA98zeEvxBYmw"
signature := "Gzhfsw0ItSrrTCChykFhPujeTyAcvVxiXwywxpHmkwFiKuUR2ETbaoFcocmcSshrtdIjfm8oXlJoTOLosZp3Yc8="
network := &chaincfg.MainNetParams
err := VerifySignature(address, message, signature, network)
assert.NoError(t, err)
}
{
address := "tb1qr97cuq4kvq7plfetmxnl6kls46xaka78n2288z"
message := "The outage comes at a time when bitcoin has been fast approaching new highs not seen since June 26, 2019."
signature := "H/bSByRH7BW1YydfZlEx9x/nt4EAx/4A691CFlK1URbPEU5tJnTIu4emuzkgZFwC0ptvKuCnyBThnyLDCqPqT10="
network := &chaincfg.TestNet3Params
err := VerifySignature(address, message, signature, network)
assert.NoError(t, err)
}
{
// Missmatch address
address := "tb1qp7y2ywgrv8a4t9h47yphtgj8w759rk6vgd9ran"
message := "The outage comes at a time when bitcoin has been fast approaching new highs not seen since June 26, 2019."
signature := "H/bSByRH7BW1YydfZlEx9x/nt4EAx/4A691CFlK1URbPEU5tJnTIu4emuzkgZFwC0ptvKuCnyBThnyLDCqPqT10="
network := &chaincfg.TestNet3Params
err := VerifySignature(address, message, signature, network)
assert.Error(t, err)
}
{
// Missmatch signature
address := "tb1qr97cuq4kvq7plfetmxnl6kls46xaka78n2288z"
message := "The outage comes at a time when bitcoin has been fast approaching new highs not seen since June 26, 2019."
signature := "Gzhfsw0ItSrrTCChykFhPujeTyAcvVxiXwywxpHmkwFiKuUR2ETbaoFcocmcSshrtdIjfm8oXlJoTOLosZp3Yc8="
network := &chaincfg.TestNet3Params
err := VerifySignature(address, message, signature, network)
assert.Error(t, err)
}
{
// Missmatch message
address := "tb1qr97cuq4kvq7plfetmxnl6kls46xaka78n2288z"
message := "Hello World"
signature := "H/bSByRH7BW1YydfZlEx9x/nt4EAx/4A691CFlK1URbPEU5tJnTIu4emuzkgZFwC0ptvKuCnyBThnyLDCqPqT10="
network := &chaincfg.TestNet3Params
err := VerifySignature(address, message, signature, network)
assert.Error(t, err)
}
{
// Missmatch network
address := "tb1qr97cuq4kvq7plfetmxnl6kls46xaka78n2288z"
message := "The outage comes at a time when bitcoin has been fast approaching new highs not seen since June 26, 2019."
signature := "H/bSByRH7BW1YydfZlEx9x/nt4EAx/4A691CFlK1URbPEU5tJnTIu4emuzkgZFwC0ptvKuCnyBThnyLDCqPqT10="
network := &chaincfg.MainNetParams
err := VerifySignature(address, message, signature, network)
assert.Error(t, err)
}
}

View File

@@ -1,10 +0,0 @@
package btcutils
const (
// TxVersion is the current latest supported transaction version.
TxVersion = 2
// MaxTxInSequenceNum is the maximum sequence number the sequence field
// of a transaction input can be.
MaxTxInSequenceNum uint32 = 0xffffffff
)

View File

@@ -4,6 +4,7 @@ import (
"encoding/hex"
"strings"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
)
@@ -12,6 +13,9 @@ const (
witnessSeparator = " "
)
// CoinbaseWitness is the witness data for a coinbase transaction.
var CoinbaseWitness = utils.Must(WitnessFromHex([]string{"0000000000000000000000000000000000000000000000000000000000000000"}))
// WitnessToHex formats the passed witness stack as a slice of hex-encoded strings.
func WitnessToHex(witness wire.TxWitness) []string {
if len(witness) == 0 {
@@ -37,7 +41,10 @@ func WitnessToString(witness wire.TxWitness) string {
// WitnessFromHex parses the passed slice of hex-encoded strings into a witness stack.
func WitnessFromHex(witnesses []string) (wire.TxWitness, error) {
// NOTE: some witness from bitcoin node are empty and some are nil(most are nil), it's not clear why.
// For now, we will return nil for both cases.
if len(witnesses) == 0 {
// return wire.TxWitness{}, nil
return nil, nil
}

View File

@@ -10,28 +10,23 @@ import (
"github.com/gofiber/fiber/v2"
)
// New setup error handler middleware
func New() fiber.Handler {
return func(ctx *fiber.Ctx) error {
err := ctx.Next()
if err == nil {
return nil
}
func NewHTTPErrorHandler() func(ctx *fiber.Ctx, err error) error {
return func(ctx *fiber.Ctx, err error) error {
if e := new(errs.PublicError); errors.As(err, &e) {
return errors.WithStack(ctx.Status(http.StatusBadRequest).JSON(fiber.Map{
return errors.WithStack(ctx.Status(http.StatusBadRequest).JSON(map[string]any{
"error": e.Message(),
}))
}
if e := new(fiber.Error); errors.As(err, &e) {
return errors.WithStack(ctx.Status(e.Code).JSON(fiber.Map{
"error": e.Error(),
}))
return errors.WithStack(ctx.Status(e.Code).SendString(e.Error()))
}
logger.ErrorContext(ctx.UserContext(), "Something went wrong, api error",
slogx.String("event", "api_error"),
logger.ErrorContext(ctx.UserContext(), "Something went wrong, unhandled api error",
slogx.String("event", "api_unhandled_error"),
slogx.Error(err),
)
return errors.WithStack(ctx.Status(http.StatusInternalServerError).JSON(fiber.Map{
return errors.WithStack(ctx.Status(http.StatusInternalServerError).JSON(map[string]any{
"error": "Internal Server Error",
}))
}

View File

@@ -5,11 +5,12 @@ import (
"encoding/json"
"log/slog"
"net/url"
"path"
"strings"
"time"
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/valyala/fasthttp"
)
@@ -23,14 +24,13 @@ type Config struct {
}
type Client struct {
baseURL *url.URL
baseURL string
Config
}
func New(baseURL string, config ...Config) (*Client, error) {
parsedBaseURL, err := url.Parse(baseURL)
if err != nil {
return nil, errors.Wrap(err, "can't parse base url")
if _, err := url.Parse(baseURL); err != nil {
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "can't parse base url"))
}
var cf Config
if len(config) > 0 {
@@ -40,7 +40,7 @@ func New(baseURL string, config ...Config) (*Client, error) {
cf.Headers = make(map[string]string)
}
return &Client{
baseURL: parsedBaseURL,
baseURL: baseURL,
Config: cf,
}, nil
}
@@ -60,21 +60,11 @@ type HttpResponse struct {
}
func (r *HttpResponse) UnmarshalBody(out any) error {
body, err := r.BodyUncompressed()
err := json.Unmarshal(r.Body(), out)
if err != nil {
return errors.Wrapf(err, "can't uncompress body from %v", r.URL)
}
switch strings.ToLower(string(r.Header.ContentType())) {
case "application/json", "application/json; charset=utf-8":
if err := json.Unmarshal(body, out); err != nil {
return errors.Wrapf(err, "can't unmarshal json body from %s, %q", r.URL, string(body))
}
return nil
case "text/plain", "text/plain; charset=utf-8":
return errors.Errorf("can't unmarshal plain text %q", string(body))
default:
return errors.Errorf("unsupported content type: %s, contents: %v", r.Header.ContentType(), string(r.Body()))
return errors.Wrapf(err, "can't unmarshal json body from %v, %v", r.URL, string(r.Body()))
}
return nil
}
func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpResponse, error) {
@@ -87,14 +77,9 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
for k, v := range reqOptions.Header {
req.Header.Set(k, v)
}
parsedUrl := h.BaseURL()
parsedUrl.Path = path.Join(parsedUrl.Path, reqOptions.path)
baseQuery := parsedUrl.Query()
for k, v := range reqOptions.Query {
baseQuery[k] = v
}
parsedUrl.RawQuery = baseQuery.Encode()
parsedUrl := utils.Must(url.Parse(h.baseURL)) // checked in httpclient.New
parsedUrl.Path = reqOptions.path
parsedUrl.RawQuery = reqOptions.Query.Encode()
// remove %20 from url (empty space)
url := strings.TrimSuffix(parsedUrl.String(), "%20")
@@ -126,7 +111,6 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
logger = logger.With(
slog.Int("status_code", resp.StatusCode()),
slog.String("resp_content_type", string(resp.Header.ContentType())),
slog.String("resp_content_encoding", string(resp.Header.ContentEncoding())),
slog.Int("resp_content_length", len(resp.Body())),
)
}
@@ -150,12 +134,6 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
return &httpResponse, nil
}
// BaseURL returns the cloned base URL of the client.
func (h *Client) BaseURL() *url.URL {
u := *h.baseURL
return &u
}
func (h *Client) Do(ctx context.Context, method, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = method

View File

@@ -119,10 +119,10 @@ type Config struct {
// - Text (default)
// - JSON
// - GCP: Output format for Stackdriver Logging/Cloud Logging or others GCP services.
Output string `mapstructure:"output" env:"OUTPUT" envDefault:"text"`
Output string `mapstructure:"output"`
// Debug is enabled logger level debug. (default: false)
Debug bool `mapstructure:"debug" env:"DEBUG" envDefault:"false"`
Debug bool `mapstructure:"debug"`
}
var (

View File

@@ -1,7 +0,0 @@
# Proxies IP Range Resources
- Cloudflare - https://www.cloudflare.com/ips/
- GCP Load Balancer - https://cloud.google.com/load-balancing/docs/health-check-concepts#ip-ranges
- GCP Compute Engine, Customer-usable external IP address ranges - https://www.gstatic.com/ipranges/cloud.json
- Other GCP Services - https://cloud.google.com/compute/docs/faq#networking
- Other Resources - https://github.com/lord-alfred/ipranges

View File

@@ -1,21 +0,0 @@
package requestcontext
// requestcontextError implements error interface
var _ error = requestcontextError{}
type requestcontextError struct {
err error
status int
message string
}
func (r requestcontextError) Error() string {
if r.err != nil {
return r.err.Error()
}
return r.message
}
func (r requestcontextError) Unwrap() error {
return r.err
}

View File

@@ -1,44 +0,0 @@
package requestcontext
import (
"context"
"log/slog"
"net/http"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
)
type Response struct {
Result any `json:"result"`
Error string `json:"error,omitempty"`
}
type Option func(ctx context.Context, c *fiber.Ctx) (context.Context, error)
func New(opts ...Option) fiber.Handler {
return func(c *fiber.Ctx) error {
var err error
ctx := c.UserContext()
for i, opt := range opts {
ctx, err = opt(ctx, c)
if err != nil {
rErr := requestcontextError{}
if errors.As(err, &rErr) {
return c.Status(rErr.status).JSON(Response{Error: rErr.message})
}
logger.ErrorContext(ctx, "failed to extract request context",
err,
slog.String("event", "requestcontext/error"),
slog.String("module", "requestcontext"),
slog.Int("optionIndex", i),
)
return c.Status(http.StatusInternalServerError).JSON(Response{Error: "internal server error"})
}
}
c.SetUserContext(ctx)
return c.Next()
}
}

View File

@@ -1,150 +0,0 @@
package requestcontext
import (
"context"
"log/slog"
"net"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
)
type clientIPKey struct{}
type WithClientIPConfig struct {
// [Optional] TrustedProxiesIP is a list of all proxies IP ranges that's between the server and the client.
//
// If it's provided, it will walk backwards from the last IP in `X-Forwarded-For` header
// and use first IP that's not trusted proxy(not in the given IP ranges.)
//
// **If you want to use this option, you should provide all of probable proxies IP ranges.**
//
// This is lowest priority.
TrustedProxiesIP []string `env:"TRUSTED_PROXIES_IP" mapstructure:"trusted_proxies_ip"`
// [Optional] TrustedHeader is a header name for getting client IP. (e.g. X-Real-IP, CF-Connecting-IP, etc.)
//
// This is highest priority, it will ignore rest of the options if it's provided.
TrustedHeader string `env:"TRUSTED_HEADER" mapstructure:"trusted_proxies_header"`
// EnableRejectMalformedRequest return 403 Forbidden if the request is from proxies, but can't extract client IP
EnableRejectMalformedRequest bool `env:"ENABLE_REJECT_MALFORMED_REQUEST" envDefault:"false" mapstructure:"enable_reject_malformed_request"`
}
// WithClientIP setup client IP context with XFF Spoofing prevention support.
//
// If request is from proxies, it will use first IP from `X-Forwarded-For` header by default.
func WithClientIP(config WithClientIPConfig) Option {
var trustedProxies trustedProxy
if len(config.TrustedProxiesIP) > 0 {
proxy, err := newTrustedProxy(config.TrustedProxiesIP)
if err != nil {
logger.Panic("Failed to parse trusted proxies", err)
}
trustedProxies = proxy
}
return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
// Extract client IP from given header
if config.TrustedHeader != "" {
headerIP := c.Get(config.TrustedHeader)
// validate ip from header
if ip := net.ParseIP(headerIP); ip != nil {
return context.WithValue(ctx, clientIPKey{}, headerIP), nil
}
}
// Extract client IP from XFF header
rawIPs := c.IPs()
ips := parseIPs(rawIPs)
// If the request is directly from client, we can use direct remote IP address
if len(ips) == 0 {
return context.WithValue(ctx, clientIPKey{}, c.IP()), nil
}
// Walk back and find first IP that's not trusted proxy
if len(trustedProxies) > 0 {
for i := len(ips) - 1; i >= 0; i-- {
if !trustedProxies.IsTrusted(ips[i]) {
return context.WithValue(ctx, clientIPKey{}, ips[i].String()), nil
}
}
// If all IPs are trusted proxies, return first IP in XFF header
return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
}
// Finally, if we can't extract client IP, return forbidden
if config.EnableRejectMalformedRequest {
logger.WarnContext(ctx, "IP Spoofing detected, returning 403 Forbidden",
slog.String("event", "requestcontext/ip_spoofing_detected"),
slog.String("module", "requestcontext/with_clientip"),
slog.String("ip", c.IP()),
slog.Any("ips", rawIPs),
)
return nil, requestcontextError{
status: fiber.StatusForbidden,
message: "not allowed to access",
}
}
// Fallback to first IP in XFF header
return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
}
}
// GetClientIP get clientIP from context. If not found, return empty string
//
// Warning: Request context should be setup before using this function
func GetClientIP(ctx context.Context) string {
if ip, ok := ctx.Value(clientIPKey{}).(string); ok {
return ip
}
return ""
}
type trustedProxy []*net.IPNet
// newTrustedProxy create a new trusted proxies instance for preventing IP spoofing (XFF Attacks)
func newTrustedProxy(ranges []string) (trustedProxy, error) {
nets, err := parseCIDRs(ranges)
if err != nil {
return nil, errors.WithStack(err)
}
return trustedProxy(nets), nil
}
func (t trustedProxy) IsTrusted(ip net.IP) bool {
if ip == nil {
return false
}
for _, r := range t {
if r.Contains(ip) {
return true
}
}
return false
}
func parseCIDRs(ranges []string) ([]*net.IPNet, error) {
nets := make([]*net.IPNet, 0, len(ranges))
for _, r := range ranges {
_, ipnet, err := net.ParseCIDR(r)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse CIDR for %q", r)
}
nets = append(nets, ipnet)
}
return nets, nil
}
func parseIPs(ranges []string) []net.IP {
ip := make([]net.IP, 0, len(ranges))
for _, r := range ranges {
ip = append(ip, net.ParseIP(r))
}
return ip
}

View File

@@ -1,47 +0,0 @@
package requestcontext
import (
"context"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/requestid"
fiberutils "github.com/gofiber/fiber/v2/utils"
)
type requestIdKey struct{}
// GetRequestId get requestId from context. If not found, return empty string
//
// Warning: Request context should be setup before using this function
func GetRequestId(ctx context.Context) string {
if id, ok := ctx.Value(requestIdKey{}).(string); ok {
return id
}
return ""
}
func WithRequestId() Option {
return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
// Try to get id from fiber context.
requestId, ok := c.Locals(requestid.ConfigDefault.ContextKey).(string)
if !ok || requestId == "" {
// Try to get id from request, else we generate one
requestId = c.Get(requestid.ConfigDefault.Header, fiberutils.UUID())
// Set new id to response header
c.Set(requestid.ConfigDefault.Header, requestId)
// Add the request ID to locals (fasthttp UserValue storage)
c.Locals(requestid.ConfigDefault.ContextKey, requestId)
}
// Add the request ID to context
ctx = context.WithValue(ctx, requestIdKey{}, requestId)
// Add the requuest ID to context logger
ctx = logger.WithContext(ctx, "requestId", requestId)
return ctx, nil
}
}

View File

@@ -1,116 +0,0 @@
package requestlogger
import (
"log/slog"
"net/http"
"strings"
"time"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gofiber/fiber/v2"
)
type Config struct {
WithRequestHeader bool `env:"REQUEST_HEADER" envDefault:"false" mapstructure:"request_header"`
WithRequestQuery bool `env:"REQUEST_QUERY" envDefault:"false" mapstructure:"request_query"`
Disable bool `env:"DISABLE" envDefault:"false" mapstructure:"disable"` // Disable logger level `INFO`
HiddenRequestHeaders []string `env:"HIDDEN_REQUEST_HEADERS" mapstructure:"hidden_request_headers"`
}
// New setup request context and information
func New(config Config) fiber.Handler {
hiddenRequestHeaders := make(map[string]struct{}, len(config.HiddenRequestHeaders))
for _, header := range config.HiddenRequestHeaders {
hiddenRequestHeaders[strings.TrimSpace(strings.ToLower(header))] = struct{}{}
}
return func(c *fiber.Ctx) error {
start := time.Now()
// Continue stack
err := c.Next()
end := time.Now()
latency := end.Sub(start)
status := c.Response().StatusCode()
baseAttrs := []slog.Attr{
slog.String("event", "api_request"),
slog.Int64("latency", latency.Milliseconds()),
slog.String("latencyHuman", latency.String()),
}
// prep request attributes
requestAttributes := []slog.Attr{
slog.Time("time", start),
slog.String("method", c.Method()),
slog.String("host", c.Hostname()),
slog.String("path", c.Path()),
slog.String("route", c.Route().Path),
slog.String("ip", requestcontext.GetClientIP(c.UserContext())),
slog.String("remoteIP", c.Context().RemoteIP().String()),
slog.Any("x-forwarded-for", c.IPs()),
slog.String("user-agent", string(c.Context().UserAgent())),
slog.Any("params", c.AllParams()),
slog.Any("query", c.Queries()),
slog.Int("length", len((c.Body()))),
}
// prep response attributes
responseAttributes := []slog.Attr{
slog.Time("time", end),
slog.Int("status", status),
slog.Int("length", len(c.Response().Body())),
}
// request query
if config.WithRequestQuery {
requestAttributes = append(requestAttributes, slog.String("query", string(c.Request().URI().QueryString())))
}
// request headers
if config.WithRequestHeader {
kv := []any{}
for k, v := range c.GetReqHeaders() {
if _, found := hiddenRequestHeaders[strings.ToLower(k)]; found {
continue
}
kv = append(kv, slog.Any(k, v))
}
requestAttributes = append(requestAttributes, slog.Group("header", kv...))
}
level := slog.LevelInfo
if err != nil || status >= http.StatusInternalServerError {
level = slog.LevelError
// error attributes
logErr := err
if logErr == nil {
logErr = fiber.NewError(status)
}
baseAttrs = append(baseAttrs, slog.Any("error", logErr))
}
if config.Disable && level == slog.LevelInfo {
return errors.WithStack(err)
}
logger.LogAttrs(c.UserContext(), level, "Request Completed", append([]slog.Attr{
{
Key: "request",
Value: slog.GroupValue(requestAttributes...),
},
{
Key: "response",
Value: slog.GroupValue(responseAttributes...),
},
}, baseAttrs...)...,
)
return errors.WithStack(err)
}
}

133
pkg/parquetutils/buffer.go Normal file
View File

@@ -0,0 +1,133 @@
// nolint: wrapcheck
package parquetutils
import (
"errors"
"io"
"sync"
"github.com/xitongsys/parquet-go/source"
)
var (
// Make sure Buffer implements the ParquetFile interface.
_ source.ParquetFile = (*Buffer)(nil)
// Make sure Buffer implements the io.WriterAt interface.
_ io.WriterAt = (*Buffer)(nil)
)
// Buffer allows reading parquet messages from a memory buffer.
type Buffer struct {
buf []byte
loc int
m sync.Mutex
}
// NewBuffer creates a new in memory parquet buffer.
func NewBuffer() *Buffer {
return &Buffer{buf: make([]byte, 0, 512)}
}
// NewBufferFrom creates new in memory parquet buffer from the given bytes.
// It uses the provided slice as its buffer.
func NewBufferFrom(s []byte) *Buffer {
return &Buffer{
buf: s,
}
}
func (b *Buffer) Create(string) (source.ParquetFile, error) {
return &Buffer{buf: make([]byte, 0, 512)}, nil
}
func (b *Buffer) Open(string) (source.ParquetFile, error) {
return NewBufferFrom(b.Bytes()), nil
}
// Seek seeks in the underlying memory buffer.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
newLoc := b.loc
switch whence {
case io.SeekStart:
newLoc = int(offset)
case io.SeekCurrent:
newLoc += int(offset)
case io.SeekEnd:
newLoc = len(b.buf) + int(offset)
default:
return int64(b.loc), errors.New("Seek: invalid whence")
}
if newLoc < 0 {
return int64(b.loc), errors.New("Seek: invalid offset")
}
if newLoc > len(b.buf) {
newLoc = len(b.buf)
}
b.loc = newLoc
return int64(b.loc), nil
}
// Read reads data form BufferFile into p.
func (b *Buffer) Read(p []byte) (n int, err error) {
n = copy(p, b.buf[b.loc:len(b.buf)])
b.loc += n
if b.loc == len(b.buf) {
return n, io.EOF
}
return n, nil
}
// Write writes data from p into BufferFile.
func (b *Buffer) Write(p []byte) (n int, err error) {
n, err = b.WriteAt(p, int64(b.loc))
if err != nil {
return 0, err
}
b.loc += n
return
}
// WriteAt writes a slice of bytes to a buffer starting at the position provided
// The number of bytes written will be returned, or error. Can overwrite previous
// written slices if the write ats overlap.
func (b *Buffer) WriteAt(p []byte, pos int64) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
pLen := len(p)
expLen := pos + int64(pLen)
if int64(len(b.buf)) < expLen {
if int64(cap(b.buf)) < expLen {
newBuf := make([]byte, expLen)
copy(newBuf, b.buf)
b.buf = newBuf
}
b.buf = b.buf[:expLen]
}
copy(b.buf[pos:], p)
return pLen, nil
}
// Close is a no-op for a memory buffer.
func (*Buffer) Close() error {
return nil
}
// Bytes returns the underlying buffer bytes.
func (b *Buffer) Bytes() []byte {
return b.buf
}
// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
func (b *Buffer) Reset() {
b.m.Lock()
defer b.m.Unlock()
b.buf = b.buf[:0]
b.loc = 0
}

View File

@@ -0,0 +1,26 @@
package parquetutils
import (
"github.com/pkg/errors"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source"
)
// ReaderConcurrency parallel number of file readers.
var ReaderConcurrency int64 = 8
// ReadAll reads all records from the parquet file.
func ReadAll[T any](sourceFile source.ParquetFile) ([]T, error) {
r, err := reader.NewParquetReader(sourceFile, new(T), ReaderConcurrency)
if err != nil {
return nil, errors.Wrap(err, "can't create parquet reader")
}
defer r.ReadStop()
data := make([]T, r.GetNumRows())
if err = r.Read(&data); err != nil {
return nil, errors.Wrap(err, "failed to read parquet data")
}
return data, nil
}