feat: support S3 download configuration

Planxnx
2024-05-30 22:50:21 +07:00
parent ad26ea0bff
commit 4f2dd80546


@@ -43,7 +43,7 @@ const (
awsPublicDataS3Region = "us-east-2"
awsPublicDataS3Bucket = "aws-public-blockchain"
parquetReaderConcurrency = 8
+defaultAWSPublicDataDownloadConcurrency = 8
)
var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)
@@ -51,13 +51,31 @@ var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.
// Make sure to implement the BitcoinDatasource interface
var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)
+type AWSPublicDataDatasourceConfig struct {
+// The number of goroutines to spin up in parallel when downloading parts.
+// A concurrency of 1 downloads the parts sequentially.
+// Default is 8.
+//
+// CAUTION: High concurrency with a small part size can reduce download time,
+// but it can also increase memory usage.
+DownloadConcurrency int `mapstructure:"download_concurrency"`
+// The size (in bytes) to request from S3 for each part.
+// The default depends on the concurrency and file size (part size = file size / concurrency).
+//
+// CAUTION: High concurrency with a small part size can reduce download time,
+// but it can also increase memory usage.
+DownloadPartSize int64 `mapstructure:"download_part_size"`
+}
type AWSPublicDataDatasource struct {
btcDatasource Datasource[*types.Block]
s3Client *s3.Client
s3Bucket string
+config AWSPublicDataDatasourceConfig
}
-func NewAWSPublicData(btcDatasource Datasource[*types.Block]) *AWSPublicDataDatasource {
+func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
sdkConfig, err := config.LoadDefaultConfig(context.Background())
if err != nil {
logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
@@ -69,10 +87,15 @@ func NewAWSPublicData(btcDatasource Datasource[*types.Block]) *AWSPublicDataData
o.Credentials = aws.AnonymousCredentials{}
})
+if conf.DownloadConcurrency <= 0 {
+conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
+}
return &AWSPublicDataDatasource{
btcDatasource: btcDatasource,
s3Client: s3client,
s3Bucket: awsPublicDataS3Bucket,
+config: conf,
}
}
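A minimal usage sketch for the new config; newCoreDatasource is a hypothetical stand-in for the underlying datasource, not code from this commit:

// Construct the datasource with explicit download tuning. Passing the
// zero-value config falls back to the defaults documented above:
// concurrency 8, part size derived from the file size.
ds := NewAWSPublicData(newCoreDatasource(), AWSPublicDataDatasourceConfig{
	DownloadConcurrency: 16,              // 16 parallel part downloads per file
	DownloadPartSize:    8 * 1024 * 1024, // 8 MiB per part
})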
@@ -140,8 +163,8 @@ func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64
}
// only merged block files are supported
-startFiles = lo.Filter(startFiles, func(key string, _ int) bool {
-return strings.Contains(key, "part-")
+startFiles = lo.Filter(startFiles, func(file awsFile, _ int) bool {
+return strings.Contains(file.Key, "part-")
})
// use other datasource instead of s3 if there's no supported data
@@ -187,11 +210,11 @@ func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64
return
}
-blocksFiles := lo.Filter(allBlocksFiles, func(key string, _ int) bool {
-return strings.Contains(key, "part-")
+blocksFiles := lo.Filter(allBlocksFiles, func(file awsFile, _ int) bool {
+return strings.Contains(file.Key, "part-")
})
-txsFiles := lo.Filter(allTxsFiles, func(key string, _ int) bool {
-return strings.Contains(key, "part-")
+txsFiles := lo.Filter(allTxsFiles, func(file awsFile, _ int) bool {
+return strings.Contains(file.Key, "part-")
})
logger.DebugContext(ctx, "Found files in AWS S3 bucket",
@@ -230,6 +253,7 @@ func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64
// to improve performance while not consuming too much memory (adds around 500 MB per goroutine)
var (
// TODO: create []byte pool to reduce alloc ops (reduce GC pressure)
+// TODO: use FileSystem for default buffer (can choose memory or disk buffer)
blocksBuffer = parquetutils.NewBuffer()
txsBuffer = parquetutils.NewBuffer()
)
@@ -405,7 +429,7 @@ func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight,
return startHeader, endHeader, false, nil
}
-func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]string, error) {
+func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.s3Bucket),
Prefix: aws.String(prefix),
@@ -416,13 +440,16 @@ func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string)
// filter empty keys
objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
-return lo.Map(objs, func(item s3types.Object, _ int) string {
-return *item.Key
+return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
+return awsFile{
+Key: *item.Key,
+Size: *item.Size,
+LastModified: *item.LastModified,
+}
}), nil
}
-func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]string, error) {
+func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
if date.Before(firstBitcoinTimestamp) {
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
}
@@ -434,7 +461,7 @@ func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, dat
return files, nil
}
-func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]string, error) {
+func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
if date.Before(firstBitcoinTimestamp) {
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
}
@@ -446,18 +473,21 @@ func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date t
return files, nil
}
-func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, key string, w io.WriterAt) error {
-downloader := manager.NewDownloader(d.s3Client, func(d *manager.Downloader) {
-d.Concurrency = 32
-d.PartSize = 10 * 1024 * 1024
+func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
+downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
+md.Concurrency = d.config.DownloadConcurrency
+md.PartSize = d.config.DownloadPartSize
+if md.PartSize <= 0 {
+md.PartSize = f.Size / int64(md.Concurrency)
+}
})
numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
Bucket: aws.String(d.s3Bucket),
-Key: aws.String(key),
+Key: aws.String(f.Key),
})
if err != nil {
return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, key)
return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
}
if numBytes < 1 {
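When DownloadPartSize is unset, the part size is derived by integer division, so a 512 MiB object downloaded with the default concurrency of 8 is fetched in 64 MiB parts. A self-contained sketch of that arithmetic, assuming the defaults above:

package main

import "fmt"

// Mirrors the fallback in downloadFile: with DownloadPartSize unset, each of
// the Concurrency goroutines requests roughly fileSize/concurrency bytes.
func main() {
	fileSize := int64(512 << 20) // 512 MiB object, as reported by awsFile.Size
	concurrency := int64(8)      // defaultAWSPublicDataDownloadConcurrency
	fmt.Println(fileSize / concurrency) // 67108864 bytes = 64 MiB per part
}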
@@ -532,6 +562,12 @@ type (
// Script_asm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}
+awsFile struct {
+Key string
+Size int64
+LastModified time.Time
+}
)
func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
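To see how the pieces introduced here compose, a hedged sketch of a caller that lists a day's files and downloads them; the loop and buffer handling are illustrative, not code from this commit:

// List the day's block files, keep only the merged "part-" files, and
// download each into an in-memory buffer that satisfies io.WriterAt.
files, err := d.listBlocksFilesByDate(ctx, date)
if err != nil {
	return err
}
for _, f := range files {
	if !strings.Contains(f.Key, "part-") {
		continue
	}
	buf := manager.NewWriteAtBuffer(nil) // from aws-sdk-go-v2's feature/s3/manager
	if err := d.downloadFile(ctx, f, buf); err != nil {
		return err
	}
	// buf.Bytes() now holds the complete parquet payload.
}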