mirror of
https://github.com/alexgo-io/gaze-indexer.git
synced 2026-04-23 11:27:22 +08:00
Compare commits
16 Commits
feature/s3
...
v0.2.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51fd1f6636 | ||
|
|
a7bc6257c4 | ||
|
|
3bb7500c87 | ||
|
|
8c92893d4a | ||
|
|
d84e30ed11 | ||
|
|
d9fa217977 | ||
|
|
709b00ec0e | ||
|
|
50ae103502 | ||
|
|
c0242bd555 | ||
|
|
6d4f1d0e87 | ||
|
|
b9fac74026 | ||
|
|
62ecd7ea49 | ||
|
|
f8fbd67bd8 | ||
|
|
cc2649dd64 | ||
|
|
d96370454b | ||
|
|
371d1fe008 |
18
.dockerignore
Normal file
18
.dockerignore
Normal file
@@ -0,0 +1,18 @@
|
||||
.git
|
||||
.gitignore
|
||||
.github
|
||||
.vscode
|
||||
**/*.md
|
||||
**/*.log
|
||||
.DS_Store
|
||||
|
||||
# Docker
|
||||
Dockerfile
|
||||
.dockerignore
|
||||
docker-compose.yml
|
||||
|
||||
# Go
|
||||
.golangci.yaml
|
||||
cmd.local
|
||||
config.*.y*ml
|
||||
config.y*ml
|
||||
13
Dockerfile
13
Dockerfile
@@ -3,15 +3,15 @@ FROM golang:1.22 as builder
|
||||
WORKDIR /app
|
||||
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
RUN --mount=type=cache,target=/go/pkg/mod/ go mod download
|
||||
|
||||
COPY ./ ./
|
||||
|
||||
ENV GOOS=linux
|
||||
ENV CGO_ENABLED=0
|
||||
|
||||
RUN go build \
|
||||
-o main ./main.go
|
||||
RUN --mount=type=cache,target=/go/pkg/mod/ \
|
||||
go build -o main ./main.go
|
||||
|
||||
FROM alpine:latest
|
||||
|
||||
@@ -19,9 +19,10 @@ WORKDIR /app
|
||||
|
||||
RUN apk --no-cache add ca-certificates tzdata
|
||||
|
||||
|
||||
COPY --from=builder /app/main .
|
||||
COPY --from=builder /app/modules ./modules
|
||||
|
||||
# You can set `TZ` environment variable to change the timezone
|
||||
# You can set TZ identifier to change the timezone, See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List
|
||||
# ENV TZ=US/Central
|
||||
|
||||
CMD ["/app/main", "run"]
|
||||
ENTRYPOINT ["/app/main"]
|
||||
|
||||
10
README.md
10
README.md
@@ -25,7 +25,7 @@ This allows developers to focus on what **truly** matters: Meta-protocol indexin
|
||||
### 1. Runes
|
||||
|
||||
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
|
||||
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
|
||||
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://api-docs.gaze.network) for full details.
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -51,8 +51,6 @@ Here is our minimum database disk space requirement for each module.
|
||||
| ------ | -------------------------- | ---------------------------- |
|
||||
| Runes | 10 GB | 150 GB |
|
||||
|
||||
Here is our minimum database disk space requirement for each module.
|
||||
|
||||
#### 4. Prepare `config.yaml` file.
|
||||
|
||||
```yaml
|
||||
@@ -88,7 +86,7 @@ modules:
|
||||
# Configuration options for Runes module. Can be removed if not used.
|
||||
runes:
|
||||
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
|
||||
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
|
||||
datasource: "bitcoin-node" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
|
||||
api_handlers: # API handlers to enable. current supported handlers: "http"
|
||||
- http
|
||||
postgres:
|
||||
@@ -108,14 +106,14 @@ We will be using `docker-compose` for our installation guide. Make sure the `doc
|
||||
# docker-compose.yaml
|
||||
services:
|
||||
gaze-indexer:
|
||||
image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
|
||||
image: ghcr.io/gaze-network/gaze-indexer:v0.2.1
|
||||
container_name: gaze-indexer
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- 8080:8080 # Expose HTTP server port to host
|
||||
volumes:
|
||||
- "./config.yaml:/app/config.yaml" # mount config.yaml file to the container as "/app/config.yaml"
|
||||
command: ["/app/main", "run", "--runes"] # Put module flags after "run" commands to select which modules to run.
|
||||
command: ["/app/main", "run", "--modules", "runes"] # Put module flags after "run" commands to select which modules to run.
|
||||
```
|
||||
|
||||
### Install from source
|
||||
|
||||
@@ -22,10 +22,15 @@ import (
|
||||
"github.com/gaze-network/indexer-network/pkg/errorhandler"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
|
||||
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclient"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/compress"
|
||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||
"github.com/gofiber/fiber/v2/middleware/favicon"
|
||||
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
|
||||
"github.com/gofiber/fiber/v2/middleware/requestid"
|
||||
"github.com/samber/do/v2"
|
||||
"github.com/samber/lo"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -135,6 +140,14 @@ func runHandler(cmd *cobra.Command, _ []string) error {
|
||||
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
|
||||
})
|
||||
app.
|
||||
Use(favicon.New()).
|
||||
Use(cors.New()).
|
||||
Use(requestid.New()).
|
||||
Use(requestcontext.New(
|
||||
requestcontext.WithRequestId(),
|
||||
requestcontext.WithClientIP(conf.HTTPServer.RequestIP),
|
||||
)).
|
||||
Use(requestlogger.New(conf.HTTPServer.Logger)).
|
||||
Use(fiberrecover.New(fiberrecover.Config{
|
||||
EnableStackTrace: true,
|
||||
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {
|
||||
|
||||
@@ -23,6 +23,14 @@ reporting:
|
||||
# HTTP server configuration options.
|
||||
http_server:
|
||||
port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.
|
||||
logger:
|
||||
disable: false # disable logger if logger level is `INFO`
|
||||
request_header: false
|
||||
request_query: false
|
||||
requestip: # Client IP extraction configuration options. This is unnecessary if you don't care about the real client IP or if you're not using a reverse proxy.
|
||||
trusted_proxies_ip: # Cloudflare, GCP Public LB. See: server/internal/middleware/requestcontext/PROXY-IP.md
|
||||
trusted_proxies_header: # X-Real-IP, CF-Connecting-IP
|
||||
enable_reject_malformed_request: false # return 403 if request is malformed (invalid IP)
|
||||
|
||||
# Meta-protocol modules configuration options.
|
||||
modules:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
package constants
|
||||
|
||||
const (
|
||||
Version = "v0.0.1"
|
||||
Version = "v0.2.1"
|
||||
)
|
||||
|
||||
@@ -1,688 +0,0 @@
|
||||
// AWS Public Blockchain Datasource
|
||||
// - https://registry.opendata.aws/aws-public-blockchain
|
||||
// - https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws
|
||||
//
|
||||
// To setup your own data source, see: https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws/blob/main/analytics/producer/README.md
|
||||
package datasources
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"math"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/common"
|
||||
"github.com/gaze-network/indexer-network/common/errs"
|
||||
"github.com/gaze-network/indexer-network/core/types"
|
||||
"github.com/gaze-network/indexer-network/internal/subscription"
|
||||
"github.com/gaze-network/indexer-network/pkg/btcutils"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/parquetutils"
|
||||
"github.com/samber/lo"
|
||||
parquettypes "github.com/xitongsys/parquet-go/types"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
awsPublicDataS3Region = "us-east-2"
|
||||
awsPublicDataS3Bucket = "aws-public-blockchain"
|
||||
|
||||
defaultAWSPublicDataDownloadConcurrency = 8
|
||||
)
|
||||
|
||||
var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)
|
||||
|
||||
// Make sure to implement the BitcoinDatasource interface
|
||||
var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)
|
||||
|
||||
// AWSPublicDataDatasourceConfig configures how parquet files are downloaded
// from the AWS Public Blockchain S3 bucket.
type AWSPublicDataDatasourceConfig struct {
	// DownloadConcurrency is the number of goroutines to spin up in parallel
	// when downloading parts. A concurrency of 1 downloads the parts
	// sequentially. Defaults to 8 when <= 0.
	//
	// CAUTION: High concurrency with low part size can reduce the time to
	// download a file, but it can also increase the memory usage.
	//
	// BUGFIX: the mapstructure tag was missing, so the snake_case config key
	// "download_concurrency" never decoded into this field.
	DownloadConcurrency int `mapstructure:"download_concurrency"`

	// DownloadPartSize is the size (in bytes) to request from S3 for each
	// part. When <= 0 the part size is derived from the concurrency and the
	// file size (part size = file size / concurrency).
	//
	// CAUTION: High concurrency with low part size can reduce the time to
	// download a file, but it can also increase the memory usage.
	DownloadPartSize int64 `mapstructure:"download_part_size"`
}
|
||||
|
||||
type AWSPublicDataDatasource struct {
|
||||
btcDatasource Datasource[*types.Block]
|
||||
s3Client *s3.Client
|
||||
s3Bucket string
|
||||
config AWSPublicDataDatasourceConfig
|
||||
}
|
||||
|
||||
func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
|
||||
sdkConfig, err := config.LoadDefaultConfig(context.Background())
|
||||
if err != nil {
|
||||
logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
|
||||
}
|
||||
|
||||
// TODO: support user defined config (self-hosted s3 bucket)
|
||||
s3client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) {
|
||||
o.Region = awsPublicDataS3Region
|
||||
o.Credentials = aws.AnonymousCredentials{}
|
||||
})
|
||||
|
||||
if conf.DownloadConcurrency <= 0 {
|
||||
conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
|
||||
}
|
||||
|
||||
return &AWSPublicDataDatasource{
|
||||
btcDatasource: btcDatasource,
|
||||
s3Client: s3client,
|
||||
s3Bucket: awsPublicDataS3Bucket,
|
||||
config: conf,
|
||||
}
|
||||
}
|
||||
|
||||
func (d AWSPublicDataDatasource) Name() string {
|
||||
return fmt.Sprintf("aws_public_data/%s", d.btcDatasource.Name())
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
|
||||
ch := make(chan []*types.Block)
|
||||
subscription, err := d.FetchAsync(ctx, from, to, ch)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
blocks := make([]*types.Block, 0)
|
||||
for {
|
||||
select {
|
||||
case b, ok := <-ch:
|
||||
if !ok {
|
||||
return blocks, nil
|
||||
}
|
||||
blocks = append(blocks, b...)
|
||||
case <-subscription.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, errors.Wrap(err, "context done")
|
||||
}
|
||||
return blocks, nil
|
||||
case err := <-subscription.Err():
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "got error while fetch async")
|
||||
}
|
||||
return blocks, nil
|
||||
case <-ctx.Done():
|
||||
return nil, errors.Wrap(ctx.Err(), "context done")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
|
||||
ctx = logger.WithContext(ctx,
|
||||
slogx.String("package", "datasources"),
|
||||
slogx.String("datasource", d.Name()),
|
||||
)
|
||||
|
||||
start, end, skip, err := d.prepareRange(ctx, from, to)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to prepare fetch range")
|
||||
}
|
||||
|
||||
subscription := subscription.NewSubscription(ch)
|
||||
if skip {
|
||||
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unsubscribe")
|
||||
}
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
startFiles, err := d.listBlocksFilesByDate(ctx, start.Timestamp)
|
||||
if err != nil {
|
||||
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unsubscribe")
|
||||
}
|
||||
return nil, errors.Wrap(err, "failed to list files by date")
|
||||
}
|
||||
|
||||
// supported only merged blocks files
|
||||
startFiles = lo.Filter(startFiles, func(file awsFile, _ int) bool {
|
||||
return strings.Contains(file.Key, "part-")
|
||||
})
|
||||
|
||||
// use other datasource instead of s3 if there's no supported data
|
||||
if len(startFiles) == 0 {
|
||||
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unsubscribe")
|
||||
}
|
||||
s, err := d.btcDatasource.FetchAsync(ctx, start.Height, end.Height, ch)
|
||||
return s, errors.WithStack(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
// add a bit delay to prevent shutdown before client receive all blocks
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
subscription.Unsubscribe()
|
||||
}()
|
||||
// loop through each day until reach the end of supported data or within end block date
|
||||
for ts := start.Timestamp; ts.Before(end.Timestamp.Round(24*time.Hour)) && ts.Before(time.Now()); ts = ts.Add(24 * time.Hour) {
|
||||
ctx := logger.WithContext(ctx,
|
||||
slogx.Time("date", ts),
|
||||
slogx.Int64("date_unix", ts.Unix()),
|
||||
)
|
||||
|
||||
logger.DebugContext(ctx, "Fetching data from AWS S3", slogx.Int64("start", start.Height), slogx.Int64("end", end.Height))
|
||||
|
||||
allBlocksFiles, err := d.listBlocksFilesByDate(ctx, ts)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to list blocks files by date from aws s3", slogx.Error(err))
|
||||
if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
allTxsFiles, err := d.listTxsFilesByDate(ctx, ts)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to list txs files by date from aws s3", slogx.Error(err))
|
||||
if err := subscription.SendError(ctx, errors.WithStack(err)); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
blocksFiles := lo.Filter(allBlocksFiles, func(file awsFile, _ int) bool {
|
||||
return strings.Contains(file.Key, "part-")
|
||||
})
|
||||
txsFiles := lo.Filter(allTxsFiles, func(file awsFile, _ int) bool {
|
||||
return strings.Contains(file.Key, "part-")
|
||||
})
|
||||
|
||||
logger.DebugContext(ctx, "Found files in AWS S3 bucket",
|
||||
slogx.Int("files_blocks", len(allBlocksFiles)),
|
||||
slogx.Int("files_blocks_merged", len(blocksFiles)),
|
||||
slogx.Int("files_txs_all", len(allTxsFiles)),
|
||||
slogx.Int("files_txs_merged", len(txsFiles)),
|
||||
)
|
||||
|
||||
// Reach the end of supported data,
|
||||
// stop fetching data from AWS S3
|
||||
if len(blocksFiles) == 0 || len(txsFiles) == 0 {
|
||||
logger.DebugContext(ctx, "No blocks files found, stop fetching data from AWS S3")
|
||||
return
|
||||
}
|
||||
|
||||
// prevent unexpected error
|
||||
{
|
||||
if len(blocksFiles) != 1 {
|
||||
logger.ErrorContext(ctx, "Unexpected blocks files count, should be 1", slogx.Int("count", len(blocksFiles)))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected blocks files count")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(txsFiles) != 1 {
|
||||
logger.ErrorContext(ctx, "Unexpected txs files count, should be 1", slogx.Int("count", len(txsFiles)))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(errs.InternalError, "unexpected txs files count")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use concurrent stream (max 2 goroutine) to download files then sequentially read parquet files
|
||||
// to improve performance while not consuming too much memory (increase around 500 MB per goroutine)
|
||||
var (
|
||||
// TODO: create []byte pool to reduce alloc ops (reduce GC pressure)
|
||||
// TODO: use FileSystem for default buffer (can choose memory or disk buffer)
|
||||
blocksBuffer = parquetutils.NewBuffer()
|
||||
txsBuffer = parquetutils.NewBuffer()
|
||||
)
|
||||
startDownload := time.Now()
|
||||
if err := d.downloadFile(ctx, blocksFiles[0], blocksBuffer); err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Int("count", len(txsFiles)))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(err, "can't download blocks file")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
if err := d.downloadFile(ctx, txsFiles[0], txsBuffer); err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Int("count", len(txsFiles)))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(err, "can't download blocks file")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
logger.DebugContext(ctx, "Downloaded files from AWS S3",
|
||||
slogx.Duration("duration", time.Since(startDownload)),
|
||||
slogx.Int("sizes_blocks", len(blocksBuffer.Bytes())),
|
||||
slogx.Int("sizes_txs", len(txsBuffer.Bytes())),
|
||||
)
|
||||
|
||||
// Read parquet files
|
||||
startRead := time.Now()
|
||||
|
||||
// we can read all blocks data at once because it's small
|
||||
rawAllBlocks, err := parquetutils.ReadAll[awsBlock](blocksBuffer)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to read parquet blocks data", slogx.Error(err))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet blocks data")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: We shouldn't read all txs data at once because it's very huge (up to ~1.5GB memory usage)
|
||||
// we should read it by chunk and send it to subscription client to reduce memory usage.
|
||||
// But AWS Public Dataset are not sorted by block number and index,
|
||||
// so we can't avoid reading all transactions data by skip unnecessary transactions
|
||||
// or chunk data by block number to reduce memory usage :(
|
||||
rawAllTxs, err := parquetutils.ReadAll[awsTransaction](blocksBuffer)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to read parquet txs data", slogx.Error(err))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(err, "can't read parquet blocks data")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
groupRawTxs := lo.GroupBy(rawAllTxs, func(tx awsTransaction) int64 {
|
||||
return tx.BlockNumber
|
||||
})
|
||||
|
||||
// filter blocks data by height range
|
||||
rawFilteredBlocks := lo.Filter(rawAllBlocks, func(block awsBlock, _ int) bool {
|
||||
return block.Number >= start.Height && block.Number <= end.Height
|
||||
})
|
||||
slices.SortFunc(rawFilteredBlocks, func(i, j awsBlock) int {
|
||||
return cmp.Compare(i.Number, j.Number)
|
||||
})
|
||||
|
||||
logger.DebugContext(ctx, "Read parquet files",
|
||||
slogx.Duration("duration", time.Since(startRead)),
|
||||
slogx.Int("total_blocks", len(rawAllBlocks)),
|
||||
slogx.Int("filtered_blocks", len(rawFilteredBlocks)),
|
||||
slogx.Int("total_txs", len(rawAllTxs)),
|
||||
slogx.Int("total_txs_grouped", len(groupRawTxs)),
|
||||
)
|
||||
|
||||
blocks := make([]*types.Block, 0, len(rawFilteredBlocks))
|
||||
for _, rawBlock := range rawFilteredBlocks {
|
||||
blockHeader, err := rawBlock.ToBlockHeader()
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to convert aws block to type block header", slogx.Error(err))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws block to type block header")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
txs := make([]*types.Transaction, 0, len(groupRawTxs[blockHeader.Height]))
|
||||
for _, rawTx := range groupRawTxs[rawBlock.Number] {
|
||||
tx, err := rawTx.ToTransaction(rawBlock)
|
||||
if err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to convert aws transaction to type transaction", slogx.Error(err))
|
||||
if err := subscription.SendError(ctx, errors.Wrap(err, "can't convert aws transaction to type transaction")); err != nil {
|
||||
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
txs = append(txs, tx)
|
||||
}
|
||||
slices.SortFunc(txs, func(i, j *types.Transaction) int {
|
||||
return cmp.Compare(i.Index, j.Index)
|
||||
})
|
||||
|
||||
blocks = append(blocks, &types.Block{
|
||||
Header: blockHeader,
|
||||
Transactions: txs,
|
||||
})
|
||||
}
|
||||
|
||||
logger.DebugContext(ctx, "Send blocks to subscription client", slogx.Int("count", len(blocks)))
|
||||
if err := subscription.Send(ctx, blocks); err != nil {
|
||||
if errors.Is(err, errs.Closed) {
|
||||
logger.DebugContext(ctx, "Subscription client closed, can't send", slogx.Error(err))
|
||||
return
|
||||
}
|
||||
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
|
||||
slogx.Int64("start", blocks[0].Header.Height),
|
||||
slogx.Int64("end", blocks[len(blocks)-1].Header.Height),
|
||||
slogx.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
|
||||
header, err := d.btcDatasource.GetBlockHeader(ctx, height)
|
||||
return header, errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
|
||||
height, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
|
||||
return height, errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (startHeader, endHeader types.BlockHeader, skip bool, err error) {
|
||||
start := fromHeight
|
||||
end := toHeight
|
||||
|
||||
// get current bitcoin block height
|
||||
latestBlockHeight, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block count")
|
||||
}
|
||||
|
||||
// set start to genesis block height
|
||||
if start < 0 {
|
||||
start = 0
|
||||
}
|
||||
|
||||
// set end to current bitcoin block height if
|
||||
// - end is -1
|
||||
// - end is greater that current bitcoin block height
|
||||
if end < 0 || end > latestBlockHeight {
|
||||
end = latestBlockHeight
|
||||
}
|
||||
|
||||
// if start is greater than end, skip this round
|
||||
if start > end {
|
||||
return types.BlockHeader{}, types.BlockHeader{}, true, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrapf(err, "block %v", end)
|
||||
}
|
||||
|
||||
group, groupctx := errgroup.WithContext(ctx)
|
||||
group.Go(func() error {
|
||||
startHeader, err = d.GetBlockHeader(groupctx, start)
|
||||
return errors.Wrapf(err, "block %v", start)
|
||||
})
|
||||
group.Go(func() error {
|
||||
endHeader, err = d.GetBlockHeader(ctx, end)
|
||||
return errors.Wrapf(err, "block %v", end)
|
||||
})
|
||||
if err := group.Wait(); err != nil {
|
||||
return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block header")
|
||||
}
|
||||
|
||||
return startHeader, endHeader, false, nil
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
|
||||
result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(d.s3Bucket),
|
||||
Prefix: aws.String(prefix),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "can't list s3 bucket objects for bucket %q and prefix %q", d.s3Bucket, prefix)
|
||||
}
|
||||
|
||||
// filter empty keys
|
||||
objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
|
||||
return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
|
||||
return awsFile{
|
||||
Key: *item.Key,
|
||||
Size: *item.Size,
|
||||
LastModified: *item.LastModified,
|
||||
}
|
||||
}), nil
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
|
||||
if date.Before(firstBitcoinTimestamp) {
|
||||
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
|
||||
}
|
||||
prefix := "v1.0/btc/blocks/date=" + date.UTC().Format(time.DateOnly)
|
||||
files, err := d.listFiles(ctx, prefix)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to list blocks files by date")
|
||||
}
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
|
||||
if date.Before(firstBitcoinTimestamp) {
|
||||
return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
|
||||
}
|
||||
prefix := "v1.0/btc/transactions/date=" + date.UTC().Format(time.DateOnly)
|
||||
files, err := d.listFiles(ctx, prefix)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to list txs files by date")
|
||||
}
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
|
||||
downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
|
||||
md.Concurrency = d.config.DownloadConcurrency
|
||||
md.PartSize = d.config.DownloadPartSize
|
||||
if md.PartSize <= 0 {
|
||||
md.PartSize = f.Size / int64(md.Concurrency)
|
||||
}
|
||||
})
|
||||
|
||||
numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
|
||||
Bucket: aws.String(d.s3Bucket),
|
||||
Key: aws.String(f.Key),
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
|
||||
}
|
||||
|
||||
if numBytes < 1 {
|
||||
return errors.Wrap(errs.NotFound, "got empty file")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: remove unused fields to reduce memory usage
|
||||
// Parquet row types for the AWS Public Blockchain dataset. The parquet tags
// must match the dataset schema exactly.
// TODO: remove unused fields to reduce memory usage
type (
	// awsBlock is one row of the dataset's blocks parquet file.
	awsBlock struct {
		Hash              string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Bits              string `parquet:"name=bits, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"` // Hex string format
		PreviousBlockHash string `parquet:"name=previousblockhash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		MerkleRoot        string `parquet:"name=merkle_root, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		CoinbaseParam     string `parquet:"name=coinbase_param, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Timestamp         string `parquet:"name=timestamp, type=INT96, repetitiontype=OPTIONAL"`
		Number            int64  `parquet:"name=number, type=INT64, repetitiontype=OPTIONAL"`
		Version           int64  `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
		Nonce             int64  `parquet:"name=nonce, type=INT64, repetitiontype=OPTIONAL"`
		// Unused columns, kept for schema reference:
		// MedianTime string `parquet:"name=mediantime, type=INT96, repetitiontype=OPTIONAL"`
		// Difficulty float64 `parquet:"name=difficulty, type=DOUBLE, repetitiontype=OPTIONAL"`
		// Chainwork string `parquet:"name=chainwork, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
		// Weight int64 `parquet:"name=weight, type=INT64, repetitiontype=OPTIONAL"`
		// TransactionCount int64 `parquet:"name=transaction_count, type=INT64, repetitiontype=OPTIONAL"`
		// StrippedSize int64 `parquet:"name=stripped_size, type=INT64, repetitiontype=OPTIONAL"`
		// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
	}

	// awsTransaction is one row of the dataset's transactions parquet file.
	awsTransaction struct {
		Hash        string         `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		BlockHash   string         `parquet:"name=block_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Outputs     []*awsTxOutput `parquet:"name=outputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
		Inputs      []*awsTxInput  `parquet:"name=inputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
		Version     int64          `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
		Size        int64          `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
		BlockNumber int64          `parquet:"name=block_number, type=INT64, repetitiontype=OPTIONAL"`
		Index       int64          `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
		LockTime    int64          `parquet:"name=lock_time, type=INT64, repetitiontype=OPTIONAL"`
		IsCoinbase  bool           `parquet:"name=is_coinbase, type=BOOLEAN, repetitiontype=OPTIONAL"`
		// Unused columns, kept for schema reference:
		// VirtualSize int64 `parquet:"name=virtual_size, type=INT64, repetitiontype=OPTIONAL"`
		// InputCount int64 `parquet:"name=input_count, type=INT64, repetitiontype=OPTIONAL"`
		// OutputCount int64 `parquet:"name=output_count, type=INT64, repetitiontype=OPTIONAL"`
		// OutputValue float64 `parquet:"name=output_value, type=DOUBLE, repetitiontype=OPTIONAL"`
		// BlockTimestamp string `parquet:"name=block_timestamp, type=INT96, repetitiontype=OPTIONAL"`
		// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
		// Fee float64 `parquet:"name=fee, type=DOUBLE, repetitiontype=OPTIONAL"`
		// InputValue float64 `parquet:"name=input_value, type=DOUBLE, repetitiontype=OPTIONAL"`
	}

	// awsTxInput is one element of awsTransaction.Inputs.
	awsTxInput struct {
		ScriptHex            string    `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		SpentTransactionHash string    `parquet:"name=spent_transaction_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		TxInWitness          []*string `parquet:"name=txinwitness, type=LIST, repetitiontype=OPTIONAL, valuetype=BYTE_ARRAY, convertedtype=UTF8"`
		SpentOutputIndex     int64     `parquet:"name=spent_output_index, type=INT64, repetitiontype=OPTIONAL"`
		Sequence             int64     `parquet:"name=sequence, type=INT64, repetitiontype=OPTIONAL"`
		// Unused columns, kept for schema reference:
		// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
		// RequiredSignatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
		// ScriptAsm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
	}

	// awsTxOutput is one element of awsTransaction.Outputs.
	awsTxOutput struct {
		Script_hex string  `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		Value      float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
		// Unused columns, kept for schema reference:
		// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
		// Required_signatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
		// Script_asm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
		// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
	}

	// awsFile describes one object listed from the dataset bucket.
	awsFile struct {
		Key          string
		Size         int64
		LastModified time.Time
	}
)
|
||||
|
||||
func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
|
||||
hash, err := chainhash.NewHashFromStr(a.Hash)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.Wrap(err, "can't convert block hash")
|
||||
}
|
||||
prevBlockHash, err := chainhash.NewHashFromStr(a.PreviousBlockHash)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.Wrap(err, "can't convert previous block hash")
|
||||
}
|
||||
merkleRoot, err := chainhash.NewHashFromStr(a.MerkleRoot)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.Wrap(err, "can't convert merkle root")
|
||||
}
|
||||
|
||||
bits, err := strconv.ParseUint(a.Bits, 16, 32)
|
||||
if err != nil {
|
||||
return types.BlockHeader{}, errors.Wrap(err, "can't convert bits from hex str to uint32")
|
||||
}
|
||||
|
||||
return types.BlockHeader{
|
||||
Hash: *hash,
|
||||
Height: a.Number,
|
||||
Version: int32(a.Version),
|
||||
PrevBlock: *prevBlockHash,
|
||||
MerkleRoot: *merkleRoot,
|
||||
Timestamp: parquettypes.INT96ToTime(a.Timestamp),
|
||||
Bits: uint32(bits),
|
||||
Nonce: uint32(a.Nonce),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a awsTransaction) ToTransaction(block awsBlock) (*types.Transaction, error) {
|
||||
blockhash, err := chainhash.NewHashFromStr(block.Hash)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't convert block hash")
|
||||
}
|
||||
msgtx, err := a.MsgTx(block)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't convert aws tx to wire.msgtx")
|
||||
}
|
||||
return types.ParseMsgTx(msgtx, a.BlockNumber, *blockhash, uint32(a.Index)), nil
|
||||
}
|
||||
|
||||
func (a awsTransaction) MsgTx(block awsBlock) (*wire.MsgTx, error) {
|
||||
txIn := make([]*wire.TxIn, 0, len(a.Inputs))
|
||||
txOut := make([]*wire.TxOut, 0, len(a.Outputs))
|
||||
|
||||
// NOTE: coinbase tx from AWS S3 has no inputs, so we need to add it manually,
|
||||
// but we can't guarantee this data is correct especially the sequence number.
|
||||
if a.IsCoinbase && len(a.Inputs) == 0 {
|
||||
scriptsig, err := hex.DecodeString(block.CoinbaseParam)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't decode script hex")
|
||||
}
|
||||
|
||||
txIn = append(txIn, &wire.TxIn{
|
||||
PreviousOutPoint: wire.OutPoint{
|
||||
Hash: common.ZeroHash,
|
||||
Index: math.MaxUint32,
|
||||
},
|
||||
SignatureScript: scriptsig,
|
||||
Witness: btcutils.CoinbaseWitness,
|
||||
Sequence: math.MaxUint32, // most coinbase tx are using max sequence number
|
||||
})
|
||||
}
|
||||
|
||||
for _, in := range a.Inputs {
|
||||
scriptsig, err := hex.DecodeString(in.ScriptHex)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't decode script hex")
|
||||
}
|
||||
|
||||
witness, err := btcutils.WitnessFromHex(lo.Map(in.TxInWitness, func(src *string, _ int) string {
|
||||
if src == nil {
|
||||
return ""
|
||||
}
|
||||
return *src
|
||||
}))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't convert witness")
|
||||
}
|
||||
|
||||
prevOutHash, err := chainhash.NewHashFromStr(in.SpentTransactionHash)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't convert prevout hash")
|
||||
}
|
||||
|
||||
txIn = append(txIn, &wire.TxIn{
|
||||
PreviousOutPoint: wire.OutPoint{
|
||||
Hash: *prevOutHash,
|
||||
Index: uint32(in.SpentOutputIndex),
|
||||
},
|
||||
SignatureScript: scriptsig,
|
||||
Witness: witness,
|
||||
Sequence: uint32(in.Sequence),
|
||||
})
|
||||
}
|
||||
|
||||
for _, out := range a.Outputs {
|
||||
scriptpubkey, err := hex.DecodeString(out.Script_hex)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't decode script hex")
|
||||
}
|
||||
txOut = append(txOut, &wire.TxOut{
|
||||
Value: btcutils.BitcoinToSatoshi(out.Value),
|
||||
PkScript: scriptpubkey,
|
||||
})
|
||||
}
|
||||
|
||||
return &wire.MsgTx{
|
||||
Version: int32(a.Version),
|
||||
TxIn: txIn,
|
||||
TxOut: txOut,
|
||||
LockTime: uint32(a.LockTime),
|
||||
}, nil
|
||||
}
|
||||
@@ -88,7 +88,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
slogx.String("datasource", d.Name()),
|
||||
)
|
||||
|
||||
from, to, skip, err := d.prepareRange(ctx, from, to)
|
||||
from, to, skip, err := d.prepareRange(from, to)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to prepare fetch range")
|
||||
}
|
||||
@@ -212,12 +212,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
|
||||
return subscription.Client(), nil
|
||||
}
|
||||
|
||||
func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
|
||||
func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
|
||||
start = fromHeight
|
||||
end = toHeight
|
||||
|
||||
// get current bitcoin block height
|
||||
latestBlockHeight, err := d.GetCurrentBlockHeight(ctx)
|
||||
latestBlockHeight, err := d.btcclient.GetBlockCount()
|
||||
if err != nil {
|
||||
return -1, -1, false, errors.Wrap(err, "failed to get block count")
|
||||
}
|
||||
@@ -227,7 +227,7 @@ func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, to
|
||||
start = 0
|
||||
}
|
||||
|
||||
// set end to current bitcoin block height if d
|
||||
// set end to current bitcoin block height if
|
||||
// - end is -1
|
||||
// - end is greater that current bitcoin block height
|
||||
if end < 0 || end > latestBlockHeight {
|
||||
@@ -292,12 +292,3 @@ func (d *BitcoinNodeDatasource) GetBlockHeader(ctx context.Context, height int64
|
||||
|
||||
return types.ParseMsgBlockHeader(*block, height), nil
|
||||
}
|
||||
|
||||
// GetCurrentBlockHeight fetch current block height from Bitcoin node
|
||||
func (d *BitcoinNodeDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
|
||||
height, err := d.btcclient.GetBlockCount()
|
||||
if err != nil {
|
||||
return -1, errors.Wrap(err, "failed to get block height")
|
||||
}
|
||||
return height, nil
|
||||
}
|
||||
|
||||
@@ -13,5 +13,4 @@ type Datasource[T any] interface {
|
||||
Fetch(ctx context.Context, from, to int64) ([]T, error)
|
||||
FetchAsync(ctx context.Context, from, to int64, ch chan<- []T) (*subscription.ClientSubscription[[]T], error)
|
||||
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
|
||||
GetCurrentBlockHeight(ctx context.Context) (int64, error)
|
||||
}
|
||||
|
||||
@@ -91,6 +91,10 @@ func (i *Indexer[T]) Run(ctx context.Context) (err error) {
|
||||
select {
|
||||
case <-i.quit:
|
||||
logger.InfoContext(ctx, "Got quit signal, stopping indexer")
|
||||
if err := i.Processor.Shutdown(ctx); err != nil {
|
||||
logger.ErrorContext(ctx, "Failed to shutdown processor", slogx.Error(err))
|
||||
return errors.Wrap(err, "processor shutdown failed")
|
||||
}
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
@@ -204,9 +208,9 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// validate is input is continuous and no reorg
|
||||
for i := 1; i < len(inputs); i++ {
|
||||
header := inputs[i].BlockHeader()
|
||||
prevHeader := inputs[i-1].BlockHeader()
|
||||
prevHeader := i.currentBlock
|
||||
for i, input := range inputs {
|
||||
header := input.BlockHeader()
|
||||
if header.Height != prevHeader.Height+1 {
|
||||
return errors.Wrapf(errs.InternalError, "input is not continuous, input[%d] height: %d, input[%d] height: %d", i-1, prevHeader.Height, i, header.Height)
|
||||
}
|
||||
@@ -217,6 +221,7 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
|
||||
// end current round
|
||||
return nil
|
||||
}
|
||||
prevHeader = header
|
||||
}
|
||||
|
||||
ctx = logger.WithContext(ctx, slog.Int("total_inputs", len(inputs)))
|
||||
|
||||
@@ -29,6 +29,9 @@ type Processor[T Input] interface {
|
||||
// VerifyStates verifies the states of the indexed data and the indexer
|
||||
// to ensure the last shutdown was graceful and no missing data.
|
||||
VerifyStates(ctx context.Context) error
|
||||
|
||||
// Shutdown gracefully stops the processor. Database connections, network calls, leftover states, etc. should be closed and cleaned up here.
|
||||
Shutdown(ctx context.Context) error
|
||||
}
|
||||
|
||||
type IndexerWorker interface {
|
||||
|
||||
29
go.mod
29
go.mod
@@ -4,10 +4,6 @@ go 1.22
|
||||
|
||||
require (
|
||||
github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11
|
||||
github.com/aws/aws-sdk-go-v2 v1.23.0
|
||||
github.com/aws/aws-sdk-go-v2/config v1.25.3
|
||||
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.0
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0
|
||||
github.com/btcsuite/btcd v0.24.0
|
||||
github.com/btcsuite/btcd/btcutil v1.1.5
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
|
||||
@@ -20,37 +16,18 @@ require (
|
||||
github.com/planxnx/concurrent-stream v0.1.5
|
||||
github.com/samber/do/v2 v2.0.0-beta.7
|
||||
github.com/samber/lo v1.39.0
|
||||
github.com/shopspring/decimal v1.4.0
|
||||
github.com/shopspring/decimal v1.3.1
|
||||
github.com/spf13/cobra v1.8.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.18.2
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/valyala/fasthttp v1.51.0
|
||||
github.com/xitongsys/parquet-go v1.6.2
|
||||
github.com/xitongsys/parquet-go-source v0.0.0-20240122235623-d6294584ab18
|
||||
go.uber.org/automaxprocs v1.5.3
|
||||
golang.org/x/sync v0.5.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/andybalholm/brotli v1.0.5 // indirect
|
||||
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
|
||||
github.com/apache/thrift v0.20.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.16.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 // indirect
|
||||
github.com/aws/smithy-go v1.17.0 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
||||
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
|
||||
@@ -63,7 +40,6 @@ require (
|
||||
github.com/fsnotify/fsnotify v1.7.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.18.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/google/uuid v1.5.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
@@ -73,7 +49,6 @@ require (
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
||||
github.com/jackc/pgx v3.6.2+incompatible // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/klauspost/compress v1.17.0 // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
@@ -84,7 +59,6 @@ require (
|
||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.16 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
@@ -104,7 +78,6 @@ require (
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
|
||||
golang.org/x/sys v0.17.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
|
||||
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
|
||||
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
|
||||
"github.com/gaze-network/indexer-network/pkg/reportingclient"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
@@ -63,7 +65,9 @@ type Modules struct {
|
||||
}
|
||||
|
||||
type HTTPServerConfig struct {
|
||||
Port int `mapstructure:"port"`
|
||||
Port int `mapstructure:"port"`
|
||||
Logger requestlogger.Config `mapstructure:"logger"`
|
||||
RequestIP requestcontext.WithClientIPConfig `mapstructure:"requestip"`
|
||||
}
|
||||
|
||||
// Parse parse the configuration from environment variables
|
||||
|
||||
@@ -31,6 +31,7 @@ type Processor struct {
|
||||
bitcoinClient btcclient.Contract
|
||||
network common.Network
|
||||
reportingClient *reportingclient.ReportingClient
|
||||
cleanupFuncs []func(context.Context) error
|
||||
|
||||
newRuneEntries map[runes.RuneId]*runes.RuneEntry
|
||||
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
|
||||
@@ -40,13 +41,14 @@ type Processor struct {
|
||||
newRuneTxs []*entity.RuneTransaction
|
||||
}
|
||||
|
||||
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
|
||||
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient, cleanupFuncs []func(context.Context) error) *Processor {
|
||||
return &Processor{
|
||||
runesDg: runesDg,
|
||||
indexerInfoDg: indexerInfoDg,
|
||||
bitcoinClient: bitcoinClient,
|
||||
network: network,
|
||||
reportingClient: reportingClient,
|
||||
cleanupFuncs: cleanupFuncs,
|
||||
newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry),
|
||||
newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry),
|
||||
newOutPointBalances: make(map[wire.OutPoint][]*entity.OutPointBalance),
|
||||
@@ -228,3 +230,13 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) Shutdown(ctx context.Context) error {
|
||||
var errs []error
|
||||
for _, cleanup := range p.cleanupFuncs {
|
||||
if err := cleanup(ctx); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errors.WithStack(errors.Join(errs...))
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
|
||||
runesDg runesdatagateway.RunesDataGateway
|
||||
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
|
||||
)
|
||||
var cleanupFuncs []func(context.Context) error
|
||||
switch strings.ToLower(conf.Modules.Runes.Database) {
|
||||
case "postgresql", "postgres", "pg":
|
||||
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
|
||||
@@ -42,7 +43,10 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
|
||||
}
|
||||
return nil, errors.Wrap(err, "can't create Postgres connection pool")
|
||||
}
|
||||
defer pg.Close()
|
||||
cleanupFuncs = append(cleanupFuncs, func(ctx context.Context) error {
|
||||
pg.Close()
|
||||
return nil
|
||||
})
|
||||
runesRepo := runespostgres.NewRepository(pg)
|
||||
runesDg = runesRepo
|
||||
indexerInfoDg = runesRepo
|
||||
@@ -62,7 +66,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
|
||||
return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
|
||||
}
|
||||
|
||||
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient)
|
||||
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient, cleanupFuncs)
|
||||
if err := processor.VerifyStates(ctx); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
@@ -69,8 +69,26 @@ func ParseTag(input interface{}) (Tag, error) {
|
||||
return input, nil
|
||||
case uint128.Uint128:
|
||||
return Tag(input), nil
|
||||
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
|
||||
return Tag(uint128.From64(input.(uint64))), nil
|
||||
case int:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case int8:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case int16:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case int32:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case int64:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case uint:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case uint8:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case uint16:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case uint32:
|
||||
return Tag(uint128.From64(uint64(input))), nil
|
||||
case uint64:
|
||||
return Tag(uint128.From64(input)), nil
|
||||
case big.Int:
|
||||
u128, err := uint128.FromBig(&input)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
package btcutils
|
||||
|
||||
import "github.com/shopspring/decimal"
|
||||
|
||||
const (
|
||||
BitcoinDecimals = 8
|
||||
)
|
||||
|
||||
// satsUnit is 10^8
|
||||
var satsUnit = decimal.New(1, BitcoinDecimals)
|
||||
|
||||
// BitcoinToSatoshi converts a amount in Bitcoin format to Satoshi format.
|
||||
func BitcoinToSatoshi(v float64) int64 {
|
||||
amount := decimal.NewFromFloat(v)
|
||||
return amount.Mul(satsUnit).IntPart()
|
||||
}
|
||||
|
||||
// SatoshiToBitcoin converts a amount in Satoshi format to Bitcoin format.
|
||||
func SatoshiToBitcoin(v int64) float64 {
|
||||
return decimal.New(v, -BitcoinDecimals).InexactFloat64()
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
package btcutils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSatoshiConversion(t *testing.T) {
|
||||
testcases := []struct {
|
||||
sats float64
|
||||
btc int64
|
||||
}{
|
||||
{2.29980951, 229980951},
|
||||
{1.29609085, 129609085},
|
||||
{1.2768897, 127688970},
|
||||
{0.62518296, 62518296},
|
||||
{0.29998462, 29998462},
|
||||
{0.1251, 12510000},
|
||||
{0.02016011, 2016011},
|
||||
{0.0198473, 1984730},
|
||||
{0.0051711, 517110},
|
||||
{0.0012, 120000},
|
||||
{7e-05, 7000},
|
||||
{3.835e-05, 3835},
|
||||
{1.962e-05, 1962},
|
||||
}
|
||||
for _, testcase := range testcases {
|
||||
t.Run(fmt.Sprintf("BtcToSats/%v", testcase.sats), func(t *testing.T) {
|
||||
require.NotEqual(t, testcase.btc, int64(testcase.sats*1e8), "Testcase value should have precision error")
|
||||
assert.Equal(t, testcase.btc, BitcoinToSatoshi(testcase.sats))
|
||||
})
|
||||
t.Run(fmt.Sprintf("SatsToBtc/%v", testcase.sats), func(t *testing.T) {
|
||||
assert.Equal(t, testcase.sats, SatoshiToBitcoin(testcase.btc))
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
|
||||
"github.com/Cleverse/go-utilities/utils"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/cockroachdb/errors"
|
||||
)
|
||||
@@ -13,9 +12,6 @@ const (
|
||||
witnessSeparator = " "
|
||||
)
|
||||
|
||||
// CoinbaseWitness is the witness data for a coinbase transaction.
|
||||
var CoinbaseWitness = utils.Must(WitnessFromHex([]string{"0000000000000000000000000000000000000000000000000000000000000000"}))
|
||||
|
||||
// WitnessToHex formats the passed witness stack as a slice of hex-encoded strings.
|
||||
func WitnessToHex(witness wire.TxWitness) []string {
|
||||
if len(witness) == 0 {
|
||||
@@ -41,10 +37,7 @@ func WitnessToString(witness wire.TxWitness) string {
|
||||
|
||||
// WitnessFromHex parses the passed slice of hex-encoded strings into a witness stack.
|
||||
func WitnessFromHex(witnesses []string) (wire.TxWitness, error) {
|
||||
// NOTE: some witness from bitcoin node are empty and some are nil(most are nil), it's not clear why.
|
||||
// For now, we will return nil for both cases.
|
||||
if len(witnesses) == 0 {
|
||||
// return wire.TxWitness{}, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
||||
7
pkg/middleware/requestcontext/PROXY-IP.md
Normal file
7
pkg/middleware/requestcontext/PROXY-IP.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Proxies IP Range Resources
|
||||
|
||||
- Cloudflare - https://www.cloudflare.com/ips/
|
||||
- GCP Load Balancer - https://cloud.google.com/load-balancing/docs/health-check-concepts#ip-ranges
|
||||
- GCP Compute Engine, Customer-usable external IP address ranges - https://www.gstatic.com/ipranges/cloud.json
|
||||
- Other GCP Services - https://cloud.google.com/compute/docs/faq#networking
|
||||
- Other Resources - https://github.com/lord-alfred/ipranges
|
||||
21
pkg/middleware/requestcontext/errors.go
Normal file
21
pkg/middleware/requestcontext/errors.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package requestcontext
|
||||
|
||||
// requestcontextError implements error interface
|
||||
var _ error = requestcontextError{}
|
||||
|
||||
type requestcontextError struct {
|
||||
err error
|
||||
status int
|
||||
message string
|
||||
}
|
||||
|
||||
func (r requestcontextError) Error() string {
|
||||
if r.err != nil {
|
||||
return r.err.Error()
|
||||
}
|
||||
return r.message
|
||||
}
|
||||
|
||||
func (r requestcontextError) Unwrap() error {
|
||||
return r.err
|
||||
}
|
||||
44
pkg/middleware/requestcontext/requestcontext.go
Normal file
44
pkg/middleware/requestcontext/requestcontext.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package requestcontext
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
type Response struct {
|
||||
Result any `json:"result"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type Option func(ctx context.Context, c *fiber.Ctx) (context.Context, error)
|
||||
|
||||
func New(opts ...Option) fiber.Handler {
|
||||
return func(c *fiber.Ctx) error {
|
||||
var err error
|
||||
ctx := c.UserContext()
|
||||
for i, opt := range opts {
|
||||
ctx, err = opt(ctx, c)
|
||||
if err != nil {
|
||||
rErr := requestcontextError{}
|
||||
if errors.As(err, &rErr) {
|
||||
return c.Status(rErr.status).JSON(Response{Error: rErr.message})
|
||||
}
|
||||
|
||||
logger.ErrorContext(ctx, "failed to extract request context",
|
||||
err,
|
||||
slog.String("event", "requestcontext/error"),
|
||||
slog.String("module", "requestcontext"),
|
||||
slog.Int("optionIndex", i),
|
||||
)
|
||||
return c.Status(http.StatusInternalServerError).JSON(Response{Error: "internal server error"})
|
||||
}
|
||||
}
|
||||
c.SetUserContext(ctx)
|
||||
return c.Next()
|
||||
}
|
||||
}
|
||||
150
pkg/middleware/requestcontext/with_clientip.go
Normal file
150
pkg/middleware/requestcontext/with_clientip.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package requestcontext
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
type clientIPKey struct{}
|
||||
|
||||
type WithClientIPConfig struct {
|
||||
// [Optional] TrustedProxiesIP is a list of all proxies IP ranges that's between the server and the client.
|
||||
//
|
||||
// If it's provided, it will walk backwards from the last IP in `X-Forwarded-For` header
|
||||
// and use first IP that's not trusted proxy(not in the given IP ranges.)
|
||||
//
|
||||
// **If you want to use this option, you should provide all of probable proxies IP ranges.**
|
||||
//
|
||||
// This is lowest priority.
|
||||
TrustedProxiesIP []string `env:"TRUSTED_PROXIES_IP" mapstructure:"trusted_proxies_ip"`
|
||||
|
||||
// [Optional] TrustedHeader is a header name for getting client IP. (e.g. X-Real-IP, CF-Connecting-IP, etc.)
|
||||
//
|
||||
// This is highest priority, it will ignore rest of the options if it's provided.
|
||||
TrustedHeader string `env:"TRUSTED_HEADER" mapstructure:"trusted_proxies_header"`
|
||||
|
||||
// EnableRejectMalformedRequest return 403 Forbidden if the request is from proxies, but can't extract client IP
|
||||
EnableRejectMalformedRequest bool `env:"ENABLE_REJECT_MALFORMED_REQUEST" envDefault:"false" mapstructure:"enable_reject_malformed_request"`
|
||||
}
|
||||
|
||||
// WithClientIP setup client IP context with XFF Spoofing prevention support.
|
||||
//
|
||||
// If request is from proxies, it will use first IP from `X-Forwarded-For` header by default.
|
||||
func WithClientIP(config WithClientIPConfig) Option {
|
||||
var trustedProxies trustedProxy
|
||||
if len(config.TrustedProxiesIP) > 0 {
|
||||
proxy, err := newTrustedProxy(config.TrustedProxiesIP)
|
||||
if err != nil {
|
||||
logger.Panic("Failed to parse trusted proxies", err)
|
||||
}
|
||||
trustedProxies = proxy
|
||||
}
|
||||
|
||||
return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
|
||||
// Extract client IP from given header
|
||||
if config.TrustedHeader != "" {
|
||||
headerIP := c.Get(config.TrustedHeader)
|
||||
|
||||
// validate ip from header
|
||||
if ip := net.ParseIP(headerIP); ip != nil {
|
||||
return context.WithValue(ctx, clientIPKey{}, headerIP), nil
|
||||
}
|
||||
}
|
||||
|
||||
// Extract client IP from XFF header
|
||||
rawIPs := c.IPs()
|
||||
ips := parseIPs(rawIPs)
|
||||
|
||||
// If the request is directly from client, we can use direct remote IP address
|
||||
if len(ips) == 0 {
|
||||
return context.WithValue(ctx, clientIPKey{}, c.IP()), nil
|
||||
}
|
||||
|
||||
// Walk back and find first IP that's not trusted proxy
|
||||
if len(trustedProxies) > 0 {
|
||||
for i := len(ips) - 1; i >= 0; i-- {
|
||||
if !trustedProxies.IsTrusted(ips[i]) {
|
||||
return context.WithValue(ctx, clientIPKey{}, ips[i].String()), nil
|
||||
}
|
||||
}
|
||||
|
||||
// If all IPs are trusted proxies, return first IP in XFF header
|
||||
return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
|
||||
}
|
||||
|
||||
// Finally, if we can't extract client IP, return forbidden
|
||||
if config.EnableRejectMalformedRequest {
|
||||
logger.WarnContext(ctx, "IP Spoofing detected, returning 403 Forbidden",
|
||||
slog.String("event", "requestcontext/ip_spoofing_detected"),
|
||||
slog.String("module", "requestcontext/with_clientip"),
|
||||
slog.String("ip", c.IP()),
|
||||
slog.Any("ips", rawIPs),
|
||||
)
|
||||
return nil, requestcontextError{
|
||||
status: fiber.StatusForbidden,
|
||||
message: "not allowed to access",
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback to first IP in XFF header
|
||||
return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
|
||||
}
|
||||
}
|
||||
|
||||
// GetClientIP get clientIP from context. If not found, return empty string
|
||||
//
|
||||
// Warning: Request context should be setup before using this function
|
||||
func GetClientIP(ctx context.Context) string {
|
||||
if ip, ok := ctx.Value(clientIPKey{}).(string); ok {
|
||||
return ip
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type trustedProxy []*net.IPNet
|
||||
|
||||
// newTrustedProxy create a new trusted proxies instance for preventing IP spoofing (XFF Attacks)
|
||||
func newTrustedProxy(ranges []string) (trustedProxy, error) {
|
||||
nets, err := parseCIDRs(ranges)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return trustedProxy(nets), nil
|
||||
}
|
||||
|
||||
func (t trustedProxy) IsTrusted(ip net.IP) bool {
|
||||
if ip == nil {
|
||||
return false
|
||||
}
|
||||
for _, r := range t {
|
||||
if r.Contains(ip) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func parseCIDRs(ranges []string) ([]*net.IPNet, error) {
|
||||
nets := make([]*net.IPNet, 0, len(ranges))
|
||||
for _, r := range ranges {
|
||||
_, ipnet, err := net.ParseCIDR(r)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to parse CIDR for %q", r)
|
||||
}
|
||||
nets = append(nets, ipnet)
|
||||
}
|
||||
return nets, nil
|
||||
}
|
||||
|
||||
func parseIPs(ranges []string) []net.IP {
|
||||
ip := make([]net.IP, 0, len(ranges))
|
||||
for _, r := range ranges {
|
||||
ip = append(ip, net.ParseIP(r))
|
||||
}
|
||||
return ip
|
||||
}
|
||||
47
pkg/middleware/requestcontext/with_requestid.go
Normal file
47
pkg/middleware/requestcontext/with_requestid.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package requestcontext
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/requestid"
|
||||
fiberutils "github.com/gofiber/fiber/v2/utils"
|
||||
)
|
||||
|
||||
type requestIdKey struct{}
|
||||
|
||||
// GetRequestId get requestId from context. If not found, return empty string
|
||||
//
|
||||
// Warning: Request context should be setup before using this function
|
||||
func GetRequestId(ctx context.Context) string {
|
||||
if id, ok := ctx.Value(requestIdKey{}).(string); ok {
|
||||
return id
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// WithRequestId returns an Option that resolves a request ID for the current
// request and attaches it to both the request context and the context logger
// (under the "requestId" key).
//
// Resolution order: the fiber requestid middleware's locals entry, then the
// incoming request-ID header, and finally a freshly generated UUID. Whenever
// the ID did not come from locals, it is echoed back on the response header
// and stored in locals for downstream handlers.
func WithRequestId() Option {
	return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
		// Try to get id from fiber context.
		requestId, ok := c.Locals(requestid.ConfigDefault.ContextKey).(string)
		if !ok || requestId == "" {
			// Try to get id from request, else we generate one
			requestId = c.Get(requestid.ConfigDefault.Header, fiberutils.UUID())

			// Set new id to response header
			c.Set(requestid.ConfigDefault.Header, requestId)

			// Add the request ID to locals (fasthttp UserValue storage)
			c.Locals(requestid.ConfigDefault.ContextKey, requestId)
		}

		// Add the request ID to context
		ctx = context.WithValue(ctx, requestIdKey{}, requestId)

		// Add the request ID to context logger
		ctx = logger.WithContext(ctx, "requestId", requestId)

		return ctx, nil
	}
}
|
||||
115
pkg/middleware/requestlogger/requestlogger.go
Normal file
115
pkg/middleware/requestlogger/requestlogger.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package requestlogger
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/gaze-network/indexer-network/pkg/logger"
|
||||
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
// Config controls the behavior of the request logger middleware.
// Values can be bound from environment variables (env tags) or from a
// configuration file (mapstructure tags).
type Config struct {
	// WithRequestHeader includes request headers in the log entry,
	// minus any listed in HiddenRequestHeaders.
	WithRequestHeader bool `env:"REQUEST_HEADER" envDefault:"false" mapstructure:"request_header"`
	// WithRequestQuery includes the raw query string in the log entry.
	WithRequestQuery bool `env:"REQUEST_QUERY" envDefault:"false" mapstructure:"request_query"`
	Disable bool `env:"DISABLE" envDefault:"false" mapstructure:"disable"` // Disable logger level `INFO`
	// HiddenRequestHeaders lists header names (matched case-insensitively)
	// that are omitted when WithRequestHeader is enabled.
	HiddenRequestHeaders []string `env:"HIDDEN_REQUEST_HEADERS" mapstructure:"hidden_request_headers"`
}
|
||||
|
||||
// New setup request context and information
|
||||
func New(config Config) fiber.Handler {
|
||||
hiddenRequestHeaders := make(map[string]struct{}, len(config.HiddenRequestHeaders))
|
||||
for _, header := range config.HiddenRequestHeaders {
|
||||
hiddenRequestHeaders[strings.TrimSpace(strings.ToLower(header))] = struct{}{}
|
||||
}
|
||||
return func(c *fiber.Ctx) error {
|
||||
start := time.Now()
|
||||
|
||||
// Continue stack
|
||||
err := c.Next()
|
||||
|
||||
end := time.Now()
|
||||
latency := end.Sub(start)
|
||||
status := c.Response().StatusCode()
|
||||
|
||||
baseAttrs := []slog.Attr{
|
||||
slog.String("event", "api_request"),
|
||||
slog.Int64("latency", latency.Milliseconds()),
|
||||
slog.String("latencyHuman", latency.String()),
|
||||
}
|
||||
|
||||
// prep request attributes
|
||||
requestAttributes := []slog.Attr{
|
||||
slog.Time("time", start),
|
||||
slog.String("method", c.Method()),
|
||||
slog.String("host", c.Hostname()),
|
||||
slog.String("path", c.Path()),
|
||||
slog.String("route", c.Route().Path),
|
||||
slog.String("ip", requestcontext.GetClientIP(c.UserContext())),
|
||||
slog.String("remoteIP", c.Context().RemoteIP().String()),
|
||||
slog.Any("x-forwarded-for", c.IPs()),
|
||||
slog.String("user-agent", string(c.Context().UserAgent())),
|
||||
slog.Any("params", c.AllParams()),
|
||||
slog.Int("length", len((c.Body()))),
|
||||
}
|
||||
|
||||
// prep response attributes
|
||||
responseAttributes := []slog.Attr{
|
||||
slog.Time("time", end),
|
||||
slog.Int("status", status),
|
||||
slog.Int("length", len(c.Response().Body())),
|
||||
}
|
||||
|
||||
// request query
|
||||
if config.WithRequestQuery {
|
||||
requestAttributes = append(requestAttributes, slog.String("query", string(c.Request().URI().QueryString())))
|
||||
}
|
||||
|
||||
// request headers
|
||||
if config.WithRequestHeader {
|
||||
kv := []any{}
|
||||
|
||||
for k, v := range c.GetReqHeaders() {
|
||||
if _, found := hiddenRequestHeaders[strings.ToLower(k)]; found {
|
||||
continue
|
||||
}
|
||||
kv = append(kv, slog.Any(k, v))
|
||||
}
|
||||
|
||||
requestAttributes = append(requestAttributes, slog.Group("header", kv...))
|
||||
}
|
||||
|
||||
level := slog.LevelInfo
|
||||
if err != nil || status >= http.StatusInternalServerError {
|
||||
level = slog.LevelError
|
||||
|
||||
// error attributes
|
||||
logErr := err
|
||||
if logErr == nil {
|
||||
logErr = fiber.NewError(status)
|
||||
}
|
||||
baseAttrs = append(baseAttrs, slog.Any("error", logErr))
|
||||
}
|
||||
|
||||
if config.Disable && level == slog.LevelInfo {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
logger.LogAttrs(c.UserContext(), level, "Request Completed", append([]slog.Attr{
|
||||
{
|
||||
Key: "request",
|
||||
Value: slog.GroupValue(requestAttributes...),
|
||||
},
|
||||
{
|
||||
Key: "response",
|
||||
Value: slog.GroupValue(responseAttributes...),
|
||||
},
|
||||
}, baseAttrs...)...,
|
||||
)
|
||||
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
@@ -1,133 +0,0 @@
|
||||
// nolint: wrapcheck
|
||||
package parquetutils
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/xitongsys/parquet-go/source"
|
||||
)
|
||||
|
||||
// Compile-time conformance checks.
var (
	// Make sure Buffer implements the ParquetFile interface.
	_ source.ParquetFile = (*Buffer)(nil)

	// Make sure Buffer implements the io.WriterAt interface.
	_ io.WriterAt = (*Buffer)(nil)
)
|
||||
|
||||
// Buffer allows reading parquet messages from a memory buffer.
//
// NOTE(review): only WriteAt and Reset take the mutex; Read, Write, and Seek
// touch buf/loc unguarded, so concurrent readers and writers are not safe —
// confirm single-goroutine use is intended.
type Buffer struct {
	buf []byte // backing storage; grown on demand by WriteAt
	loc int    // current read/write offset into buf
	m   sync.Mutex
}
|
||||
|
||||
// NewBuffer creates a new in memory parquet buffer.
|
||||
func NewBuffer() *Buffer {
|
||||
return &Buffer{buf: make([]byte, 0, 512)}
|
||||
}
|
||||
|
||||
// NewBufferFrom creates new in memory parquet buffer from the given bytes.
|
||||
// It uses the provided slice as its buffer.
|
||||
func NewBufferFrom(s []byte) *Buffer {
|
||||
return &Buffer{
|
||||
buf: s,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Buffer) Create(string) (source.ParquetFile, error) {
|
||||
return &Buffer{buf: make([]byte, 0, 512)}, nil
|
||||
}
|
||||
|
||||
// Open returns a new Buffer positioned at offset zero over b's current
// contents. The name argument is ignored.
//
// NOTE(review): Bytes returns the underlying slice, so the returned buffer
// aliases b's storage rather than copying it — subsequent writes to b can be
// observed through the opened buffer. Confirm this sharing is intended.
func (b *Buffer) Open(string) (source.ParquetFile, error) {
	return NewBufferFrom(b.Bytes()), nil
}
|
||||
|
||||
// Seek seeks in the underlying memory buffer.
|
||||
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
|
||||
newLoc := b.loc
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
newLoc = int(offset)
|
||||
case io.SeekCurrent:
|
||||
newLoc += int(offset)
|
||||
case io.SeekEnd:
|
||||
newLoc = len(b.buf) + int(offset)
|
||||
default:
|
||||
return int64(b.loc), errors.New("Seek: invalid whence")
|
||||
}
|
||||
|
||||
if newLoc < 0 {
|
||||
return int64(b.loc), errors.New("Seek: invalid offset")
|
||||
}
|
||||
|
||||
if newLoc > len(b.buf) {
|
||||
newLoc = len(b.buf)
|
||||
}
|
||||
|
||||
b.loc = newLoc
|
||||
return int64(b.loc), nil
|
||||
}
|
||||
|
||||
// Read reads data form BufferFile into p.
|
||||
func (b *Buffer) Read(p []byte) (n int, err error) {
|
||||
n = copy(p, b.buf[b.loc:len(b.buf)])
|
||||
b.loc += n
|
||||
|
||||
if b.loc == len(b.buf) {
|
||||
return n, io.EOF
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Write writes data from p into BufferFile.
|
||||
func (b *Buffer) Write(p []byte) (n int, err error) {
|
||||
n, err = b.WriteAt(p, int64(b.loc))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
b.loc += n
|
||||
return
|
||||
}
|
||||
|
||||
// WriteAt writes a slice of bytes to a buffer starting at the position provided
// The number of bytes written will be returned, or error. Can overwrite previous
// written slices if the write ats overlap.
//
// NOTE(review): this is the only write path that takes the mutex; Write calls
// it (and would deadlock if it also locked), while Read/Seek access the buffer
// unguarded — confirm the intended concurrency contract.
func (b *Buffer) WriteAt(p []byte, pos int64) (n int, err error) {
	b.m.Lock()
	defer b.m.Unlock()
	pLen := len(p)
	expLen := pos + int64(pLen)
	// Grow the buffer when the write extends past the current length.
	if int64(len(b.buf)) < expLen {
		if int64(cap(b.buf)) < expLen {
			// Capacity too small: allocate exactly expLen and copy existing data.
			newBuf := make([]byte, expLen)
			copy(newBuf, b.buf)
			b.buf = newBuf
		}
		// Extend the visible length into the (possibly fresh) capacity.
		b.buf = b.buf[:expLen]
	}
	copy(b.buf[pos:], p)
	return pLen, nil
}
|
||||
|
||||
// Close is a no-op for a memory buffer.
// It always returns nil; it exists to satisfy io.Closer / source.ParquetFile.
func (*Buffer) Close() error {
	return nil
}
|
||||
|
||||
// Bytes returns the underlying buffer bytes.
// The returned slice is not a copy: it aliases the Buffer's storage, so
// mutating it (or writing to the Buffer) affects both views.
func (b *Buffer) Bytes() []byte {
	return b.buf
}
|
||||
|
||||
// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
func (b *Buffer) Reset() {
	b.m.Lock()
	defer b.m.Unlock()
	b.buf = b.buf[:0] // keep capacity, drop contents
	b.loc = 0         // rewind the read/write position
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package parquetutils
|
||||
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/xitongsys/parquet-go/reader"
|
||||
"github.com/xitongsys/parquet-go/source"
|
||||
)
|
||||
|
||||
// ReaderConcurrency parallel number of file readers.
// It is passed to parquet-go's NewParquetReader in ReadAll; callers may
// adjust it before reading to tune parallelism.
var ReaderConcurrency int64 = 8
|
||||
|
||||
// ReadAll reads all records from the parquet file.
|
||||
func ReadAll[T any](sourceFile source.ParquetFile) ([]T, error) {
|
||||
r, err := reader.NewParquetReader(sourceFile, new(T), ReaderConcurrency)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create parquet reader")
|
||||
}
|
||||
defer r.ReadStop()
|
||||
|
||||
data := make([]T, r.GetNumRows())
|
||||
if err = r.Read(&data); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to read parquet data")
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
Reference in New Issue
Block a user