Compare commits

...

17 Commits

Author SHA1 Message Date
Planxnx 4f2dd80546 feat: support s3 downloading configure 2024-05-30 22:50:21 +07:00
Planxnx ad26ea0bff feat: increase download connection 2024-05-27 09:48:01 +07:00
Planxnx 23b88c7859 feat(parquet): create buffer for parquet file reader and s3 downloader 2024-05-25 06:25:41 +07:00
Planxnx d8b8ae42fb feat(parquet): add read all rows function 2024-05-25 06:15:30 +07:00
Planxnx c1362ae328 doc: remove unnecessary todo 2024-05-25 05:20:26 +07:00
Planxnx 6585da5907 perf: remove unused field to reduce ~1.5GB of memory usage 2024-05-25 05:20:26 +07:00
Planxnx 41cb5de9c0 doc: update note about aws public data 2024-05-25 01:56:05 +07:00
Planxnx fc8bad75a5 doc: add note about coinbase txinput 2024-05-24 04:44:04 +07:00
Planxnx 7373944c85 doc: add note about nill and empty witness 2024-05-24 03:53:14 +07:00
Planxnx 89a2e58622 fix: invalid blocks to ordered 2024-05-24 03:04:46 +07:00
Planxnx 9ea9ebdb30 fix: filtering block heights and use utc date for key 2024-05-24 03:00:59 +07:00
Planxnx 5e46a87201 wip: no return error when construct aws datasource 2024-05-23 16:58:23 +07:00
Planxnx e98c3def55 wip: parse aws transaction to type transaction 2024-05-23 02:37:25 +07:00
Planxnx a3f902f5d5 feat: map parquet data to internal types 2024-05-21 16:29:56 +07:00
Planxnx e8b4f5a2de wip: implement aws public data datasource 2024-05-21 16:01:52 +07:00
Planxnx 611717706b feat(btc): support get current block height 2024-05-21 15:40:55 +07:00
Planxnx 83b38bc67b wip: create aws public data datasource 2024-05-21 02:48:10 +07:00
10 changed files with 1957 additions and 7 deletions

View File

@@ -0,0 +1,688 @@
// AWS Public Blockchain Datasource
// - https://registry.opendata.aws/aws-public-blockchain
// - https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws
//
// To set up your own data source, see: https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws/blob/main/analytics/producer/README.md
package datasources
import (
"cmp"
"context"
"encoding/hex"
"fmt"
"io"
"log/slog"
"math"
"slices"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/subscription"
"github.com/gaze-network/indexer-network/pkg/btcutils"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/parquetutils"
"github.com/samber/lo"
parquettypes "github.com/xitongsys/parquet-go/types"
"golang.org/x/sync/errgroup"
)
const (
awsPublicDataS3Region = "us-east-2"
awsPublicDataS3Bucket = "aws-public-blockchain"
defaultAWSPublicDataDownloadConcurrency = 8
)
var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)
// Make sure AWSPublicDataDatasource implements the Datasource interface
var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)
type AWSPublicDataDatasourceConfig struct {
// The number of goroutines to spin up in parallel when downloading parts.
// Concurrency of 1 will download the parts sequentially.
// Default is 8.
//
// CAUTION: High concurrency with a low part size can reduce the time to download a file,
// but it can also increase memory usage.
DownloadConcurrency int
// The size (in bytes) to request from S3 for each part.
// Default depends on the concurrency and file size (part size = file size / concurrency).
//
// CAUTION: High concurrency with a low part size can reduce the time to download a file,
// but it can also increase memory usage.
DownloadPartSize int64 `mapstructure:"download_part_size"`
}
type AWSPublicDataDatasource struct {
btcDatasource Datasource[*types.Block]
s3Client *s3.Client
s3Bucket string
config AWSPublicDataDatasourceConfig
}
func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
sdkConfig, err := config.LoadDefaultConfig(context.Background())
if err != nil {
logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
}
// TODO: support user defined config (self-hosted s3 bucket)
s3client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) {
o.Region = awsPublicDataS3Region
o.Credentials = aws.AnonymousCredentials{}
})
if conf.DownloadConcurrency <= 0 {
conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
}
return &AWSPublicDataDatasource{
btcDatasource: btcDatasource,
s3Client: s3client,
s3Bucket: awsPublicDataS3Bucket,
config: conf,
}
}
func (d AWSPublicDataDatasource) Name() string {
return fmt.Sprintf("aws_public_data/%s", d.btcDatasource.Name())
}
func (d *AWSPublicDataDatasource) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
ch := make(chan []*types.Block)
subscription, err := d.FetchAsync(ctx, from, to, ch)
if err != nil {
return nil, errors.WithStack(err)
}
defer subscription.Unsubscribe()
blocks := make([]*types.Block, 0)
for {
select {
case b, ok := <-ch:
if !ok {
return blocks, nil
}
blocks = append(blocks, b...)
case <-subscription.Done():
if err := ctx.Err(); err != nil {
return nil, errors.Wrap(err, "context done")
}
return blocks, nil
case err := <-subscription.Err():
if err != nil {
return nil, errors.Wrap(err, "got error while fetch async")
}
return blocks, nil
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "context done")
}
}
}
func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
start, end, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
subscription := subscription.NewSubscription(ch)
if skip {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
return subscription.Client(), nil
}
startFiles, err := d.listBlocksFilesByDate(ctx, start.Timestamp)
if err != nil {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
return nil, errors.Wrap(err, "failed to list files by date")
}
// support only merged block files
startFiles = lo.Filter(startFiles, func(file awsFile, _ int) bool {
return strings.Contains(file.Key, "part-")
})
// use the fallback datasource instead of S3 if there's no supported data
if len(startFiles) == 0 {
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
s, err := d.btcDatasource.FetchAsync(ctx, start.Height, end.Height, ch)
return s, errors.WithStack(err)
}
go func() {
defer func() {
// add a small delay to prevent shutdown before the client receives all blocks
time.Sleep(100 * time.Millisecond)
subscription.Unsubscribe()
}()
// loop through each day until we reach the end of supported data or pass the end block's date
for ts := start.Timestamp; ts.Before(end.Timestamp.Round(24*time.Hour)) && ts.Before(time.Now()); ts = ts.Add(24 * time.Hour) {
ctx := logger.WithContext(ctx,
slogx.Time("date", ts),
slogx.Int64("date_unix", ts.Unix()),
)
logger.DebugContext(ctx, "Fetching data from AWS S3", slogx.Int64("start", start.Height), slogx.Int64("end", end.Height))
allBlocksFiles, err := d.listBlocksFilesByDate(ctx, ts)
if err != nil {
logger.ErrorContext(ctx, "Failed to list blocks files by date from aws s3", slogx.Error(err))
if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
allTxsFiles, err := d.listTxsFilesByDate(ctx, ts)
if err != nil {
logger.ErrorContext(ctx, "Failed to list txs files by date from aws s3", slogx.Error(err))
if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
blocksFiles := lo.Filter(allBlocksFiles, func(file awsFile, _ int) bool {
return strings.Contains(file.Key, "part-")
})
txsFiles := lo.Filter(allTxsFiles, func(file awsFile, _ int) bool {
return strings.Contains(file.Key, "part-")
})
logger.DebugContext(ctx, "Found files in AWS S3 bucket",
slogx.Int("files_blocks", len(allBlocksFiles)),
slogx.Int("files_blocks_merged", len(blocksFiles)),
slogx.Int("files_txs_all", len(allTxsFiles)),
slogx.Int("files_txs_merged", len(txsFiles)),
)
// Reached the end of the supported data,
// stop fetching data from AWS S3
if len(blocksFiles) == 0 || len(txsFiles) == 0 {
logger.DebugContext(ctx, "No blocks files found, stop fetching data from AWS S3")
return
}
// prevent unexpected error
{
if len(blocksFiles) != 1 {
logger.ErrorContext(ctx, "Unexpected blocks files count, should be 1", slogx.Int("count", len(blocksFiles)))
if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected blocks files count")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
if len(txsFiles) != 1 {
logger.ErrorContext(ctx, "Unexpected txs files count, should be 1", slogx.Int("count", len(txsFiles)))
if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected txs files count")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
}
// TODO: use concurrent stream (max 2 goroutine) to download files then sequentially read parquet files
// to improve performance while not consuming too much memory (increase around 500 MB per goroutine)
var (
// TODO: create []byte pool to reduce alloc ops (reduce GC pressure)
// TODO: use FileSystem for default buffer (can choose memory or disk buffer)
blocksBuffer = parquetutils.NewBuffer()
txsBuffer = parquetutils.NewBuffer()
)
startDownload := time.Now()
if err := d.downloadFile(ctx, blocksFiles[0], blocksBuffer); err != nil {
logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't download blocks file")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
if err := d.downloadFile(ctx, txsFiles[0], txsBuffer); err != nil {
logger.ErrorContext(ctx, "Failed to download txs file from AWS S3", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't download txs file")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
logger.DebugContext(ctx, "Downloaded files from AWS S3",
slogx.Duration("duration", time.Since(startDownload)),
slogx.Int("sizes_blocks", len(blocksBuffer.Bytes())),
slogx.Int("sizes_txs", len(txsBuffer.Bytes())),
)
// Read parquet files
startRead := time.Now()
// we can read all blocks data at once because it's small
rawAllBlocks, err := parquetutils.ReadAll[awsBlock](blocksBuffer)
if err != nil {
logger.ErrorContext(ctx, "Failed to read parquet blocks data", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet blocks data")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
// NOTE: We shouldn't read all txs data at once because it's huge (up to ~1.5GB of memory usage);
// we should read it in chunks and send it to the subscription client to reduce memory usage.
// But the AWS Public Dataset is not sorted by block number and index,
// so we can't avoid reading all transactions data by skipping unnecessary transactions
// or chunking data by block number to reduce memory usage :(
rawAllTxs, err := parquetutils.ReadAll[awsTransaction](txsBuffer)
if err != nil {
logger.ErrorContext(ctx, "Failed to read parquet txs data", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet txs data")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
groupRawTxs := lo.GroupBy(rawAllTxs, func(tx awsTransaction) int64 {
return tx.BlockNumber
})
// filter blocks data by height range
rawFilteredBlocks := lo.Filter(rawAllBlocks, func(block awsBlock, _ int) bool {
return block.Number >= start.Height && block.Number <= end.Height
})
slices.SortFunc(rawFilteredBlocks, func(i, j awsBlock) int {
return cmp.Compare(i.Number, j.Number)
})
logger.DebugContext(ctx, "Read parquet files",
slogx.Duration("duration", time.Since(startRead)),
slogx.Int("total_blocks", len(rawAllBlocks)),
slogx.Int("filtered_blocks", len(rawFilteredBlocks)),
slogx.Int("total_txs", len(rawAllTxs)),
slogx.Int("total_txs_grouped", len(groupRawTxs)),
)
blocks := make([]*types.Block, 0, len(rawFilteredBlocks))
for _, rawBlock := range rawFilteredBlocks {
blockHeader, err := rawBlock.ToBlockHeader()
if err != nil {
logger.ErrorContext(ctx, "Failed to convert aws block to type block header", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws block to type block header")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
txs := make([]*types.Transaction, 0, len(groupRawTxs[blockHeader.Height]))
for _, rawTx := range groupRawTxs[rawBlock.Number] {
tx, err := rawTx.ToTransaction(rawBlock)
if err != nil {
logger.ErrorContext(ctx, "Failed to convert aws transaction to type transaction", slogx.Error(err))
if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws transaction to type transaction")); err != nil {
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return
}
txs = append(txs, tx)
}
slices.SortFunc(txs, func(i, j *types.Transaction) int {
return cmp.Compare(i.Index, j.Index)
})
blocks = append(blocks, &types.Block{
Header: blockHeader,
Transactions: txs,
})
}
logger.DebugContext(ctx, "Send blocks to subscription client", slogx.Int("count", len(blocks)))
if err := subscription.Send(ctx, blocks); err != nil {
if errors.Is(err, errs.Closed) {
logger.DebugContext(ctx, "Subscription client closed, can't send", slogx.Error(err))
return
}
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
slogx.Int64("start", blocks[0].Header.Height),
slogx.Int64("end", blocks[len(blocks)-1].Header.Height),
slogx.Error(err),
)
}
}
}()
return subscription.Client(), nil
}
func (d *AWSPublicDataDatasource) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
header, err := d.btcDatasource.GetBlockHeader(ctx, height)
return header, errors.WithStack(err)
}
func (d *AWSPublicDataDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
height, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
return height, errors.WithStack(err)
}
func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (startHeader, endHeader types.BlockHeader, skip bool, err error) {
start := fromHeight
end := toHeight
// get current bitcoin block height
latestBlockHeight, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
if err != nil {
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block count")
}
// set start to genesis block height
if start < 0 {
start = 0
}
// set end to current bitcoin block height if
// - end is -1
// - end is greater than current bitcoin block height
if end < 0 || end > latestBlockHeight {
end = latestBlockHeight
}
// if start is greater than end, skip this round
if start > end {
return types.BlockHeader{}, types.BlockHeader{}, true, nil
}
group, groupctx := errgroup.WithContext(ctx)
group.Go(func() error {
startHeader, err = d.GetBlockHeader(groupctx, start)
return errors.Wrapf(err, "block %v", start)
})
group.Go(func() error {
endHeader, err = d.GetBlockHeader(groupctx, end)
return errors.Wrapf(err, "block %v", end)
})
if err := group.Wait(); err != nil {
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block header")
}
return startHeader, endHeader, false, nil
}
func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.s3Bucket),
Prefix: aws.String(prefix),
})
if err != nil {
return nil, errors.Wrapf(err, "can't list s3 bucket objects for bucket %q and prefix %q", d.s3Bucket, prefix)
}
// filter empty keys
objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
return awsFile{
Key: *item.Key,
Size: *item.Size,
LastModified: *item.LastModified,
}
}), nil
}
func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
if date.Before(firstBitcoinTimestamp) {
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
}
prefix := "v1.0/btc/blocks/date=" + date.UTC().Format(time.DateOnly)
files, err := d.listFiles(ctx, prefix)
if err != nil {
return nil, errors.Wrap(err, "failed to list blocks files by date")
}
return files, nil
}
func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
if date.Before(firstBitcoinTimestamp) {
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
}
prefix := "v1.0/btc/transactions/date=" + date.UTC().Format(time.DateOnly)
files, err := d.listFiles(ctx, prefix)
if err != nil {
return nil, errors.Wrap(err, "failed to list txs files by date")
}
return files, nil
}
func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
md.Concurrency = d.config.DownloadConcurrency
md.PartSize = d.config.DownloadPartSize
if md.PartSize <= 0 {
md.PartSize = f.Size / int64(md.Concurrency)
}
})
numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
Bucket: aws.String(d.s3Bucket),
Key: aws.String(f.Key),
})
if err != nil {
return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
}
if numBytes < 1 {
return errors.Wrap(errs.NotFound, "got empty file")
}
return nil
}
// TODO: remove unused fields to reduce memory usage
type (
awsBlock struct {
Hash string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Bits string `parquet:"name=bits, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"` // Hex string format
PreviousBlockHash string `parquet:"name=previousblockhash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
MerkleRoot string `parquet:"name=merkle_root, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
CoinbaseParam string `parquet:"name=coinbase_param, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Timestamp string `parquet:"name=timestamp, type=INT96, repetitiontype=OPTIONAL"`
Number int64 `parquet:"name=number, type=INT64, repetitiontype=OPTIONAL"`
Version int64 `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
Nonce int64 `parquet:"name=nonce, type=INT64, repetitiontype=OPTIONAL"`
// MedianTime string `parquet:"name=mediantime, type=INT96, repetitiontype=OPTIONAL"`
// Difficulty float64 `parquet:"name=difficulty, type=DOUBLE, repetitiontype=OPTIONAL"`
// Chainwork string `parquet:"name=chainwork, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
// Weight int64 `parquet:"name=weight, type=INT64, repetitiontype=OPTIONAL"`
// TransactionCount int64 `parquet:"name=transaction_count, type=INT64, repetitiontype=OPTIONAL"`
// StrippedSize int64 `parquet:"name=stripped_size, type=INT64, repetitiontype=OPTIONAL"`
// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
}
awsTransaction struct {
Hash string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
BlockHash string `parquet:"name=block_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Outputs []*awsTxOutput `parquet:"name=outputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
Inputs []*awsTxInput `parquet:"name=inputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
Version int64 `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
BlockNumber int64 `parquet:"name=block_number, type=INT64, repetitiontype=OPTIONAL"`
Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
LockTime int64 `parquet:"name=lock_time, type=INT64, repetitiontype=OPTIONAL"`
IsCoinbase bool `parquet:"name=is_coinbase, type=BOOLEAN, repetitiontype=OPTIONAL"`
// VirtualSize int64 `parquet:"name=virtual_size, type=INT64, repetitiontype=OPTIONAL"`
// InputCount int64 `parquet:"name=input_count, type=INT64, repetitiontype=OPTIONAL"`
// OutputCount int64 `parquet:"name=output_count, type=INT64, repetitiontype=OPTIONAL"`
// OutputValue float64 `parquet:"name=output_value, type=DOUBLE, repetitiontype=OPTIONAL"`
// BlockTimestamp string `parquet:"name=block_timestamp, type=INT96, repetitiontype=OPTIONAL"`
// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
// Fee float64 `parquet:"name=fee, type=DOUBLE, repetitiontype=OPTIONAL"`
// InputValue float64 `parquet:"name=input_value, type=DOUBLE, repetitiontype=OPTIONAL"`
}
awsTxInput struct {
ScriptHex string `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
SpentTransactionHash string `parquet:"name=spent_transaction_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
TxInWitness []*string `parquet:"name=txinwitness, type=LIST, repetitiontype=OPTIONAL, valuetype=BYTE_ARRAY, convertedtype=UTF8"`
SpentOutputIndex int64 `parquet:"name=spent_output_index, type=INT64, repetitiontype=OPTIONAL"`
Sequence int64 `parquet:"name=sequence, type=INT64, repetitiontype=OPTIONAL"`
// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
// RequiredSignatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
// ScriptAsm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
}
awsTxOutput struct {
Script_hex string `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
// Required_signatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
// Script_asm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}
awsFile struct {
Key string
Size int64
LastModified time.Time
}
)
func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
hash, err := chainhash.NewHashFromStr(a.Hash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert block hash")
}
prevBlockHash, err := chainhash.NewHashFromStr(a.PreviousBlockHash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert previous block hash")
}
merkleRoot, err := chainhash.NewHashFromStr(a.MerkleRoot)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert merkle root")
}
bits, err := strconv.ParseUint(a.Bits, 16, 32)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "can't convert bits from hex str to uint32")
}
return types.BlockHeader{
Hash: *hash,
Height: a.Number,
Version: int32(a.Version),
PrevBlock: *prevBlockHash,
MerkleRoot: *merkleRoot,
Timestamp: parquettypes.INT96ToTime(a.Timestamp),
Bits: uint32(bits),
Nonce: uint32(a.Nonce),
}, nil
}
func (a awsTransaction) ToTransaction(block awsBlock) (*types.Transaction, error) {
blockhash, err := chainhash.NewHashFromStr(block.Hash)
if err != nil {
return nil, errors.Wrap(err, "can't convert block hash")
}
msgtx, err := a.MsgTx(block)
if err != nil {
return nil, errors.Wrap(err, "can't convert aws tx to wire.msgtx")
}
return types.ParseMsgTx(msgtx, a.BlockNumber, *blockhash, uint32(a.Index)), nil
}
func (a awsTransaction) MsgTx(block awsBlock) (*wire.MsgTx, error) {
txIn := make([]*wire.TxIn, 0, len(a.Inputs))
txOut := make([]*wire.TxOut, 0, len(a.Outputs))
// NOTE: coinbase txs from AWS S3 have no inputs, so we need to add one manually,
// but we can't guarantee this data is correct, especially the sequence number.
if a.IsCoinbase && len(a.Inputs) == 0 {
scriptsig, err := hex.DecodeString(block.CoinbaseParam)
if err != nil {
return nil, errors.Wrap(err, "can't decode script hex")
}
txIn = append(txIn, &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: common.ZeroHash,
Index: math.MaxUint32,
},
SignatureScript: scriptsig,
Witness: btcutils.CoinbaseWitness,
Sequence: math.MaxUint32, // most coinbase txs use the max sequence number
})
}
for _, in := range a.Inputs {
scriptsig, err := hex.DecodeString(in.ScriptHex)
if err != nil {
return nil, errors.Wrap(err, "can't decode script hex")
}
witness, err := btcutils.WitnessFromHex(lo.Map(in.TxInWitness, func(src *string, _ int) string {
if src == nil {
return ""
}
return *src
}))
if err != nil {
return nil, errors.Wrap(err, "can't convert witness")
}
prevOutHash, err := chainhash.NewHashFromStr(in.SpentTransactionHash)
if err != nil {
return nil, errors.Wrap(err, "can't convert prevout hash")
}
txIn = append(txIn, &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: *prevOutHash,
Index: uint32(in.SpentOutputIndex),
},
SignatureScript: scriptsig,
Witness: witness,
Sequence: uint32(in.Sequence),
})
}
for _, out := range a.Outputs {
scriptpubkey, err := hex.DecodeString(out.Script_hex)
if err != nil {
return nil, errors.Wrap(err, "can't decode script hex")
}
txOut = append(txOut, &wire.TxOut{
Value: btcutils.BitcoinToSatoshi(out.Value),
PkScript: scriptpubkey,
})
}
return &wire.MsgTx{
Version: int32(a.Version),
TxIn: txIn,
TxOut: txOut,
LockTime: uint32(a.LockTime),
}, nil
}
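
For orientation, here is a minimal usage sketch of the new datasource (not part of the diff). It only exercises the constructor and Fetch as defined above; nodeDatasource is assumed to be any existing Datasource[*types.Block] implementation (e.g. the Bitcoin node datasource) used as the fallback, and the tuning values are illustrative.

package datasources

import (
	"context"
	"fmt"

	"github.com/gaze-network/indexer-network/core/types"
)

// exampleFetch is a usage sketch (not in the diff): construct the AWS public
// data datasource with explicit download tuning and fetch an inclusive height range.
func exampleFetch(ctx context.Context, nodeDatasource Datasource[*types.Block]) error {
	ds := NewAWSPublicData(nodeDatasource, AWSPublicDataDatasourceConfig{
		DownloadConcurrency: 8,                // goroutines per S3 object download
		DownloadPartSize:    32 * 1024 * 1024, // 32 MiB parts; <= 0 derives part size from file size / concurrency
	})
	blocks, err := ds.Fetch(ctx, 100_000, 100_100) // inclusive height range
	if err != nil {
		return err
	}
	fmt.Printf("fetched %d blocks from %s\n", len(blocks), ds.Name())
	return nil
}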

View File

@@ -88,7 +88,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(from, to)
from, to, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -212,12 +212,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
return subscription.Client(), nil
}
func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
start = fromHeight
end = toHeight
// get current bitcoin block height
latestBlockHeight, err := d.btcclient.GetBlockCount()
latestBlockHeight, err := d.GetCurrentBlockHeight(ctx)
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get block count")
}
@@ -227,7 +227,7 @@ func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start,
start = 0
}
// set end to current bitcoin block height if
// set end to current bitcoin block height if d
// - end is -1
// - end is greater that current bitcoin block height
if end < 0 || end > latestBlockHeight {
@@ -292,3 +292,12 @@ func (d *BitcoinNodeDatasource) GetBlockHeader(ctx context.Context, height int64
return types.ParseMsgBlockHeader(*block, height), nil
}
// GetCurrentBlockHeight fetches the current block height from the Bitcoin node
func (d *BitcoinNodeDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
height, err := d.btcclient.GetBlockCount()
if err != nil {
return -1, errors.Wrap(err, "failed to get block height")
}
return height, nil
}
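
A small sketch (not in the diff) of the clamping that prepareRange performs via the new GetCurrentBlockHeight: negative bounds resolve to the genesis height and the node's current tip.

package datasources

import (
	"context"

	"github.com/gaze-network/indexer-network/core/types"
)

// fetchAll is a sketch: start < 0 is clamped to 0 and end < 0 (or beyond the tip)
// is clamped to the current block height, so this fetches the whole chain so far.
func fetchAll(ctx context.Context, ds Datasource[*types.Block]) ([]*types.Block, error) {
	return ds.Fetch(ctx, 0, -1)
}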

View File

@@ -13,4 +13,5 @@ type Datasource[T any] interface {
Fetch(ctx context.Context, from, to int64) ([]T, error)
FetchAsync(ctx context.Context, from, to int64, ch chan<- []T) (*subscription.ClientSubscription[[]T], error)
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
GetCurrentBlockHeight(ctx context.Context) (int64, error)
}

29
go.mod
View File

@@ -4,6 +4,10 @@ go 1.22
require (
github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11
github.com/aws/aws-sdk-go-v2 v1.23.0
github.com/aws/aws-sdk-go-v2/config v1.25.3
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/btcutil v1.1.5
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
@@ -16,18 +20,37 @@ require (
github.com/planxnx/concurrent-stream v0.1.5
github.com/samber/do/v2 v2.0.0-beta.7
github.com/samber/lo v1.39.0
github.com/shopspring/decimal v1.3.1
github.com/shopspring/decimal v1.4.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.51.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20240122235623-d6294584ab18
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.5.0
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 // indirect
github.com/aws/smithy-go v1.17.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
@@ -40,6 +63,7 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -49,6 +73,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx v3.6.2+incompatible // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
@@ -59,6 +84,7 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.16 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
@@ -78,6 +104,7 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

1003
go.sum

File diff suppressed because it is too large

21
pkg/btcutils/satoshi.go Normal file
View File

@@ -0,0 +1,21 @@
package btcutils
import "github.com/shopspring/decimal"
const (
BitcoinDecimals = 8
)
// satsUnit is 10^8
var satsUnit = decimal.New(1, BitcoinDecimals)
// BitcoinToSatoshi converts an amount in Bitcoin format to Satoshi format.
func BitcoinToSatoshi(v float64) int64 {
amount := decimal.NewFromFloat(v)
return amount.Mul(satsUnit).IntPart()
}
// SatoshiToBitcoin converts an amount in Satoshi format to Bitcoin format.
func SatoshiToBitcoin(v int64) float64 {
return decimal.New(v, -BitcoinDecimals).InexactFloat64()
}
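
A hedged illustration of why these helpers go through shopspring/decimal instead of plain float math: the test file below asserts that a naive float64 multiplication misses the exact satoshi value for every listed case. The example value is taken from that test data.

package btcutils

import "fmt"

// exampleSatoshiPrecision is a sketch (not in the diff). Per the test below,
// the naive conversion has a precision error for this value, while the
// decimal-based helper returns the exact satoshi amount.
func exampleSatoshiPrecision() {
	v := 0.0198473                   // BTC, from the test data below
	fmt.Println(int64(v * 1e8))      // naive conversion; the test asserts this differs from 1984730
	fmt.Println(BitcoinToSatoshi(v)) // 1984730
}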

View File

@@ -0,0 +1,39 @@
package btcutils
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSatoshiConversion(t *testing.T) {
testcases := []struct {
btc float64
sats int64
}{
{2.29980951, 229980951},
{1.29609085, 129609085},
{1.2768897, 127688970},
{0.62518296, 62518296},
{0.29998462, 29998462},
{0.1251, 12510000},
{0.02016011, 2016011},
{0.0198473, 1984730},
{0.0051711, 517110},
{0.0012, 120000},
{7e-05, 7000},
{3.835e-05, 3835},
{1.962e-05, 1962},
}
for _, testcase := range testcases {
t.Run(fmt.Sprintf("BtcToSats/%v", testcase.sats), func(t *testing.T) {
require.NotEqual(t, testcase.btc, int64(testcase.sats*1e8), "Testcase value should have precision error")
assert.Equal(t, testcase.btc, BitcoinToSatoshi(testcase.sats))
})
t.Run(fmt.Sprintf("SatsToBtc/%v", testcase.sats), func(t *testing.T) {
assert.Equal(t, testcase.sats, SatoshiToBitcoin(testcase.btc))
})
}
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/hex"
"strings"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
)
@@ -12,6 +13,9 @@ const (
witnessSeparator = " "
)
// CoinbaseWitness is the witness data for a coinbase transaction.
var CoinbaseWitness = utils.Must(WitnessFromHex([]string{"0000000000000000000000000000000000000000000000000000000000000000"}))
// WitnessToHex formats the passed witness stack as a slice of hex-encoded strings.
func WitnessToHex(witness wire.TxWitness) []string {
if len(witness) == 0 {
@@ -37,7 +41,10 @@ func WitnessToString(witness wire.TxWitness) string {
// WitnessFromHex parses the passed slice of hex-encoded strings into a witness stack.
func WitnessFromHex(witnesses []string) (wire.TxWitness, error) {
// NOTE: some witnesses from the bitcoin node are empty and some are nil (most are nil); it's not clear why.
// For now, we will return nil for both cases.
if len(witnesses) == 0 {
// return wire.TxWitness{}, nil
return nil, nil
}
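
A quick sketch (not in the diff) of the behaviors documented above, assuming only what the diff shows: nil or empty witness input stays nil, and CoinbaseWitness is a single 32-byte zero element.

package btcutils

import "fmt"

// exampleWitness is a sketch of the behaviors noted above.
func exampleWitness() {
	w, _ := WitnessFromHex(nil) // empty/nil input returns a nil witness, per the NOTE above
	fmt.Println(w == nil)       // true
	fmt.Println(WitnessToHex(CoinbaseWitness))
	// [0000000000000000000000000000000000000000000000000000000000000000]
}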

133
pkg/parquetutils/buffer.go Normal file
View File

@@ -0,0 +1,133 @@
// nolint: wrapcheck
package parquetutils
import (
"errors"
"io"
"sync"
"github.com/xitongsys/parquet-go/source"
)
var (
// Make sure Buffer implements the ParquetFile interface.
_ source.ParquetFile = (*Buffer)(nil)
// Make sure Buffer implements the io.WriterAt interface.
_ io.WriterAt = (*Buffer)(nil)
)
// Buffer allows reading and writing parquet data in a memory buffer.
type Buffer struct {
buf []byte
loc int
m sync.Mutex
}
// NewBuffer creates a new in-memory parquet buffer.
func NewBuffer() *Buffer {
return &Buffer{buf: make([]byte, 0, 512)}
}
// NewBufferFrom creates a new in-memory parquet buffer from the given bytes.
// It uses the provided slice as its backing buffer.
func NewBufferFrom(s []byte) *Buffer {
return &Buffer{
buf: s,
}
}
func (b *Buffer) Create(string) (source.ParquetFile, error) {
return &Buffer{buf: make([]byte, 0, 512)}, nil
}
func (b *Buffer) Open(string) (source.ParquetFile, error) {
return NewBufferFrom(b.Bytes()), nil
}
// Seek seeks in the underlying memory buffer.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
newLoc := b.loc
switch whence {
case io.SeekStart:
newLoc = int(offset)
case io.SeekCurrent:
newLoc += int(offset)
case io.SeekEnd:
newLoc = len(b.buf) + int(offset)
default:
return int64(b.loc), errors.New("Seek: invalid whence")
}
if newLoc < 0 {
return int64(b.loc), errors.New("Seek: invalid offset")
}
if newLoc > len(b.buf) {
newLoc = len(b.buf)
}
b.loc = newLoc
return int64(b.loc), nil
}
// Read reads data from the buffer into p.
func (b *Buffer) Read(p []byte) (n int, err error) {
n = copy(p, b.buf[b.loc:len(b.buf)])
b.loc += n
if b.loc == len(b.buf) {
return n, io.EOF
}
return n, nil
}
// Write writes data from p into the buffer.
func (b *Buffer) Write(p []byte) (n int, err error) {
n, err = b.WriteAt(p, int64(b.loc))
if err != nil {
return 0, err
}
b.loc += n
return
}
// WriteAt writes a slice of bytes to the buffer starting at the provided position.
// It returns the number of bytes written, or an error. It can overwrite previously
// written bytes if the write offsets overlap.
func (b *Buffer) WriteAt(p []byte, pos int64) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
pLen := len(p)
expLen := pos + int64(pLen)
if int64(len(b.buf)) < expLen {
if int64(cap(b.buf)) < expLen {
newBuf := make([]byte, expLen)
copy(newBuf, b.buf)
b.buf = newBuf
}
b.buf = b.buf[:expLen]
}
copy(b.buf[pos:], p)
return pLen, nil
}
// Close is a no-op for a memory buffer.
func (*Buffer) Close() error {
return nil
}
// Bytes returns the underlying buffer bytes.
func (b *Buffer) Bytes() []byte {
return b.buf
}
// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
func (b *Buffer) Reset() {
b.m.Lock()
defer b.m.Unlock()
b.buf = b.buf[:0]
b.loc = 0
}
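
A short sketch (not in the diff) of the WriteAt semantics above: the buffer grows as needed and overlapping writes overwrite in place, which is what lets the S3 download manager write parts concurrently through io.WriterAt into one in-memory file.

package parquetutils

import "fmt"

// exampleBufferWriteAt is a sketch of the WriteAt behavior implemented above.
func exampleBufferWriteAt() {
	buf := NewBuffer()
	_, _ = buf.WriteAt([]byte("world"), 5) // grows the buffer; bytes 0..4 stay zeroed
	_, _ = buf.WriteAt([]byte("hello"), 0) // overlapping write fills the gap in place
	fmt.Println(string(buf.Bytes()))       // "helloworld"
}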

View File

@@ -0,0 +1,26 @@
package parquetutils
import (
"github.com/pkg/errors"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source"
)
// ReaderConcurrency is the number of parallel file readers.
var ReaderConcurrency int64 = 8
// ReadAll reads all records from the parquet file.
func ReadAll[T any](sourceFile source.ParquetFile) ([]T, error) {
r, err := reader.NewParquetReader(sourceFile, new(T), ReaderConcurrency)
if err != nil {
return nil, errors.Wrap(err, "can't create parquet reader")
}
defer r.ReadStop()
data := make([]T, r.GetNumRows())
if err = r.Read(&data); err != nil {
return nil, errors.Wrap(err, "failed to read parquet data")
}
return data, nil
}
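
A usage sketch of ReadAll (not in the diff), mirroring how the AWS datasource uses it: decode every row of an in-memory parquet file into a tagged struct in one call. The row type here is hypothetical; in the datasource the real types are awsBlock and awsTransaction.

package parquetutils

// exampleRow is a hypothetical row type; its parquet tags drive the column
// mapping, as with awsBlock/awsTransaction above.
type exampleRow struct {
	Number int64  `parquet:"name=number, type=INT64, repetitiontype=OPTIONAL"`
	Hash   string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}

// exampleReadAll is a sketch: buf is expected to already contain a downloaded
// parquet file (e.g. filled via the S3 download manager through io.WriterAt).
func exampleReadAll(buf *Buffer) ([]exampleRow, error) {
	return ReadAll[exampleRow](buf)
}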