mirror of https://github.com/alexgo-io/gaze-indexer.git
synced 2026-01-12 22:43:22 +08:00

Compare commits
17 Commits: v0.7.3 ... feature/s3
| Author | SHA1 | Date |
|---|---|---|
| | 4f2dd80546 | |
| | ad26ea0bff | |
| | 23b88c7859 | |
| | d8b8ae42fb | |
| | c1362ae328 | |
| | 6585da5907 | |
| | 41cb5de9c0 | |
| | fc8bad75a5 | |
| | 7373944c85 | |
| | 89a2e58622 | |
| | 9ea9ebdb30 | |
| | 5e46a87201 | |
| | e98c3def55 | |
| | a3f902f5d5 | |
| | e8b4f5a2de | |
| | 611717706b | |
| | 83b38bc67b | |
core/datasources/aws_public_data.go (new file, 688 lines)

@@ -0,0 +1,688 @@
// AWS Public Blockchain Datasource
// - https://registry.opendata.aws/aws-public-blockchain
// - https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws
//
// To set up your own data source, see: https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws/blob/main/analytics/producer/README.md
package datasources

import (
	"cmp"
	"context"
	"encoding/hex"
	"fmt"
	"io"
	"log/slog"
	"math"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/wire"
	"github.com/cockroachdb/errors"
	"github.com/gaze-network/indexer-network/common"
	"github.com/gaze-network/indexer-network/common/errs"
	"github.com/gaze-network/indexer-network/core/types"
	"github.com/gaze-network/indexer-network/internal/subscription"
	"github.com/gaze-network/indexer-network/pkg/btcutils"
	"github.com/gaze-network/indexer-network/pkg/logger"
	"github.com/gaze-network/indexer-network/pkg/logger/slogx"
	"github.com/gaze-network/indexer-network/pkg/parquetutils"
	"github.com/samber/lo"
	parquettypes "github.com/xitongsys/parquet-go/types"
	"golang.org/x/sync/errgroup"
)

const (
	awsPublicDataS3Region = "us-east-2"
	awsPublicDataS3Bucket = "aws-public-blockchain"

	defaultAWSPublicDataDownloadConcurrency = 8
)

var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)

// Ensure AWSPublicDataDatasource implements the Datasource interface.
var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)

type AWSPublicDataDatasourceConfig struct {
	// The number of goroutines to spin up in parallel when downloading parts.
	// A concurrency of 1 downloads the parts sequentially.
	// Default is 8.
	//
	// CAUTION: High concurrency with a low part size can reduce download time,
	// but it also increases memory usage.
	DownloadConcurrency int

	// The size (in bytes) to request from S3 for each part.
	// The default depends on the concurrency and file size (part size = file size / concurrency).
	//
	// CAUTION: High concurrency with a low part size can reduce download time,
	// but it also increases memory usage.
	DownloadPartSize int64 `mapstructure:"download_part_size"`
}

type AWSPublicDataDatasource struct {
	btcDatasource Datasource[*types.Block]
	s3Client      *s3.Client
	s3Bucket      string
	config        AWSPublicDataDatasourceConfig
}

func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
	sdkConfig, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
	}

	// TODO: support user-defined config (self-hosted s3 bucket)
	s3client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) {
		o.Region = awsPublicDataS3Region
		o.Credentials = aws.AnonymousCredentials{}
	})

	if conf.DownloadConcurrency <= 0 {
		conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
	}

	return &AWSPublicDataDatasource{
		btcDatasource: btcDatasource,
		s3Client:      s3client,
		s3Bucket:      awsPublicDataS3Bucket,
		config:        conf,
	}
}
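A minimal usage sketch (assumptions: `nodeDatasource` is an existing `Datasource[*types.Block]`, e.g. the Bitcoin node datasource; the heights and tuning values are placeholders, not part of this change):

```go
func exampleNewAWSPublicData(ctx context.Context, nodeDatasource Datasource[*types.Block]) ([]*types.Block, error) {
	ds := NewAWSPublicData(nodeDatasource, AWSPublicDataDatasourceConfig{
		DownloadConcurrency: 4,        // four parallel part downloads per file
		DownloadPartSize:    32 << 20, // 32 MiB per part; <= 0 derives part size from file size
	})
	return ds.Fetch(ctx, 800_000, 800_010) // placeholder height range
}
```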
func (d AWSPublicDataDatasource) Name() string {
	return fmt.Sprintf("aws_public_data/%s", d.btcDatasource.Name())
}

func (d *AWSPublicDataDatasource) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
	ch := make(chan []*types.Block)
	subscription, err := d.FetchAsync(ctx, from, to, ch)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	defer subscription.Unsubscribe()

	blocks := make([]*types.Block, 0)
	for {
		select {
		case b, ok := <-ch:
			if !ok {
				return blocks, nil
			}
			blocks = append(blocks, b...)
		case <-subscription.Done():
			if err := ctx.Err(); err != nil {
				return nil, errors.Wrap(err, "context done")
			}
			return blocks, nil
		case err := <-subscription.Err():
			if err != nil {
				return nil, errors.Wrap(err, "got error while fetching async")
			}
			return blocks, nil
		case <-ctx.Done():
			return nil, errors.Wrap(ctx.Err(), "context done")
		}
	}
}
func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
	ctx = logger.WithContext(ctx,
		slogx.String("package", "datasources"),
		slogx.String("datasource", d.Name()),
	)

	start, end, skip, err := d.prepareRange(ctx, from, to)
	if err != nil {
		return nil, errors.Wrap(err, "failed to prepare fetch range")
	}

	subscription := subscription.NewSubscription(ch)
	if skip {
		if err := subscription.UnsubscribeWithContext(ctx); err != nil {
			return nil, errors.Wrap(err, "failed to unsubscribe")
		}
		return subscription.Client(), nil
	}

	startFiles, err := d.listBlocksFilesByDate(ctx, start.Timestamp)
	if err != nil {
		if err := subscription.UnsubscribeWithContext(ctx); err != nil {
			return nil, errors.Wrap(err, "failed to unsubscribe")
		}
		return nil, errors.Wrap(err, "failed to list files by date")
	}

	// only merged block files ("part-" objects) are supported
	startFiles = lo.Filter(startFiles, func(file awsFile, _ int) bool {
		return strings.Contains(file.Key, "part-")
	})

	// fall back to the underlying datasource if S3 has no supported data
	if len(startFiles) == 0 {
		if err := subscription.UnsubscribeWithContext(ctx); err != nil {
			return nil, errors.Wrap(err, "failed to unsubscribe")
		}
		s, err := d.btcDatasource.FetchAsync(ctx, start.Height, end.Height, ch)
		return s, errors.WithStack(err)
	}

	go func() {
		defer func() {
			// add a small delay to prevent shutdown before the client receives all blocks
			time.Sleep(100 * time.Millisecond)

			subscription.Unsubscribe()
		}()
		// loop through each day until we reach the end of the supported data or pass the end block's date
		for ts := start.Timestamp; ts.Before(end.Timestamp.Round(24*time.Hour)) && ts.Before(time.Now()); ts = ts.Add(24 * time.Hour) {
			ctx := logger.WithContext(ctx,
				slogx.Time("date", ts),
				slogx.Int64("date_unix", ts.Unix()),
			)

			logger.DebugContext(ctx, "Fetching data from AWS S3", slogx.Int64("start", start.Height), slogx.Int64("end", end.Height))

			allBlocksFiles, err := d.listBlocksFilesByDate(ctx, ts)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to list blocks files by date from aws s3", slogx.Error(err))
				if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
					logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
				}
				return
			}

			allTxsFiles, err := d.listTxsFilesByDate(ctx, ts)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to list txs files by date from aws s3", slogx.Error(err))
				if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
					logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
				}
				return
			}

			blocksFiles := lo.Filter(allBlocksFiles, func(file awsFile, _ int) bool {
				return strings.Contains(file.Key, "part-")
			})
			txsFiles := lo.Filter(allTxsFiles, func(file awsFile, _ int) bool {
				return strings.Contains(file.Key, "part-")
			})

			logger.DebugContext(ctx, "Found files in AWS S3 bucket",
				slogx.Int("files_blocks", len(allBlocksFiles)),
				slogx.Int("files_blocks_merged", len(blocksFiles)),
				slogx.Int("files_txs_all", len(allTxsFiles)),
				slogx.Int("files_txs_merged", len(txsFiles)),
			)

			// Reached the end of the supported data,
			// stop fetching data from AWS S3.
			if len(blocksFiles) == 0 || len(txsFiles) == 0 {
				logger.DebugContext(ctx, "No blocks files found, stop fetching data from AWS S3")
				return
			}

			// guard against unexpected file layouts
			{
				if len(blocksFiles) != 1 {
					logger.ErrorContext(ctx, "Unexpected blocks files count, should be 1", slogx.Int("count", len(blocksFiles)))
					if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected blocks files count")); err != nil {
						logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
					}
					return
				}
				if len(txsFiles) != 1 {
					logger.ErrorContext(ctx, "Unexpected txs files count, should be 1", slogx.Int("count", len(txsFiles)))
					if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected txs files count")); err != nil {
						logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
					}
					return
				}
			}

			// TODO: use a concurrent stream (max 2 goroutines) to download files, then sequentially read the parquet files
			// to improve performance while not consuming too much memory (increases around 500 MB per goroutine)
			var (
				// TODO: create a []byte pool to reduce alloc ops (reduce GC pressure)
				// TODO: use FileSystem for the default buffer (can choose memory or disk buffer)
				blocksBuffer = parquetutils.NewBuffer()
				txsBuffer    = parquetutils.NewBuffer()
			)
			startDownload := time.Now()
			if err := d.downloadFile(ctx, blocksFiles[0], blocksBuffer); err != nil {
				logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Error(err))
				if err := subscription.SendError(ctx, errors.Wrap(err, "can't download blocks file")); err != nil {
					logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
				}
				return
			}
			if err := d.downloadFile(ctx, txsFiles[0], txsBuffer); err != nil {
				logger.ErrorContext(ctx, "Failed to download txs file from AWS S3", slogx.Error(err))
				if err := subscription.SendError(ctx, errors.Wrap(err, "can't download txs file")); err != nil {
					logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
				}
				return
			}
			logger.DebugContext(ctx, "Downloaded files from AWS S3",
				slogx.Duration("duration", time.Since(startDownload)),
				slogx.Int("sizes_blocks", len(blocksBuffer.Bytes())),
				slogx.Int("sizes_txs", len(txsBuffer.Bytes())),
			)

			// Read parquet files
			startRead := time.Now()

			// we can read all blocks data at once because it's small
			rawAllBlocks, err := parquetutils.ReadAll[awsBlock](blocksBuffer)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to read parquet blocks data", slogx.Error(err))
				if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet blocks data")); err != nil {
					logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
				}
				return
			}

			// NOTE: We shouldn't read all txs data at once because it's very large (up to ~1.5GB of memory usage);
			// we should read it in chunks and send them to the subscription client to reduce memory usage.
			// But the AWS Public Dataset is not sorted by block number and index,
			// so we can't avoid reading all transactions data by skipping unnecessary transactions
			// or chunking data by block number to reduce memory usage :(
			rawAllTxs, err := parquetutils.ReadAll[awsTransaction](txsBuffer)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to read parquet txs data", slogx.Error(err))
				if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet txs data")); err != nil {
					logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
				}
				return
			}

			groupRawTxs := lo.GroupBy(rawAllTxs, func(tx awsTransaction) int64 {
				return tx.BlockNumber
			})

			// filter blocks data by height range
			rawFilteredBlocks := lo.Filter(rawAllBlocks, func(block awsBlock, _ int) bool {
				return block.Number >= start.Height && block.Number <= end.Height
			})
			slices.SortFunc(rawFilteredBlocks, func(i, j awsBlock) int {
				return cmp.Compare(i.Number, j.Number)
			})

			logger.DebugContext(ctx, "Read parquet files",
				slogx.Duration("duration", time.Since(startRead)),
				slogx.Int("total_blocks", len(rawAllBlocks)),
				slogx.Int("filtered_blocks", len(rawFilteredBlocks)),
				slogx.Int("total_txs", len(rawAllTxs)),
				slogx.Int("total_txs_grouped", len(groupRawTxs)),
			)

			blocks := make([]*types.Block, 0, len(rawFilteredBlocks))
			for _, rawBlock := range rawFilteredBlocks {
				blockHeader, err := rawBlock.ToBlockHeader()
				if err != nil {
					logger.ErrorContext(ctx, "Failed to convert aws block to type block header", slogx.Error(err))
					if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws block to type block header")); err != nil {
						logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
					}
					return
				}

				txs := make([]*types.Transaction, 0, len(groupRawTxs[blockHeader.Height]))
				for _, rawTx := range groupRawTxs[rawBlock.Number] {
					tx, err := rawTx.ToTransaction(rawBlock)
					if err != nil {
						logger.ErrorContext(ctx, "Failed to convert aws transaction to type transaction", slogx.Error(err))
						if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws transaction to type transaction")); err != nil {
							logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
						}
						return
					}
					txs = append(txs, tx)
				}
				slices.SortFunc(txs, func(i, j *types.Transaction) int {
					return cmp.Compare(i.Index, j.Index)
				})

				blocks = append(blocks, &types.Block{
					Header:       blockHeader,
					Transactions: txs,
				})
			}

			logger.DebugContext(ctx, "Send blocks to subscription client", slogx.Int("count", len(blocks)))
			if err := subscription.Send(ctx, blocks); err != nil {
				if errors.Is(err, errs.Closed) {
					logger.DebugContext(ctx, "Subscription client closed, can't send", slogx.Error(err))
					return
				}
				logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
					slogx.Int64("start", blocks[0].Header.Height),
					slogx.Int64("end", blocks[len(blocks)-1].Header.Height),
					slogx.Error(err),
				)
			}
		}
	}()

	return subscription.Client(), nil
}
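For streaming consumers, a sketch of driving `FetchAsync` directly instead of `Fetch`, mirroring the select loop that `Fetch` uses internally; `handle` and the height range are hypothetical:

```go
func exampleConsumeAsync(ctx context.Context, d *AWSPublicDataDatasource, handle func([]*types.Block)) error {
	ch := make(chan []*types.Block)
	sub, err := d.FetchAsync(ctx, 800_000, 800_010, ch) // placeholder heights
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	for {
		select {
		case blocks, ok := <-ch:
			if !ok {
				return nil
			}
			handle(blocks) // process each daily batch as it arrives
		case <-sub.Done():
			return ctx.Err()
		case err := <-sub.Err():
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```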
func (d *AWSPublicDataDatasource) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
	header, err := d.btcDatasource.GetBlockHeader(ctx, height)
	return header, errors.WithStack(err)
}

func (d *AWSPublicDataDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
	height, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
	return height, errors.WithStack(err)
}

func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (startHeader, endHeader types.BlockHeader, skip bool, err error) {
	start := fromHeight
	end := toHeight

	// get current bitcoin block height
	latestBlockHeight, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
	if err != nil {
		return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block count")
	}

	// set start to the genesis block height
	if start < 0 {
		start = 0
	}

	// set end to the current bitcoin block height if
	// - end is -1
	// - end is greater than the current bitcoin block height
	if end < 0 || end > latestBlockHeight {
		end = latestBlockHeight
	}

	// if start is greater than end, skip this round
	if start > end {
		return types.BlockHeader{}, types.BlockHeader{}, true, nil
	}

	group, groupctx := errgroup.WithContext(ctx)
	group.Go(func() error {
		var err error
		startHeader, err = d.GetBlockHeader(groupctx, start)
		return errors.Wrapf(err, "block %v", start)
	})
	group.Go(func() error {
		var err error
		endHeader, err = d.GetBlockHeader(groupctx, end)
		return errors.Wrapf(err, "block %v", end)
	})
	if err := group.Wait(); err != nil {
		return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block header")
	}

	return startHeader, endHeader, false, nil
}
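A worked example of the clamping rules, assuming a hypothetical chain tip at height 830000:

```go
// fromHeight=-1,  toHeight=-1     -> start=0,   end=830000 (both defaults applied)
// fromHeight=100, toHeight=900000 -> start=100, end=830000 (end clamped to the tip)
// fromHeight=200, toHeight=100    -> skip=true (empty range, nothing to fetch)
```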
func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
	result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(d.s3Bucket),
		Prefix: aws.String(prefix),
	})
	if err != nil {
		return nil, errors.Wrapf(err, "can't list s3 bucket objects for bucket %q and prefix %q", d.s3Bucket, prefix)
	}

	// filter empty keys
	objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
	return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
		return awsFile{
			Key:          *item.Key,
			Size:         *item.Size,
			LastModified: *item.LastModified,
		}
	}), nil
}

func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
	if date.Before(firstBitcoinTimestamp) {
		return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
	}
	prefix := "v1.0/btc/blocks/date=" + date.UTC().Format(time.DateOnly)
	files, err := d.listFiles(ctx, prefix)
	if err != nil {
		return nil, errors.Wrap(err, "failed to list blocks files by date")
	}
	return files, nil
}

func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
	if date.Before(firstBitcoinTimestamp) {
		return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
	}
	prefix := "v1.0/btc/transactions/date=" + date.UTC().Format(time.DateOnly)
	files, err := d.listFiles(ctx, prefix)
	if err != nil {
		return nil, errors.Wrap(err, "failed to list txs files by date")
	}
	return files, nil
}
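Both helpers hit the same daily partitioning scheme; for an arbitrary example date of 2024-01-15 (UTC), the listing prefixes would be:

```go
// v1.0/btc/blocks/date=2024-01-15
// v1.0/btc/transactions/date=2024-01-15
//
// FetchAsync then keeps only merged objects whose keys contain "part-",
// e.g. (hypothetical object name):
// v1.0/btc/blocks/date=2024-01-15/part-00000-....snappy.parquet
```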
func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
	downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
		md.Concurrency = d.config.DownloadConcurrency
		md.PartSize = d.config.DownloadPartSize
		if md.PartSize <= 0 {
			md.PartSize = f.Size / int64(md.Concurrency)
		}
	})

	numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket: aws.String(d.s3Bucket),
		Key:    aws.String(f.Key),
	})
	if err != nil {
		return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
	}

	if numBytes < 1 {
		return errors.Wrap(errs.NotFound, "got empty file")
	}

	return nil
}
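A quick arithmetic check of the derived part size; the object size here is illustrative:

```go
// With DownloadPartSize unset and DownloadConcurrency = 8,
// a 400 MiB object is fetched as 8 parallel parts:
//   PartSize = 400 MiB / 8 = 50 MiB
// so peak in-flight buffering for that file is roughly
// Concurrency * PartSize = 400 MiB.
```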
// TODO: remove unused fields to reduce memory usage
type (
	awsBlock struct {
		Hash              string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Bits              string `parquet:"name=bits, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"` // Hex string format
		PreviousBlockHash string `parquet:"name=previousblockhash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		MerkleRoot        string `parquet:"name=merkle_root, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		CoinbaseParam     string `parquet:"name=coinbase_param, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Timestamp         string `parquet:"name=timestamp, type=INT96, repetitiontype=OPTIONAL"`
		Number            int64  `parquet:"name=number, type=INT64, repetitiontype=OPTIONAL"`
		Version           int64  `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
		Nonce             int64  `parquet:"name=nonce, type=INT64, repetitiontype=OPTIONAL"`
		// MedianTime       string  `parquet:"name=mediantime, type=INT96, repetitiontype=OPTIONAL"`
		// Difficulty       float64 `parquet:"name=difficulty, type=DOUBLE, repetitiontype=OPTIONAL"`
		// Chainwork        string  `parquet:"name=chainwork, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Size             int64   `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
		// Weight           int64   `parquet:"name=weight, type=INT64, repetitiontype=OPTIONAL"`
		// TransactionCount int64   `parquet:"name=transaction_count, type=INT64, repetitiontype=OPTIONAL"`
		// StrippedSize     int64   `parquet:"name=stripped_size, type=INT64, repetitiontype=OPTIONAL"`
		// Date             string  `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// LastModified     string  `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
	}
	awsTransaction struct {
		Hash        string         `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		BlockHash   string         `parquet:"name=block_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Outputs     []*awsTxOutput `parquet:"name=outputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
		Inputs      []*awsTxInput  `parquet:"name=inputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
		Version     int64          `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
		Size        int64          `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
		BlockNumber int64          `parquet:"name=block_number, type=INT64, repetitiontype=OPTIONAL"`
		Index       int64          `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
		LockTime    int64          `parquet:"name=lock_time, type=INT64, repetitiontype=OPTIONAL"`
		IsCoinbase  bool           `parquet:"name=is_coinbase, type=BOOLEAN, repetitiontype=OPTIONAL"`
		// VirtualSize    int64   `parquet:"name=virtual_size, type=INT64, repetitiontype=OPTIONAL"`
		// InputCount     int64   `parquet:"name=input_count, type=INT64, repetitiontype=OPTIONAL"`
		// OutputCount    int64   `parquet:"name=output_count, type=INT64, repetitiontype=OPTIONAL"`
		// OutputValue    float64 `parquet:"name=output_value, type=DOUBLE, repetitiontype=OPTIONAL"`
		// BlockTimestamp string  `parquet:"name=block_timestamp, type=INT96, repetitiontype=OPTIONAL"`
		// Date           string  `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// LastModified   string  `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
		// Fee            float64 `parquet:"name=fee, type=DOUBLE, repetitiontype=OPTIONAL"`
		// InputValue     float64 `parquet:"name=input_value, type=DOUBLE, repetitiontype=OPTIONAL"`
	}
	awsTxInput struct {
		ScriptHex            string    `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		SpentTransactionHash string    `parquet:"name=spent_transaction_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		TxInWitness          []*string `parquet:"name=txinwitness, type=LIST, repetitiontype=OPTIONAL, valuetype=BYTE_ARRAY, convertedtype=UTF8"`
		SpentOutputIndex     int64     `parquet:"name=spent_output_index, type=INT64, repetitiontype=OPTIONAL"`
		Sequence             int64     `parquet:"name=sequence, type=INT64, repetitiontype=OPTIONAL"`
		// Address            string  `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Index              int64   `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
		// RequiredSignatures int64   `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
		// ScriptAsm          string  `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Type               string  `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Value              float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
	}
	awsTxOutput struct {
		Script_hex string  `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Value      float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
		// Address             string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Index               int64  `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
		// Required_signatures int64  `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
		// Script_asm          string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Type                string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
	}

	awsFile struct {
		Key          string
		Size         int64
		LastModified time.Time
	}
)
func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
	hash, err := chainhash.NewHashFromStr(a.Hash)
	if err != nil {
		return types.BlockHeader{}, errors.Wrap(err, "can't convert block hash")
	}
	prevBlockHash, err := chainhash.NewHashFromStr(a.PreviousBlockHash)
	if err != nil {
		return types.BlockHeader{}, errors.Wrap(err, "can't convert previous block hash")
	}
	merkleRoot, err := chainhash.NewHashFromStr(a.MerkleRoot)
	if err != nil {
		return types.BlockHeader{}, errors.Wrap(err, "can't convert merkle root")
	}

	bits, err := strconv.ParseUint(a.Bits, 16, 32)
	if err != nil {
		return types.BlockHeader{}, errors.Wrap(err, "can't convert bits from hex str to uint32")
	}

	return types.BlockHeader{
		Hash:       *hash,
		Height:     a.Number,
		Version:    int32(a.Version),
		PrevBlock:  *prevBlockHash,
		MerkleRoot: *merkleRoot,
		Timestamp:  parquettypes.INT96ToTime(a.Timestamp),
		Bits:       uint32(bits),
		Nonce:      uint32(a.Nonce),
	}, nil
}

func (a awsTransaction) ToTransaction(block awsBlock) (*types.Transaction, error) {
	blockhash, err := chainhash.NewHashFromStr(block.Hash)
	if err != nil {
		return nil, errors.Wrap(err, "can't convert block hash")
	}
	msgtx, err := a.MsgTx(block)
	if err != nil {
		return nil, errors.Wrap(err, "can't convert aws tx to wire.msgtx")
	}
	return types.ParseMsgTx(msgtx, a.BlockNumber, *blockhash, uint32(a.Index)), nil
}

func (a awsTransaction) MsgTx(block awsBlock) (*wire.MsgTx, error) {
	txIn := make([]*wire.TxIn, 0, len(a.Inputs))
	txOut := make([]*wire.TxOut, 0, len(a.Outputs))

	// NOTE: a coinbase tx from AWS S3 has no inputs, so we need to add one manually,
	// but we can't guarantee this data is correct, especially the sequence number.
	if a.IsCoinbase && len(a.Inputs) == 0 {
		scriptsig, err := hex.DecodeString(block.CoinbaseParam)
		if err != nil {
			return nil, errors.Wrap(err, "can't decode script hex")
		}

		txIn = append(txIn, &wire.TxIn{
			PreviousOutPoint: wire.OutPoint{
				Hash:  common.ZeroHash,
				Index: math.MaxUint32,
			},
			SignatureScript: scriptsig,
			Witness:         btcutils.CoinbaseWitness,
			Sequence:        math.MaxUint32, // most coinbase txs use the max sequence number
		})
	}

	for _, in := range a.Inputs {
		scriptsig, err := hex.DecodeString(in.ScriptHex)
		if err != nil {
			return nil, errors.Wrap(err, "can't decode script hex")
		}

		witness, err := btcutils.WitnessFromHex(lo.Map(in.TxInWitness, func(src *string, _ int) string {
			if src == nil {
				return ""
			}
			return *src
		}))
		if err != nil {
			return nil, errors.Wrap(err, "can't convert witness")
		}

		prevOutHash, err := chainhash.NewHashFromStr(in.SpentTransactionHash)
		if err != nil {
			return nil, errors.Wrap(err, "can't convert prevout hash")
		}

		txIn = append(txIn, &wire.TxIn{
			PreviousOutPoint: wire.OutPoint{
				Hash:  *prevOutHash,
				Index: uint32(in.SpentOutputIndex),
			},
			SignatureScript: scriptsig,
			Witness:         witness,
			Sequence:        uint32(in.Sequence),
		})
	}

	for _, out := range a.Outputs {
		scriptpubkey, err := hex.DecodeString(out.Script_hex)
		if err != nil {
			return nil, errors.Wrap(err, "can't decode script hex")
		}
		txOut = append(txOut, &wire.TxOut{
			Value:    btcutils.BitcoinToSatoshi(out.Value),
			PkScript: scriptpubkey,
		})
	}

	return &wire.MsgTx{
		Version:  int32(a.Version),
		TxIn:     txIn,
		TxOut:    txOut,
		LockTime: uint32(a.LockTime),
	}, nil
}
@@ -88,7 +88,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
 		slogx.String("datasource", d.Name()),
 	)

-	from, to, skip, err := d.prepareRange(from, to)
+	from, to, skip, err := d.prepareRange(ctx, from, to)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to prepare fetch range")
 	}
@@ -212,12 +212,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
 	return subscription.Client(), nil
 }

-func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
+func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
 	start = fromHeight
 	end = toHeight

 	// get current bitcoin block height
-	latestBlockHeight, err := d.btcclient.GetBlockCount()
+	latestBlockHeight, err := d.GetCurrentBlockHeight(ctx)
 	if err != nil {
 		return -1, -1, false, errors.Wrap(err, "failed to get block count")
 	}
@@ -227,7 +227,7 @@ func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start,
 		start = 0
 	}

-	// set end to current bitcoin block height if
+	// set end to the current bitcoin block height if
 	// - end is -1
 	// - end is greater than the current bitcoin block height
 	if end < 0 || end > latestBlockHeight {
@@ -292,3 +292,12 @@ func (d *BitcoinNodeDatasource) GetBlockHeader(ctx context.Context, height int64

 	return types.ParseMsgBlockHeader(*block, height), nil
 }
+
+// GetCurrentBlockHeight fetches the current block height from the Bitcoin node
+func (d *BitcoinNodeDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
+	height, err := d.btcclient.GetBlockCount()
+	if err != nil {
+		return -1, errors.Wrap(err, "failed to get block height")
+	}
+	return height, nil
+}
@@ -13,4 +13,5 @@ type Datasource[T any] interface {
 	Fetch(ctx context.Context, from, to int64) ([]T, error)
 	FetchAsync(ctx context.Context, from, to int64, ch chan<- []T) (*subscription.ClientSubscription[[]T], error)
 	GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
+	GetCurrentBlockHeight(ctx context.Context) (int64, error)
 }
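A sketch of code written against the widened interface; the helper itself is hypothetical, not part of this change:

```go
// blocksBehind reports how far a locally indexed height lags the chain tip,
// using only methods from the generic Datasource interface.
func blocksBehind[T any](ctx context.Context, ds Datasource[T], localHeight int64) (int64, error) {
	tip, err := ds.GetCurrentBlockHeight(ctx)
	if err != nil {
		return 0, err
	}
	return tip - localHeight, nil
}
```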
go.mod (29 lines changed)

@@ -4,6 +4,10 @@ go 1.22
 require (
 	github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11
+	github.com/aws/aws-sdk-go-v2 v1.23.0
+	github.com/aws/aws-sdk-go-v2/config v1.25.3
+	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.0
+	github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0
 	github.com/btcsuite/btcd v0.24.0
 	github.com/btcsuite/btcd/btcutil v1.1.5
 	github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
@@ -16,18 +20,37 @@ require (
 	github.com/planxnx/concurrent-stream v0.1.5
 	github.com/samber/do/v2 v2.0.0-beta.7
 	github.com/samber/lo v1.39.0
-	github.com/shopspring/decimal v1.3.1
+	github.com/shopspring/decimal v1.4.0
 	github.com/spf13/cobra v1.8.0
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.18.2
 	github.com/stretchr/testify v1.8.4
 	github.com/valyala/fasthttp v1.51.0
+	github.com/xitongsys/parquet-go v1.6.2
+	github.com/xitongsys/parquet-go-source v0.0.0-20240122235623-d6294584ab18
 	go.uber.org/automaxprocs v1.5.3
 	golang.org/x/sync v0.5.0
 )

 require (
 	github.com/andybalholm/brotli v1.0.5 // indirect
+	github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
+	github.com/apache/thrift v0.20.0 // indirect
+	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
+	github.com/aws/aws-sdk-go-v2/credentials v1.16.2 // indirect
+	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3 // indirect
+	github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
+	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 // indirect
+	github.com/aws/smithy-go v1.17.0 // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
 	github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
 	github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
@@ -40,6 +63,7 @@ require (
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/getsentry/sentry-go v0.18.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
+	github.com/golang/snappy v0.0.4 // indirect
 	github.com/google/uuid v1.5.0 // indirect
 	github.com/hashicorp/errwrap v1.1.0 // indirect
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -49,6 +73,7 @@ require (
 	github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
 	github.com/jackc/pgx v3.6.2+incompatible // indirect
 	github.com/jackc/puddle/v2 v2.2.1 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/klauspost/compress v1.17.0 // indirect
 	github.com/kr/pretty v0.3.1 // indirect
 	github.com/kr/text v0.2.0 // indirect
@@ -59,6 +84,7 @@ require (
 	github.com/mattn/go-runewidth v0.0.15 // indirect
 	github.com/mitchellh/mapstructure v1.5.0 // indirect
 	github.com/pelletier/go-toml/v2 v2.1.0 // indirect
+	github.com/pierrec/lz4/v4 v4.1.16 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect
@@ -78,6 +104,7 @@ require (
 	golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
 	golang.org/x/sys v0.17.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
+	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
pkg/btcutils/satoshi.go (new file, 21 lines)

@@ -0,0 +1,21 @@
package btcutils

import "github.com/shopspring/decimal"

const (
	BitcoinDecimals = 8
)

// satsUnit is 10^8
var satsUnit = decimal.New(1, BitcoinDecimals)

// BitcoinToSatoshi converts an amount in Bitcoin format to Satoshi format.
func BitcoinToSatoshi(v float64) int64 {
	amount := decimal.NewFromFloat(v)
	return amount.Mul(satsUnit).IntPart()
}

// SatoshiToBitcoin converts an amount in Satoshi format to Bitcoin format.
func SatoshiToBitcoin(v int64) float64 {
	return decimal.New(v, -BitcoinDecimals).InexactFloat64()
}
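Why the decimal round-trip instead of plain float64 multiplication: per the test cases below, naive conversion loses a satoshi to float truncation. A sketch, using a value taken from the tests:

```go
func exampleBitcoinToSatoshi() (naive, exact int64) {
	v := 0.0198473              // BTC amount held as float64
	naive = int64(v * 1e8)      // 1984729: the float64 product lands just below 1984730
	exact = BitcoinToSatoshi(v) // 1984730: decimal arithmetic converts exactly
	return naive, exact
}
```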
pkg/btcutils/satoshi_test.go (new file, 39 lines)

@@ -0,0 +1,39 @@
package btcutils

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestSatoshiConversion(t *testing.T) {
	testcases := []struct {
		btc  float64
		sats int64
	}{
		{2.29980951, 229980951},
		{1.29609085, 129609085},
		{1.2768897, 127688970},
		{0.62518296, 62518296},
		{0.29998462, 29998462},
		{0.1251, 12510000},
		{0.02016011, 2016011},
		{0.0198473, 1984730},
		{0.0051711, 517110},
		{0.0012, 120000},
		{7e-05, 7000},
		{3.835e-05, 3835},
		{1.962e-05, 1962},
	}
	for _, testcase := range testcases {
		t.Run(fmt.Sprintf("BtcToSats/%v", testcase.btc), func(t *testing.T) {
			require.NotEqual(t, testcase.sats, int64(testcase.btc*1e8), "Testcase value should have precision error")
			assert.Equal(t, testcase.sats, BitcoinToSatoshi(testcase.btc))
		})
		t.Run(fmt.Sprintf("SatsToBtc/%v", testcase.btc), func(t *testing.T) {
			assert.Equal(t, testcase.btc, SatoshiToBitcoin(testcase.sats))
		})
	}
}
@@ -4,6 +4,7 @@ import (
 	"encoding/hex"
 	"strings"

+	"github.com/Cleverse/go-utilities/utils"
 	"github.com/btcsuite/btcd/wire"
 	"github.com/cockroachdb/errors"
 )
@@ -12,6 +13,9 @@ const (
 	witnessSeparator = " "
 )

+// CoinbaseWitness is the witness data for a coinbase transaction.
+var CoinbaseWitness = utils.Must(WitnessFromHex([]string{"0000000000000000000000000000000000000000000000000000000000000000"}))
+
 // WitnessToHex formats the passed witness stack as a slice of hex-encoded strings.
 func WitnessToHex(witness wire.TxWitness) []string {
 	if len(witness) == 0 {
@@ -37,7 +41,10 @@ func WitnessToString(witness wire.TxWitness) string {

 // WitnessFromHex parses the passed slice of hex-encoded strings into a witness stack.
 func WitnessFromHex(witnesses []string) (wire.TxWitness, error) {
+	// NOTE: some witnesses from the bitcoin node are empty and some are nil (most are nil); it's not clear why.
+	// For now, we return nil for both cases.
 	if len(witnesses) == 0 {
+		// return wire.TxWitness{}, nil
 		return nil, nil
 	}
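A small round-trip sketch of the hex helpers; the stack items are arbitrary example values:

```go
func exampleWitnessRoundTrip() error {
	// two arbitrary hex-encoded witness stack items
	w, err := WitnessFromHex([]string{"aa01", "bb02"})
	if err != nil {
		return err
	}
	_ = WitnessToHex(w) // []string{"aa01", "bb02"}
	return nil
}
```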
pkg/parquetutils/buffer.go (new file, 133 lines)

@@ -0,0 +1,133 @@
// nolint: wrapcheck
package parquetutils

import (
	"errors"
	"io"
	"sync"

	"github.com/xitongsys/parquet-go/source"
)

var (
	// Make sure Buffer implements the ParquetFile interface.
	_ source.ParquetFile = (*Buffer)(nil)

	// Make sure Buffer implements the io.WriterAt interface.
	_ io.WriterAt = (*Buffer)(nil)
)

// Buffer allows reading parquet messages from a memory buffer.
type Buffer struct {
	buf []byte
	loc int
	m   sync.Mutex
}

// NewBuffer creates a new in-memory parquet buffer.
func NewBuffer() *Buffer {
	return &Buffer{buf: make([]byte, 0, 512)}
}

// NewBufferFrom creates a new in-memory parquet buffer from the given bytes.
// It uses the provided slice as its buffer.
func NewBufferFrom(s []byte) *Buffer {
	return &Buffer{
		buf: s,
	}
}

func (b *Buffer) Create(string) (source.ParquetFile, error) {
	return &Buffer{buf: make([]byte, 0, 512)}, nil
}

func (b *Buffer) Open(string) (source.ParquetFile, error) {
	return NewBufferFrom(b.Bytes()), nil
}

// Seek seeks in the underlying memory buffer.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
	newLoc := b.loc
	switch whence {
	case io.SeekStart:
		newLoc = int(offset)
	case io.SeekCurrent:
		newLoc += int(offset)
	case io.SeekEnd:
		newLoc = len(b.buf) + int(offset)
	default:
		return int64(b.loc), errors.New("Seek: invalid whence")
	}

	if newLoc < 0 {
		return int64(b.loc), errors.New("Seek: invalid offset")
	}

	if newLoc > len(b.buf) {
		newLoc = len(b.buf)
	}

	b.loc = newLoc
	return int64(b.loc), nil
}

// Read reads data from the buffer into p.
func (b *Buffer) Read(p []byte) (n int, err error) {
	n = copy(p, b.buf[b.loc:len(b.buf)])
	b.loc += n

	if b.loc == len(b.buf) {
		return n, io.EOF
	}

	return n, nil
}

// Write writes data from p into the buffer at the current location.
func (b *Buffer) Write(p []byte) (n int, err error) {
	n, err = b.WriteAt(p, int64(b.loc))
	if err != nil {
		return 0, err
	}
	b.loc += n
	return
}

// WriteAt writes a slice of bytes to the buffer starting at the position provided.
// The number of bytes written will be returned, or an error. It can overwrite
// previously written slices if the writes overlap.
func (b *Buffer) WriteAt(p []byte, pos int64) (n int, err error) {
	b.m.Lock()
	defer b.m.Unlock()
	pLen := len(p)
	expLen := pos + int64(pLen)
	if int64(len(b.buf)) < expLen {
		if int64(cap(b.buf)) < expLen {
			newBuf := make([]byte, expLen)
			copy(newBuf, b.buf)
			b.buf = newBuf
		}
		b.buf = b.buf[:expLen]
	}
	copy(b.buf[pos:], p)
	return pLen, nil
}

// Close is a no-op for a memory buffer.
func (*Buffer) Close() error {
	return nil
}

// Bytes returns the underlying buffer bytes.
func (b *Buffer) Bytes() []byte {
	return b.buf
}

// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
func (b *Buffer) Reset() {
	b.m.Lock()
	defer b.m.Unlock()
	b.buf = b.buf[:0]
	b.loc = 0
}
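A short sketch of the grow-on-demand behavior of `WriteAt`; the sizes are illustrative:

```go
func exampleBufferWriteAt() int {
	b := NewBuffer()
	// Writing past the current end grows the buffer to pos+len(p);
	// the gap in bytes [0,4) stays zero-filled.
	_, _ = b.WriteAt([]byte("abc"), 4)
	return len(b.Bytes()) // 7
}
```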
pkg/parquetutils/parquetutils.go (new file, 26 lines)

@@ -0,0 +1,26 @@
package parquetutils

import (
	"github.com/pkg/errors"
	"github.com/xitongsys/parquet-go/reader"
	"github.com/xitongsys/parquet-go/source"
)

// ReaderConcurrency is the parallel number of file readers.
var ReaderConcurrency int64 = 8

// ReadAll reads all records from the parquet file.
func ReadAll[T any](sourceFile source.ParquetFile) ([]T, error) {
	r, err := reader.NewParquetReader(sourceFile, new(T), ReaderConcurrency)
	if err != nil {
		return nil, errors.Wrap(err, "can't create parquet reader")
	}
	defer r.ReadStop()

	data := make([]T, r.GetNumRows())
	if err = r.Read(&data); err != nil {
		return nil, errors.Wrap(err, "failed to read parquet data")
	}

	return data, nil
}
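A usage sketch tying `Buffer` and `ReadAll` together; the row type, its tags, and the origin of `raw` are assumptions for illustration:

```go
// exampleRow is a hypothetical schema; the tags follow the same convention
// as the datasource structs above.
type exampleRow struct {
	Name string `parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}

// readExampleRows assumes raw holds a complete parquet file,
// e.g. downloaded from S3 into a Buffer via the manager.Downloader.
func readExampleRows(raw []byte) ([]exampleRow, error) {
	return ReadAll[exampleRow](NewBufferFrom(raw))
}
```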