diff --git a/core/datasources/aws_public_data.go b/core/datasources/aws_public_data.go
index 3deaeeb..ac18a9e 100644
--- a/core/datasources/aws_public_data.go
+++ b/core/datasources/aws_public_data.go
@@ -43,7 +43,7 @@ const (
 	awsPublicDataS3Region = "us-east-2"
 	awsPublicDataS3Bucket = "aws-public-blockchain"
 
-	parquetReaderConcurrency = 8
+	defaultAWSPublicDataDownloadConcurrency = 8
 )
 
 var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)
@@ -51,13 +51,31 @@ var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.
 // Make sure to implement the BitcoinDatasource interface
 var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)
 
+type AWSPublicDataDatasourceConfig struct {
+	// The number of goroutines to spin up in parallel when downloading parts.
+	// A concurrency of 1 downloads the parts sequentially.
+	// Defaults to 8.
+	//
+	// CAUTION: High concurrency with a small part size can reduce download time,
+	// but it also increases memory usage.
+	DownloadConcurrency int `mapstructure:"download_concurrency"`
+
+	// The size (in bytes) to request from S3 for each part.
+	// The default depends on concurrency and file size (part size = file size / concurrency).
+	//
+	// CAUTION: High concurrency with a small part size can reduce download time,
+	// but it also increases memory usage.
+	DownloadPartSize int64 `mapstructure:"download_part_size"`
+}
+
 type AWSPublicDataDatasource struct {
 	btcDatasource Datasource[*types.Block]
 	s3Client      *s3.Client
 	s3Bucket      string
+	config        AWSPublicDataDatasourceConfig
 }
 
-func NewAWSPublicData(btcDatasource Datasource[*types.Block]) *AWSPublicDataDatasource {
+func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
 	sdkConfig, err := config.LoadDefaultConfig(context.Background())
 	if err != nil {
 		logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
@@ -69,10 +87,15 @@ func NewAWSPublicData(btcDatasource Datasource[*types.Block]) *AWSPublicDataData
 		o.Credentials = aws.AnonymousCredentials{}
 	})
 
+	if conf.DownloadConcurrency <= 0 {
+		conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
+	}
+
 	return &AWSPublicDataDatasource{
 		btcDatasource: btcDatasource,
 		s3Client:      s3client,
 		s3Bucket:      awsPublicDataS3Bucket,
+		config:        conf,
 	}
 }
@@ -140,8 +163,8 @@ func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64
 	}
 
 	// supported only merged blocks files
-	startFiles = lo.Filter(startFiles, func(key string, _ int) bool {
-		return strings.Contains(key, "part-")
+	startFiles = lo.Filter(startFiles, func(file awsFile, _ int) bool {
+		return strings.Contains(file.Key, "part-")
 	})
 
 	// use other datasource instead of s3 if there's no supported data
@@ -187,11 +210,11 @@ func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64
 			return
 		}
 
-		blocksFiles := lo.Filter(allBlocksFiles, func(key string, _ int) bool {
-			return strings.Contains(key, "part-")
+		blocksFiles := lo.Filter(allBlocksFiles, func(file awsFile, _ int) bool {
+			return strings.Contains(file.Key, "part-")
 		})
-		txsFiles := lo.Filter(allTxsFiles, func(key string, _ int) bool {
-			return strings.Contains(key, "part-")
+		txsFiles := lo.Filter(allTxsFiles, func(file awsFile, _ int) bool {
+			return strings.Contains(file.Key, "part-")
 		})
 
 		logger.DebugContext(ctx, "Found files in AWS S3 bucket",
@@ -230,6 +253,7 @@ func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64
 		// to improve performance while not consuming too much memory (increase around 500 MB per goroutine)
 		var (
 			// TODO: create []byte pool to reduce alloc ops (reduce GC pressure)
+			// TODO: use FileSystem for the default buffer (to allow choosing a memory or disk buffer)
 			blocksBuffer = parquetutils.NewBuffer()
 			txsBuffer    = parquetutils.NewBuffer()
 		)
@@ -405,7 +429,7 @@ func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight,
 	return startHeader, endHeader, false, nil
 }
 
-func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]string, error) {
+func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
 	result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
 		Bucket: aws.String(d.s3Bucket),
 		Prefix: aws.String(prefix),
@@ -416,13 +440,16 @@ func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string)
 	// filter empty keys
 	objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
-
-	return lo.Map(objs, func(item s3types.Object, _ int) string {
-		return *item.Key
+	return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
+		return awsFile{
+			Key:          *item.Key,
+			Size:         *item.Size,
+			LastModified: *item.LastModified,
+		}
 	}), nil
 }
 
-func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]string, error) {
+func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
 	if date.Before(firstBitcoinTimestamp) {
 		return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
 	}
@@ -434,7 +461,7 @@ func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, dat
 	return files, nil
 }
 
-func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]string, error) {
+func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
 	if date.Before(firstBitcoinTimestamp) {
 		return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
 	}
@@ -446,18 +473,21 @@ func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date t
 	return files, nil
 }
 
-func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, key string, w io.WriterAt) error {
-	downloader := manager.NewDownloader(d.s3Client, func(d *manager.Downloader) {
-		d.Concurrency = 32
-		d.PartSize = 10 * 1024 * 1024
+func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
+	downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
+		md.Concurrency = d.config.DownloadConcurrency
+		md.PartSize = d.config.DownloadPartSize
+		if md.PartSize <= 0 {
+			md.PartSize = f.Size / int64(md.Concurrency)
+		}
 	})
 
 	numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
 		Bucket: aws.String(d.s3Bucket),
-		Key:    aws.String(key),
+		Key:    aws.String(f.Key),
 	})
 	if err != nil {
-		return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, key)
+		return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
 	}
 
 	if numBytes < 1 {
@@ -532,6 +562,12 @@ type (
 		// Script_asm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
 		// Type       string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
 	}
+
+	awsFile struct {
+		Key          string
+		Size         int64
+		LastModified time.Time
+	}
 )
 
 func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
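
Usage note: callers now pass the config explicitly, and non-positive values fall back to defaults: DownloadConcurrency becomes 8, and DownloadPartSize is derived per object as part size = file size / concurrency (so a 400 MiB parquet file at the default concurrency is fetched in 50 MiB parts). A minimal construction sketch follows, written as if it lives in package datasources so no module path has to be assumed; newPublicDataSource and fallback are hypothetical names, not part of this diff.

// A minimal sketch, assuming package datasources. fallback is whichever
// Datasource[*types.Block] should serve heights that the
// aws-public-blockchain bucket does not cover.
func newPublicDataSource(fallback Datasource[*types.Block]) *AWSPublicDataDatasource {
	return NewAWSPublicData(fallback, AWSPublicDataDatasourceConfig{
		DownloadConcurrency: 16,      // <= 0 falls back to defaultAWSPublicDataDownloadConcurrency (8)
		DownloadPartSize:    8 << 20, // 8 MiB; <= 0 derives part size = file size / concurrency per object
	})
}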
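
The mapstructure tags imply the config is decoded from a configuration file. A hedged sketch of that decode path, assuming github.com/mitchellh/mapstructure (the library those tags conventionally belong to; this diff does not show the actual loader):

// Hypothetical decode of an already-parsed config map; not part of this diff.
raw := map[string]any{
	"download_concurrency": 16,
	"download_part_size":   8 << 20,
}
var conf AWSPublicDataDatasourceConfig
if err := mapstructure.Decode(raw, &conf); err != nil {
	logger.Panic("Can't decode AWS public data config", slogx.Error(err))
}

Note that the download_concurrency key only decodes because DownloadConcurrency carries a mapstructure tag matching it; without the tag, mapstructure matches the Go field name case-insensitively, so the snake_case key would be silently ignored and the default of 8 always used.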