Compare commits

..

16 Commits

Author SHA1 Message Date
Thanee Charattrakool
51fd1f6636 feat: move requestip config to http config (#25) 2024-06-12 22:08:03 +07:00
Thanee Charattrakool
a7bc6257c4 feat(api): add request context and logger middleware (#24)
* feat(api): add request context and logger middleware

* feat(api): add cors and favicon middleware

* fix: solve wrapcheck linter warning

* feat: configurable hidden request headers
2024-06-12 21:47:29 +07:00
gazenw
3bb7500c87 feat: update docker version 2024-06-07 13:55:55 +07:00
Gaze
8c92893d4a feat: release v0.2.1 2024-05-31 01:16:34 +07:00
Nut Pinyo
d84e30ed11 fix: implement Shutdown() for processors (#22) 2024-05-31 01:13:12 +07:00
Thanee Charattrakool
d9fa217977 feat: use current indexed block for first prev block (#23)
* feat: use current indexed block for first prev block

* fix: forgot to set next prev header
2024-05-31 01:11:37 +07:00
Thanee Charattrakool
709b00ec0e build: add Docker cache mount for Go modules (#21)
* build: add cache mount for go modules

* doc(docker): update TZ description

* build: use entrypoint instead cmd exec

* build: add dockerignore

* build: add modules dir to image for migration command

* build: update dockerignore

* doc: fix typo

Co-authored-by: gazenw <163862510+gazenw@users.noreply.github.com>

---------

Co-authored-by: gazenw <163862510+gazenw@users.noreply.github.com>
2024-05-23 17:10:03 +07:00
gazenw
50ae103502 doc: update docker compose example 2024-05-21 14:44:59 +07:00
gazenw
c0242bd555 Update README.md 2024-05-20 18:37:32 +07:00
gazenw
6d4f1d0e87 Release v0.2.0
Release v0.2.0
2024-05-16 14:50:03 +07:00
Gaze
b9fac74026 Merge remote-tracking branch 'origin/main' into develop
# Conflicts:
#	README.md
#	cmd/cmd_run.go
2024-05-16 14:37:37 +07:00
Nut Pinyo
62ecd7ea49 fix: runes tag parsing (#19) 2024-05-16 13:54:13 +07:00
gazenw
f8fbd67bd8 fix: invalid pgx version (#18)
Co-authored-by: Planxnx <planxnx@users.noreply.github.com>
2024-05-15 03:17:54 +07:00
gazenw
cc2649dd64 Update README.md: fix datasource 2024-05-13 14:19:32 +07:00
gazenw
d96370454b Remove bitcoin module (#15)
* fix: remove bitcoin module

* fix: remove more config
2024-05-13 14:18:45 +07:00
gazenw
371d1fe008 doc: update README.md 2024-05-08 14:13:21 +07:00
28 changed files with 497 additions and 1979 deletions

18
.dockerignore Normal file
View File

@@ -0,0 +1,18 @@
.git
.gitignore
.github
.vscode
**/*.md
**/*.log
.DS_Store
# Docker
Dockerfile
.dockerignore
docker-compose.yml
# Go
.golangci.yaml
cmd.local
config.*.y*ml
config.y*ml

View File

@@ -3,15 +3,15 @@ FROM golang:1.22 as builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
RUN --mount=type=cache,target=/go/pkg/mod/ go mod download
COPY ./ ./
ENV GOOS=linux
ENV CGO_ENABLED=0
RUN go build \
-o main ./main.go
RUN --mount=type=cache,target=/go/pkg/mod/ \
go build -o main ./main.go
FROM alpine:latest
@@ -19,9 +19,10 @@ WORKDIR /app
RUN apk --no-cache add ca-certificates tzdata
COPY --from=builder /app/main .
COPY --from=builder /app/modules ./modules
# You can set `TZ` environment variable to change the timezone
# You can set the TZ identifier to change the timezone. See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List
# ENV TZ=US/Central
CMD ["/app/main", "run"]
ENTRYPOINT ["/app/main"]

View File

@@ -25,7 +25,7 @@ This allows developers to focus on what **truly** matters: Meta-protocol indexin
### 1. Runes
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://api-docs.gaze.network) for full details.
## Installation
@@ -51,8 +51,6 @@ Here is our minimum database disk space requirement for each module.
| ------ | -------------------------- | ---------------------------- |
| Runes | 10 GB | 150 GB |
Here is our minimum database disk space requirement for each module.
#### 4. Prepare `config.yaml` file.
```yaml
@@ -88,7 +86,7 @@ modules:
# Configuration options for Runes module. Can be removed if not used.
runes:
database: "postgres" # Database to store Runes data. current supported databases: "postgres"
datasource: "database" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
datasource: "bitcoin-node" # Data source to be used for Bitcoin data. current supported data sources: "bitcoin-node".
api_handlers: # API handlers to enable. current supported handlers: "http"
- http
postgres:
@@ -108,14 +106,14 @@ We will be using `docker-compose` for our installation guide. Make sure the `doc
# docker-compose.yaml
services:
gaze-indexer:
image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
image: ghcr.io/gaze-network/gaze-indexer:v0.2.1
container_name: gaze-indexer
restart: unless-stopped
ports:
- 8080:8080 # Expose HTTP server port to host
volumes:
- "./config.yaml:/app/config.yaml" # mount config.yaml file to the container as "/app/config.yaml"
command: ["/app/main", "run", "--runes"] # Put module flags after "run" commands to select which modules to run.
command: ["/app/main", "run", "--modules", "runes"] # Put module flags after "run" commands to select which modules to run.
```
### Install from source

View File

@@ -22,10 +22,15 @@ import (
"github.com/gaze-network/indexer-network/pkg/errorhandler"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/compress"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/favicon"
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
"github.com/gofiber/fiber/v2/middleware/requestid"
"github.com/samber/do/v2"
"github.com/samber/lo"
"github.com/spf13/cobra"
@@ -135,6 +140,14 @@ func runHandler(cmd *cobra.Command, _ []string) error {
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
})
app.
Use(favicon.New()).
Use(cors.New()).
Use(requestid.New()).
Use(requestcontext.New(
requestcontext.WithRequestId(),
requestcontext.WithClientIP(conf.HTTPServer.RequestIP),
)).
Use(requestlogger.New(conf.HTTPServer.Logger)).
Use(fiberrecover.New(fiberrecover.Config{
EnableStackTrace: true,
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {

View File

@@ -23,6 +23,14 @@ reporting:
# HTTP server configuration options.
http_server:
port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.
logger:
disable: false # disable logger if logger level is `INFO`
request_header: false
request_query: false
requestip: # Client IP extraction configuration options. This is unnecessary if you don't care about the real client IP or if you're not using a reverse proxy.
trusted_proxies_ip: # Cloudflare, GCP Public LB. See: server/internal/middleware/requestcontext/PROXY-IP.md
trusted_proxies_header: # X-Real-IP, CF-Connecting-IP
enable_reject_malformed_request: false # return 403 if request is malformed (invalid IP)
# Meta-protocol modules configuration options.
modules:

View File

@@ -1,5 +1,5 @@
package constants
const (
Version = "v0.0.1"
Version = "v0.2.1"
)

View File

@@ -1,688 +0,0 @@
// AWS Public Blockchain Datasource
// - https://registry.opendata.aws/aws-public-blockchain
// - https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws
//
// To setup your own data source, see: https://github.com/aws-solutions-library-samples/guidance-for-digital-assets-on-aws/blob/main/analytics/producer/README.md
package datasources
import (
"cmp"
"context"
"encoding/hex"
"fmt"
"io"
"log/slog"
"math"
"slices"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/subscription"
"github.com/gaze-network/indexer-network/pkg/btcutils"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/parquetutils"
"github.com/samber/lo"
parquettypes "github.com/xitongsys/parquet-go/types"
"golang.org/x/sync/errgroup"
)
const (
// awsPublicDataS3Region is the region set on the S3 client built by NewAWSPublicData.
awsPublicDataS3Region = "us-east-2"
// awsPublicDataS3Bucket is the bucket every list/download request is issued against.
awsPublicDataS3Bucket = "aws-public-blockchain"
// defaultAWSPublicDataDownloadConcurrency replaces a non-positive
// AWSPublicDataDatasourceConfig.DownloadConcurrency in NewAWSPublicData.
defaultAWSPublicDataDownloadConcurrency = 8
)
// firstBitcoinTimestamp is the earliest date the listing helpers accept;
// requests for an earlier date are rejected with errs.InvalidArgument.
var firstBitcoinTimestamp = time.Date(2009, time.January, 3, 18, 15, 5, 0, time.UTC)
// Compile-time check that *AWSPublicDataDatasource implements Datasource[*types.Block].
var _ Datasource[*types.Block] = (*AWSPublicDataDatasource)(nil)
// AWSPublicDataDatasourceConfig holds the tuning options for downloading
// files from S3.
type AWSPublicDataDatasourceConfig struct {
// DownloadConcurrency is the number of goroutines to spin up in parallel
// when downloading parts. A concurrency of 1 downloads the parts sequentially.
// Defaults to 8 when the value is not positive.
//
// CAUTION: High concurrency with a low part size can reduce the time to
// download a file, but it can also increase the memory usage.
//
// NOTE(review): unlike DownloadPartSize this field has no mapstructure tag;
// confirm which config key is expected to populate it.
DownloadConcurrency int
// DownloadPartSize is the size (in bytes) to request from S3 for each part.
// When not positive, it is derived from the file size and the concurrency
// (part size = file size / concurrency).
//
// CAUTION: High concurrency with a low part size can reduce the time to
// download a file, but it can also increase the memory usage.
DownloadPartSize int64 `mapstructure:"download_part_size"`
}
// AWSPublicDataDatasource serves Bitcoin blocks from the parquet files of the
// AWS public blockchain S3 bucket and delegates to another datasource for
// block headers, the current height, and ranges S3 has no merged files for.
type AWSPublicDataDatasource struct {
// btcDatasource is the wrapped datasource used for headers, height and fallback fetching.
btcDatasource Datasource[*types.Block]
// s3Client is the client used to list and download objects.
s3Client *s3.Client
// s3Bucket is the name of the bucket that holds the dataset.
s3Bucket string
// config holds the download tuning options.
config AWSPublicDataDatasourceConfig
}
// NewAWSPublicData creates a datasource that reads Bitcoin data from the AWS
// public blockchain bucket with anonymous credentials, using btcDatasource for
// everything S3 cannot serve. A non-positive conf.DownloadConcurrency is
// replaced by the package default. It panics (through logger.Panic) when the
// AWS SDK default configuration can't be loaded.
func NewAWSPublicData(btcDatasource Datasource[*types.Block], conf AWSPublicDataDatasourceConfig) *AWSPublicDataDatasource {
	if conf.DownloadConcurrency <= 0 {
		conf.DownloadConcurrency = defaultAWSPublicDataDownloadConcurrency
	}

	awsConfig, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		logger.Panic("Can't load AWS SDK user config", slogx.Error(err), slog.String("package", "datasources"))
	}

	// TODO: support user defined config (self-hosted s3 bucket)
	withPublicBucketAccess := func(opts *s3.Options) {
		opts.Region = awsPublicDataS3Region
		opts.Credentials = aws.AnonymousCredentials{}
	}

	datasource := &AWSPublicDataDatasource{
		btcDatasource: btcDatasource,
		s3Client:      s3.NewFromConfig(awsConfig, withPublicBucketAccess),
		s3Bucket:      awsPublicDataS3Bucket,
		config:        conf,
	}
	return datasource
}
// Name returns the datasource name: the wrapped datasource's name prefixed
// with "aws_public_data/".
func (d AWSPublicDataDatasource) Name() string {
	upstream := d.btcDatasource.Name()
	return fmt.Sprintf("aws_public_data/%s", upstream)
}
// Fetch collects every block batch that FetchAsync delivers for the height
// range [from, to] and returns them as a single slice. It stops when the
// channel is closed, the subscription is done, the subscription reports an
// error, or ctx is cancelled.
func (d *AWSPublicDataDatasource) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
	batches := make(chan []*types.Block)
	sub, err := d.FetchAsync(ctx, from, to, batches)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	defer sub.Unsubscribe()

	collected := make([]*types.Block, 0)
	for {
		select {
		case batch, open := <-batches:
			if !open {
				return collected, nil
			}
			collected = append(collected, batch...)
		case <-sub.Done():
			// The subscription finished; report a cancelled context if that is why.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, errors.Wrap(ctxErr, "context done")
			}
			return collected, nil
		case fetchErr := <-sub.Err():
			if fetchErr == nil {
				return collected, nil
			}
			return nil, errors.Wrap(fetchErr, "got error while fetch async")
		case <-ctx.Done():
			return nil, errors.Wrap(ctx.Err(), "context done")
		}
	}
}
// FetchAsync streams the blocks in the height range [from, to] to ch and
// returns the client side of the subscription that delivers them.
//
// The range is normalized by prepareRange first; when there is nothing to
// fetch the subscription is closed immediately. If S3 has no merged ("part-")
// blocks file for the start date, the request is delegated to the wrapped
// datasource. Otherwise a goroutine walks the range one day at a time,
// downloads that day's blocks and transactions parquet files, converts them
// and sends one batch of blocks per day. Any failure while processing a day is
// reported through the subscription and stops the goroutine.
func (d *AWSPublicDataDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
	ctx = logger.WithContext(ctx,
		slogx.String("package", "datasources"),
		slogx.String("datasource", d.Name()),
	)

	start, end, skip, err := d.prepareRange(ctx, from, to)
	if err != nil {
		return nil, errors.Wrap(err, "failed to prepare fetch range")
	}

	// Named sub so the subscription package is not shadowed.
	sub := subscription.NewSubscription(ch)
	if skip {
		if err := sub.UnsubscribeWithContext(ctx); err != nil {
			return nil, errors.Wrap(err, "failed to unsubscribe")
		}
		return sub.Client(), nil
	}

	// Only merged blocks/txs files are supported.
	isMergedFile := func(file awsFile, _ int) bool {
		return strings.Contains(file.Key, "part-")
	}

	// reportError forwards err to the subscription client and logs when that fails.
	reportError := func(ctx context.Context, err error) {
		if sendErr := sub.SendError(ctx, err); sendErr != nil {
			logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(sendErr))
		}
	}

	startFiles, err := d.listBlocksFilesByDate(ctx, start.Timestamp)
	if err != nil {
		if err := sub.UnsubscribeWithContext(ctx); err != nil {
			return nil, errors.Wrap(err, "failed to unsubscribe")
		}
		return nil, errors.Wrap(err, "failed to list files by date")
	}
	startFiles = lo.Filter(startFiles, isMergedFile)

	// use other datasource instead of s3 if there's no supported data
	if len(startFiles) == 0 {
		if err := sub.UnsubscribeWithContext(ctx); err != nil {
			return nil, errors.Wrap(err, "failed to unsubscribe")
		}
		s, err := d.btcDatasource.FetchAsync(ctx, start.Height, end.Height, ch)
		return s, errors.WithStack(err)
	}

	go func() {
		defer func() {
			// add a bit delay to prevent shutdown before client receive all blocks
			time.Sleep(100 * time.Millisecond)
			sub.Unsubscribe()
		}()

		// loop through each day until reach the end of supported data or within end block date
		//
		// NOTE(review): the upper bound is end's timestamp rounded to the nearest
		// day while ts keeps start's time of day; confirm the last day of the
		// range is not skipped unintentionally.
		for ts := start.Timestamp; ts.Before(end.Timestamp.Round(24*time.Hour)) && ts.Before(time.Now()); ts = ts.Add(24 * time.Hour) {
			ctx := logger.WithContext(ctx,
				slogx.Time("date", ts),
				slogx.Int64("date_unix", ts.Unix()),
			)

			logger.DebugContext(ctx, "Fetching data from AWS S3", slogx.Int64("start", start.Height), slogx.Int64("end", end.Height))

			allBlocksFiles, err := d.listBlocksFilesByDate(ctx, ts)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to list blocks files by date from aws s3", slogx.Error(err))
				reportError(ctx, errors.WithStack(err))
				return
			}

			allTxsFiles, err := d.listTxsFilesByDate(ctx, ts)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to list txs files by date from aws s3", slogx.Error(err))
				reportError(ctx, errors.WithStack(err))
				return
			}

			blocksFiles := lo.Filter(allBlocksFiles, isMergedFile)
			txsFiles := lo.Filter(allTxsFiles, isMergedFile)

			logger.DebugContext(ctx, "Found files in AWS S3 bucket",
				slogx.Int("files_blocks", len(allBlocksFiles)),
				slogx.Int("files_blocks_merged", len(blocksFiles)),
				slogx.Int("files_txs_all", len(allTxsFiles)),
				slogx.Int("files_txs_merged", len(txsFiles)),
			)

			// Reach the end of supported data,
			// stop fetching data from AWS S3
			if len(blocksFiles) == 0 || len(txsFiles) == 0 {
				logger.DebugContext(ctx, "No blocks files found, stop fetching data from AWS S3")
				return
			}

			// prevent unexpected error: exactly one merged file per kind is expected
			if len(blocksFiles) != 1 {
				logger.ErrorContext(ctx, "Unexpected blocks files count, should be 1", slogx.Int("count", len(blocksFiles)))
				reportError(ctx, errors.Wrap(errs.InternalError, "unexpected blocks files count"))
				return
			}
			if len(txsFiles) != 1 {
				logger.ErrorContext(ctx, "Unexpected txs files count, should be 1", slogx.Int("count", len(txsFiles)))
				reportError(ctx, errors.Wrap(errs.InternalError, "unexpected txs files count"))
				return
			}

			// TODO: use concurrent stream (max 2 goroutine) to download files then sequentially read parquet files
			// to improve performance while not consuming too much memory (increase around 500 MB per goroutine)
			var (
				// TODO: create []byte pool to reduce alloc ops (reduce GC pressure)
				// TODO: use FileSystem for default buffer (can choose memory or disk buffer)
				blocksBuffer = parquetutils.NewBuffer()
				txsBuffer    = parquetutils.NewBuffer()
			)

			startDownload := time.Now()
			if err := d.downloadFile(ctx, blocksFiles[0], blocksBuffer); err != nil {
				logger.ErrorContext(ctx, "Failed to download blocks file from AWS S3", slogx.Error(err))
				reportError(ctx, errors.Wrap(err, "can't download blocks file"))
				// Stop here: the buffer is incomplete and must not be parsed.
				return
			}
			if err := d.downloadFile(ctx, txsFiles[0], txsBuffer); err != nil {
				logger.ErrorContext(ctx, "Failed to download txs file from AWS S3", slogx.Error(err))
				reportError(ctx, errors.Wrap(err, "can't download txs file"))
				return
			}
			logger.DebugContext(ctx, "Downloaded files from AWS S3",
				slogx.Duration("duration", time.Since(startDownload)),
				slogx.Int("sizes_blocks", len(blocksBuffer.Bytes())),
				slogx.Int("sizes_txs", len(txsBuffer.Bytes())),
			)

			// Read parquet files
			startRead := time.Now()

			// we can read all blocks data at once because it's small
			rawAllBlocks, err := parquetutils.ReadAll[awsBlock](blocksBuffer)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to read parquet blocks data", slogx.Error(err))
				reportError(ctx, errors.Wrap(err, "can't read parquet blocks data"))
				return
			}

			// NOTE: We shouldn't read all txs data at once because it's very huge (up to ~1.5GB memory usage)
			// we should read it by chunk and send it to subscription client to reduce memory usage.
			// But AWS Public Dataset are not sorted by block number and index,
			// so we can't avoid reading all transactions data by skip unnecessary transactions
			// or chunk data by block number to reduce memory usage :(
			//
			// The transactions must be read from txsBuffer, not from blocksBuffer.
			rawAllTxs, err := parquetutils.ReadAll[awsTransaction](txsBuffer)
			if err != nil {
				logger.ErrorContext(ctx, "Failed to read parquet txs data", slogx.Error(err))
				reportError(ctx, errors.Wrap(err, "can't read parquet txs data"))
				return
			}

			groupRawTxs := lo.GroupBy(rawAllTxs, func(tx awsTransaction) int64 {
				return tx.BlockNumber
			})

			// filter blocks data by height range
			rawFilteredBlocks := lo.Filter(rawAllBlocks, func(block awsBlock, _ int) bool {
				return block.Number >= start.Height && block.Number <= end.Height
			})
			slices.SortFunc(rawFilteredBlocks, func(i, j awsBlock) int {
				return cmp.Compare(i.Number, j.Number)
			})

			logger.DebugContext(ctx, "Read parquet files",
				slogx.Duration("duration", time.Since(startRead)),
				slogx.Int("total_blocks", len(rawAllBlocks)),
				slogx.Int("filtered_blocks", len(rawFilteredBlocks)),
				slogx.Int("total_txs", len(rawAllTxs)),
				slogx.Int("total_txs_grouped", len(groupRawTxs)),
			)

			blocks := make([]*types.Block, 0, len(rawFilteredBlocks))
			for _, rawBlock := range rawFilteredBlocks {
				blockHeader, err := rawBlock.ToBlockHeader()
				if err != nil {
					logger.ErrorContext(ctx, "Failed to convert aws block to type block header", slogx.Error(err))
					reportError(ctx, errors.Wrap(err, "can't convert aws block to type block header"))
					return
				}

				rawTxs := groupRawTxs[rawBlock.Number]
				txs := make([]*types.Transaction, 0, len(rawTxs))
				for _, rawTx := range rawTxs {
					tx, err := rawTx.ToTransaction(rawBlock)
					if err != nil {
						logger.ErrorContext(ctx, "Failed to convert aws transaction to type transaction", slogx.Error(err))
						reportError(ctx, errors.Wrap(err, "can't convert aws transaction to type transaction"))
						return
					}
					txs = append(txs, tx)
				}
				slices.SortFunc(txs, func(i, j *types.Transaction) int {
					return cmp.Compare(i.Index, j.Index)
				})

				blocks = append(blocks, &types.Block{
					Header:       blockHeader,
					Transactions: txs,
				})
			}

			logger.DebugContext(ctx, "Send blocks to subscription client", slogx.Int("count", len(blocks)))
			if err := sub.Send(ctx, blocks); err != nil {
				if errors.Is(err, errs.Closed) {
					logger.DebugContext(ctx, "Subscription client closed, can't send", slogx.Error(err))
					return
				}
				// The batch can be empty when no block of this day falls in
				// the requested range, so don't index into it blindly.
				if len(blocks) == 0 {
					logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client", slogx.Error(err))
					continue
				}
				logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
					slogx.Int64("start", blocks[0].Header.Height),
					slogx.Int64("end", blocks[len(blocks)-1].Header.Height),
					slogx.Error(err),
				)
			}
		}
	}()

	return sub.Client(), nil
}
// GetBlockHeader returns the header of the block at the given height by
// delegating to the wrapped datasource.
func (d *AWSPublicDataDatasource) GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error) {
	blockHeader, err := d.btcDatasource.GetBlockHeader(ctx, height)
	if err != nil {
		return blockHeader, errors.WithStack(err)
	}
	return blockHeader, nil
}
// GetCurrentBlockHeight returns the current block height reported by the
// wrapped datasource.
func (d *AWSPublicDataDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
	currentHeight, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
	if err != nil {
		return currentHeight, errors.WithStack(err)
	}
	return currentHeight, nil
}
// prepareRange normalizes the requested height range and resolves the block
// headers at both ends.
//
// A negative fromHeight is clamped to 0. A negative toHeight, or one above the
// current chain height, is replaced by the current chain height. When the
// normalized start is greater than the end, skip is true and both headers are
// zero values. Otherwise the two headers are fetched concurrently.
func (d *AWSPublicDataDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (startHeader, endHeader types.BlockHeader, skip bool, err error) {
	// get current bitcoin block height
	latestBlockHeight, err := d.btcDatasource.GetCurrentBlockHeight(ctx)
	if err != nil {
		return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block count")
	}

	// set start to genesis block height
	start := fromHeight
	if start < 0 {
		start = 0
	}

	// set end to current bitcoin block height if
	// - end is negative (e.g. -1)
	// - end is greater than current bitcoin block height
	end := toHeight
	if end < 0 || end > latestBlockHeight {
		end = latestBlockHeight
	}

	// if start is greater than end, skip this round
	if start > end {
		return types.BlockHeader{}, types.BlockHeader{}, true, nil
	}

	// Each goroutine keeps its own error and writes a distinct result
	// variable, so the two lookups never touch shared state. Both use the
	// group's context so a failure in one cancels the other.
	group, groupctx := errgroup.WithContext(ctx)
	group.Go(func() error {
		header, err := d.GetBlockHeader(groupctx, start)
		if err != nil {
			return errors.Wrapf(err, "block %v", start)
		}
		startHeader = header
		return nil
	})
	group.Go(func() error {
		header, err := d.GetBlockHeader(groupctx, end)
		if err != nil {
			return errors.Wrapf(err, "block %v", end)
		}
		endHeader = header
		return nil
	})
	if err := group.Wait(); err != nil {
		return types.BlockHeader{}, types.BlockHeader{}, false, errors.Wrap(err, "failed to get block header")
	}

	return startHeader, endHeader, false, nil
}
// listFiles returns the objects of the datasource bucket whose key starts with
// prefix. Objects without a key are dropped; a missing size or modification
// time is reported as the zero value instead of causing a nil dereference.
//
// Only a single ListObjectsV2 call is issued, so only the first page of
// results is returned.
func (d *AWSPublicDataDatasource) listFiles(ctx context.Context, prefix string) ([]awsFile, error) {
	result, err := d.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
		Bucket: aws.String(d.s3Bucket),
		Prefix: aws.String(prefix),
	})
	if err != nil {
		return nil, errors.Wrapf(err, "can't list s3 bucket objects for bucket %q and prefix %q", d.s3Bucket, prefix)
	}

	// filter empty keys
	objs := lo.Filter(result.Contents, func(item s3types.Object, _ int) bool { return item.Key != nil })
	return lo.Map(objs, func(item s3types.Object, _ int) awsFile {
		return awsFile{
			Key: *item.Key,
			// aws.ToInt64 / aws.ToTime return the zero value for nil pointers.
			Size:         aws.ToInt64(item.Size),
			LastModified: aws.ToTime(item.LastModified),
		}
	}), nil
}
// listBlocksFilesByDate lists the blocks files stored under the UTC day of
// date. Dates before firstBitcoinTimestamp are rejected with
// errs.InvalidArgument.
func (d *AWSPublicDataDatasource) listBlocksFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
	if date.Before(firstBitcoinTimestamp) {
		return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
	}

	day := date.UTC().Format(time.DateOnly)
	blocksFiles, err := d.listFiles(ctx, "v1.0/btc/blocks/date="+day)
	if err == nil {
		return blocksFiles, nil
	}
	return nil, errors.Wrap(err, "failed to list blocks files by date")
}
// listTxsFilesByDate lists the transactions files stored under the UTC day of
// date. Dates before firstBitcoinTimestamp are rejected with
// errs.InvalidArgument.
func (d *AWSPublicDataDatasource) listTxsFilesByDate(ctx context.Context, date time.Time) ([]awsFile, error) {
	if date.Before(firstBitcoinTimestamp) {
		return nil, errors.Wrapf(errs.InvalidArgument, "date %v is before first bitcoin timestamp %v", date, firstBitcoinTimestamp)
	}

	day := date.UTC().Format(time.DateOnly)
	txsFiles, err := d.listFiles(ctx, "v1.0/btc/transactions/date="+day)
	if err == nil {
		return txsFiles, nil
	}
	return nil, errors.Wrap(err, "failed to list txs files by date")
}
// downloadFile downloads the S3 object described by f into w using the
// configured concurrency and part size. When no part size is configured it is
// derived as file size / concurrency. It returns errs.NotFound when the
// downloaded object is empty.
func (d *AWSPublicDataDatasource) downloadFile(ctx context.Context, f awsFile, w io.WriterAt) error {
	// Guard against a datasource built without NewAWSPublicData, where the
	// concurrency may be unset: dividing by it below would panic.
	concurrency := d.config.DownloadConcurrency
	if concurrency <= 0 {
		concurrency = defaultAWSPublicDataDownloadConcurrency
	}

	downloader := manager.NewDownloader(d.s3Client, func(md *manager.Downloader) {
		md.Concurrency = concurrency
		md.PartSize = d.config.DownloadPartSize
		if md.PartSize <= 0 {
			md.PartSize = f.Size / int64(concurrency)
		}
	})

	numBytes, err := downloader.Download(ctx, w, &s3.GetObjectInput{
		Bucket: aws.String(d.s3Bucket),
		Key:    aws.String(f.Key),
	})
	if err != nil {
		return errors.Wrapf(err, "failed to download file for bucket %q and key %q", d.s3Bucket, f.Key)
	}

	if numBytes < 1 {
		return errors.Wrap(errs.NotFound, "got empty file")
	}

	return nil
}
// Row and file types for the AWS public blockchain dataset. The parquet struct
// tags bind each field to a column name; the commented-out fields are columns
// that are currently not read.
//
// TODO: remove unused fields to reduce memory usage
type (
// awsBlock is one row of the blocks parquet file. ToBlockHeader converts it
// to a types.BlockHeader.
awsBlock struct {
Hash string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Bits string `parquet:"name=bits, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"` // Hex string format
PreviousBlockHash string `parquet:"name=previousblockhash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
MerkleRoot string `parquet:"name=merkle_root, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// CoinbaseParam is hex-decoded and used as the signature script of the
// synthesized coinbase input in awsTransaction.MsgTx.
CoinbaseParam string `parquet:"name=coinbase_param, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Timestamp is the raw INT96 value; it is decoded with parquettypes.INT96ToTime.
Timestamp string `parquet:"name=timestamp, type=INT96, repetitiontype=OPTIONAL"`
// Number is the block height.
Number int64 `parquet:"name=number, type=INT64, repetitiontype=OPTIONAL"`
Version int64 `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
Nonce int64 `parquet:"name=nonce, type=INT64, repetitiontype=OPTIONAL"`
// MedianTime string `parquet:"name=mediantime, type=INT96, repetitiontype=OPTIONAL"`
// Difficulty float64 `parquet:"name=difficulty, type=DOUBLE, repetitiontype=OPTIONAL"`
// Chainwork string `parquet:"name=chainwork, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
// Weight int64 `parquet:"name=weight, type=INT64, repetitiontype=OPTIONAL"`
// TransactionCount int64 `parquet:"name=transaction_count, type=INT64, repetitiontype=OPTIONAL"`
// StrippedSize int64 `parquet:"name=stripped_size, type=INT64, repetitiontype=OPTIONAL"`
// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
}
// awsTransaction is one row of the transactions parquet file. Rows are
// grouped by BlockNumber and ordered by Index when blocks are assembled.
awsTransaction struct {
Hash string `parquet:"name=hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
BlockHash string `parquet:"name=block_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
Outputs []*awsTxOutput `parquet:"name=outputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
Inputs []*awsTxInput `parquet:"name=inputs, type=LIST, repetitiontype=OPTIONAL, valuetype=STRUCT"`
Version int64 `parquet:"name=version, type=INT64, repetitiontype=OPTIONAL"`
Size int64 `parquet:"name=size, type=INT64, repetitiontype=OPTIONAL"`
BlockNumber int64 `parquet:"name=block_number, type=INT64, repetitiontype=OPTIONAL"`
Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
LockTime int64 `parquet:"name=lock_time, type=INT64, repetitiontype=OPTIONAL"`
IsCoinbase bool `parquet:"name=is_coinbase, type=BOOLEAN, repetitiontype=OPTIONAL"`
// VirtualSize int64 `parquet:"name=virtual_size, type=INT64, repetitiontype=OPTIONAL"`
// InputCount int64 `parquet:"name=input_count, type=INT64, repetitiontype=OPTIONAL"`
// OutputCount int64 `parquet:"name=output_count, type=INT64, repetitiontype=OPTIONAL"`
// OutputValue float64 `parquet:"name=output_value, type=DOUBLE, repetitiontype=OPTIONAL"`
// BlockTimestamp string `parquet:"name=block_timestamp, type=INT96, repetitiontype=OPTIONAL"`
// Date string `parquet:"name=date, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// LastModified string `parquet:"name=last_modified, type=INT96, repetitiontype=OPTIONAL"`
// Fee float64 `parquet:"name=fee, type=DOUBLE, repetitiontype=OPTIONAL"`
// InputValue float64 `parquet:"name=input_value, type=DOUBLE, repetitiontype=OPTIONAL"`
}
// awsTxInput is one element of a transaction's inputs list.
awsTxInput struct {
ScriptHex string `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
SpentTransactionHash string `parquet:"name=spent_transaction_hash, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// TxInWitness elements may be nil; MsgTx treats a nil element as an empty string.
TxInWitness []*string `parquet:"name=txinwitness, type=LIST, repetitiontype=OPTIONAL, valuetype=BYTE_ARRAY, convertedtype=UTF8"`
SpentOutputIndex int64 `parquet:"name=spent_output_index, type=INT64, repetitiontype=OPTIONAL"`
Sequence int64 `parquet:"name=sequence, type=INT64, repetitiontype=OPTIONAL"`
// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
// RequiredSignatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
// ScriptAsm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
}
// awsTxOutput is one element of a transaction's outputs list.
awsTxOutput struct {
Script_hex string `parquet:"name=script_hex, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Value is converted with btcutils.BitcoinToSatoshi in MsgTx
// (presumably denominated in BTC — TODO confirm).
Value float64 `parquet:"name=value, type=DOUBLE, repetitiontype=OPTIONAL"`
// Address string `parquet:"name=address, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Index int64 `parquet:"name=index, type=INT64, repetitiontype=OPTIONAL"`
// Required_signatures int64 `parquet:"name=required_signatures, type=INT64, repetitiontype=OPTIONAL"`
// Script_asm string `parquet:"name=script_asm, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
// Type string `parquet:"name=type, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"`
}
// awsFile describes an object listed from the S3 bucket.
awsFile struct {
Key string
Size int64
LastModified time.Time
}
)
// ToBlockHeader converts the parquet block row into a types.BlockHeader. It
// fails when any of the hashes can't be parsed or when Bits is not a valid
// 32-bit hexadecimal number.
func (a awsBlock) ToBlockHeader() (types.BlockHeader, error) {
	var empty types.BlockHeader

	blockHash, err := chainhash.NewHashFromStr(a.Hash)
	if err != nil {
		return empty, errors.Wrap(err, "can't convert block hash")
	}

	prevHash, err := chainhash.NewHashFromStr(a.PreviousBlockHash)
	if err != nil {
		return empty, errors.Wrap(err, "can't convert previous block hash")
	}

	merkle, err := chainhash.NewHashFromStr(a.MerkleRoot)
	if err != nil {
		return empty, errors.Wrap(err, "can't convert merkle root")
	}

	// Bits is stored as a hex string in the dataset.
	compactBits, err := strconv.ParseUint(a.Bits, 16, 32)
	if err != nil {
		return empty, errors.Wrap(err, "can't convert bits from hex str to uint32")
	}

	header := types.BlockHeader{
		Hash:       *blockHash,
		Height:     a.Number,
		Version:    int32(a.Version),
		PrevBlock:  *prevHash,
		MerkleRoot: *merkle,
		Timestamp:  parquettypes.INT96ToTime(a.Timestamp),
		Bits:       uint32(compactBits),
		Nonce:      uint32(a.Nonce),
	}
	return header, nil
}
// ToTransaction converts the parquet transaction row into a types.Transaction
// belonging to block. The block hash is taken from block, while the block
// height and the transaction index come from the row itself.
func (a awsTransaction) ToTransaction(block awsBlock) (*types.Transaction, error) {
	parentHash, err := chainhash.NewHashFromStr(block.Hash)
	if err != nil {
		return nil, errors.Wrap(err, "can't convert block hash")
	}

	wireTx, err := a.MsgTx(block)
	if err != nil {
		return nil, errors.Wrap(err, "can't convert aws tx to wire.msgtx")
	}

	txIndex := uint32(a.Index)
	return types.ParseMsgTx(wireTx, a.BlockNumber, *parentHash, txIndex), nil
}
// MsgTx converts the parquet transaction row into a wire.MsgTx. For a
// coinbase row without inputs, one input is synthesized from block's
// CoinbaseParam. It fails when a script, witness or previous output hash
// can't be decoded.
func (a awsTransaction) MsgTx(block awsBlock) (*wire.MsgTx, error) {
	inputs := make([]*wire.TxIn, 0, len(a.Inputs))
	outputs := make([]*wire.TxOut, 0, len(a.Outputs))

	// NOTE: coinbase tx from AWS S3 has no inputs, so we need to add it manually,
	// but we can't guarantee this data is correct especially the sequence number.
	if a.IsCoinbase && len(a.Inputs) == 0 {
		coinbaseScript, err := hex.DecodeString(block.CoinbaseParam)
		if err != nil {
			return nil, errors.Wrap(err, "can't decode script hex")
		}

		coinbaseIn := &wire.TxIn{
			PreviousOutPoint: wire.OutPoint{
				Hash:  common.ZeroHash,
				Index: math.MaxUint32,
			},
			SignatureScript: coinbaseScript,
			Witness:         btcutils.CoinbaseWitness,
			Sequence:        math.MaxUint32, // most coinbase tx are using max sequence number
		}
		inputs = append(inputs, coinbaseIn)
	}

	for _, rawIn := range a.Inputs {
		sigScript, err := hex.DecodeString(rawIn.ScriptHex)
		if err != nil {
			return nil, errors.Wrap(err, "can't decode script hex")
		}

		// A nil witness item is treated as an empty hex string.
		witnessHex := lo.Map(rawIn.TxInWitness, func(item *string, _ int) string {
			if item != nil {
				return *item
			}
			return ""
		})
		txWitness, err := btcutils.WitnessFromHex(witnessHex)
		if err != nil {
			return nil, errors.Wrap(err, "can't convert witness")
		}

		spentTxHash, err := chainhash.NewHashFromStr(rawIn.SpentTransactionHash)
		if err != nil {
			return nil, errors.Wrap(err, "can't convert prevout hash")
		}

		inputs = append(inputs, &wire.TxIn{
			PreviousOutPoint: wire.OutPoint{
				Hash:  *spentTxHash,
				Index: uint32(rawIn.SpentOutputIndex),
			},
			SignatureScript: sigScript,
			Witness:         txWitness,
			Sequence:        uint32(rawIn.Sequence),
		})
	}

	for _, rawOut := range a.Outputs {
		pkScript, err := hex.DecodeString(rawOut.Script_hex)
		if err != nil {
			return nil, errors.Wrap(err, "can't decode script hex")
		}

		inSatoshi := btcutils.BitcoinToSatoshi(rawOut.Value)
		outputs = append(outputs, &wire.TxOut{
			Value:    inSatoshi,
			PkScript: pkScript,
		})
	}

	msgTx := &wire.MsgTx{
		Version:  int32(a.Version),
		TxIn:     inputs,
		TxOut:    outputs,
		LockTime: uint32(a.LockTime),
	}
	return msgTx, nil
}

View File

@@ -88,7 +88,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(ctx, from, to)
from, to, skip, err := d.prepareRange(from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -212,12 +212,12 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
return subscription.Client(), nil
}
func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
func (d *BitcoinNodeDatasource) prepareRange(fromHeight, toHeight int64) (start, end int64, skip bool, err error) {
start = fromHeight
end = toHeight
// get current bitcoin block height
latestBlockHeight, err := d.GetCurrentBlockHeight(ctx)
latestBlockHeight, err := d.btcclient.GetBlockCount()
if err != nil {
return -1, -1, false, errors.Wrap(err, "failed to get block count")
}
@@ -227,7 +227,7 @@ func (d *BitcoinNodeDatasource) prepareRange(ctx context.Context, fromHeight, to
start = 0
}
// set end to current bitcoin block height if d
// set end to current bitcoin block height if
// - end is -1
// - end is greater than current bitcoin block height
if end < 0 || end > latestBlockHeight {
@@ -292,12 +292,3 @@ func (d *BitcoinNodeDatasource) GetBlockHeader(ctx context.Context, height int64
return types.ParseMsgBlockHeader(*block, height), nil
}
// GetCurrentBlockHeight fetches the current block height from the Bitcoin node
// by calling GetBlockCount on the underlying client.
//
// It returns -1 and a wrapped error when the call fails. ctx is accepted for
// interface compatibility but is not used by this implementation.
func (d *BitcoinNodeDatasource) GetCurrentBlockHeight(ctx context.Context) (int64, error) {
	blockCount, err := d.btcclient.GetBlockCount()
	if err == nil {
		return blockCount, nil
	}
	return -1, errors.Wrap(err, "failed to get block height")
}

View File

@@ -13,5 +13,4 @@ type Datasource[T any] interface {
Fetch(ctx context.Context, from, to int64) ([]T, error)
FetchAsync(ctx context.Context, from, to int64, ch chan<- []T) (*subscription.ClientSubscription[[]T], error)
GetBlockHeader(ctx context.Context, height int64) (types.BlockHeader, error)
GetCurrentBlockHeight(ctx context.Context) (int64, error)
}

View File

@@ -91,6 +91,10 @@ func (i *Indexer[T]) Run(ctx context.Context) (err error) {
select {
case <-i.quit:
logger.InfoContext(ctx, "Got quit signal, stopping indexer")
if err := i.Processor.Shutdown(ctx); err != nil {
logger.ErrorContext(ctx, "Failed to shutdown processor", slogx.Error(err))
return errors.Wrap(err, "processor shutdown failed")
}
return nil
case <-ctx.Done():
return nil
@@ -204,9 +208,9 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
}
// validate that input is continuous and there is no reorg
for i := 1; i < len(inputs); i++ {
header := inputs[i].BlockHeader()
prevHeader := inputs[i-1].BlockHeader()
prevHeader := i.currentBlock
for i, input := range inputs {
header := input.BlockHeader()
if header.Height != prevHeader.Height+1 {
return errors.Wrapf(errs.InternalError, "input is not continuous, input[%d] height: %d, input[%d] height: %d", i-1, prevHeader.Height, i, header.Height)
}
@@ -217,6 +221,7 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
// end current round
return nil
}
prevHeader = header
}
ctx = logger.WithContext(ctx, slog.Int("total_inputs", len(inputs)))

View File

@@ -29,6 +29,9 @@ type Processor[T Input] interface {
// VerifyStates verifies the states of the indexed data and the indexer
// to ensure the last shutdown was graceful and no missing data.
VerifyStates(ctx context.Context) error
// Shutdown gracefully stops the processor. Database connections, network calls, leftover states, etc. should be closed and cleaned up here.
Shutdown(ctx context.Context) error
}
type IndexerWorker interface {

29
go.mod
View File

@@ -4,10 +4,6 @@ go 1.22
require (
github.com/Cleverse/go-utilities/utils v0.0.0-20240119201306-d71eb577ef11
github.com/aws/aws-sdk-go-v2 v1.23.0
github.com/aws/aws-sdk-go-v2/config v1.25.3
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/btcutil v1.1.5
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
@@ -20,37 +16,18 @@ require (
github.com/planxnx/concurrent-stream v0.1.5
github.com/samber/do/v2 v2.0.0-beta.7
github.com/samber/lo v1.39.0
github.com/shopspring/decimal v1.4.0
github.com/shopspring/decimal v1.3.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.51.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20240122235623-d6294584ab18
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.5.0
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.20.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 // indirect
github.com/aws/smithy-go v1.17.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
@@ -63,7 +40,6 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -73,7 +49,6 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx v3.6.2+incompatible // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
@@ -84,7 +59,6 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.16 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
@@ -104,7 +78,6 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

1003
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -11,6 +11,8 @@ import (
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/spf13/pflag"
"github.com/spf13/viper"
@@ -63,7 +65,9 @@ type Modules struct {
}
type HTTPServerConfig struct {
Port int `mapstructure:"port"`
Port int `mapstructure:"port"`
Logger requestlogger.Config `mapstructure:"logger"`
RequestIP requestcontext.WithClientIPConfig `mapstructure:"requestip"`
}
// Parse parse the configuration from environment variables

View File

@@ -31,6 +31,7 @@ type Processor struct {
bitcoinClient btcclient.Contract
network common.Network
reportingClient *reportingclient.ReportingClient
cleanupFuncs []func(context.Context) error
newRuneEntries map[runes.RuneId]*runes.RuneEntry
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
@@ -40,13 +41,14 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient, cleanupFuncs []func(context.Context) error) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
bitcoinClient: bitcoinClient,
network: network,
reportingClient: reportingClient,
cleanupFuncs: cleanupFuncs,
newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry),
newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry),
newOutPointBalances: make(map[wire.OutPoint][]*entity.OutPointBalance),
@@ -228,3 +230,13 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
return nil
}
// Shutdown runs every registered cleanup function, in registration order, with
// the given context. A failing cleanup does not stop the remaining ones; all
// failures are joined into a single error. It returns nil when every cleanup
// succeeds (or when none are registered).
func (p *Processor) Shutdown(ctx context.Context) error {
	var failures []error
	for i := range p.cleanupFuncs {
		err := p.cleanupFuncs[i](ctx)
		if err == nil {
			continue
		}
		failures = append(failures, err)
	}
	joined := errors.Join(failures...)
	return errors.WithStack(joined)
}

View File

@@ -33,6 +33,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
var cleanupFuncs []func(context.Context) error
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
@@ -42,7 +43,10 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
}
return nil, errors.Wrap(err, "can't create Postgres connection pool")
}
defer pg.Close()
cleanupFuncs = append(cleanupFuncs, func(ctx context.Context) error {
pg.Close()
return nil
})
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
@@ -62,7 +66,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient)
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient, cleanupFuncs)
if err := processor.VerifyStates(ctx); err != nil {
return nil, errors.WithStack(err)
}

View File

@@ -69,8 +69,26 @@ func ParseTag(input interface{}) (Tag, error) {
return input, nil
case uint128.Uint128:
return Tag(input), nil
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
return Tag(uint128.From64(input.(uint64))), nil
case int:
return Tag(uint128.From64(uint64(input))), nil
case int8:
return Tag(uint128.From64(uint64(input))), nil
case int16:
return Tag(uint128.From64(uint64(input))), nil
case int32:
return Tag(uint128.From64(uint64(input))), nil
case int64:
return Tag(uint128.From64(uint64(input))), nil
case uint:
return Tag(uint128.From64(uint64(input))), nil
case uint8:
return Tag(uint128.From64(uint64(input))), nil
case uint16:
return Tag(uint128.From64(uint64(input))), nil
case uint32:
return Tag(uint128.From64(uint64(input))), nil
case uint64:
return Tag(uint128.From64(input)), nil
case big.Int:
u128, err := uint128.FromBig(&input)
if err != nil {

View File

@@ -1,21 +0,0 @@
package btcutils
import "github.com/shopspring/decimal"
const (
BitcoinDecimals = 8
)
// satsUnit is 10^8
var satsUnit = decimal.New(1, BitcoinDecimals)
// BitcoinToSatoshi converts an amount expressed in Bitcoin to Satoshi.
//
// The multiplication by satsUnit is done in decimal arithmetic and the integer
// part of the product is returned.
func BitcoinToSatoshi(v float64) int64 {
	sats := decimal.NewFromFloat(v).Mul(satsUnit)
	return sats.IntPart()
}
// SatoshiToBitcoin converts an amount expressed in Satoshi to Bitcoin.
//
// The value is built as v * 10^-BitcoinDecimals and then converted to float64,
// so the result may be inexact.
func SatoshiToBitcoin(v int64) float64 {
	btc := decimal.New(v, -BitcoinDecimals)
	return btc.InexactFloat64()
}

View File

@@ -1,39 +0,0 @@
package btcutils
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSatoshiConversion checks BitcoinToSatoshi and SatoshiToBitcoin against
// amounts for which a naive float64 multiplication by 1e8 gives a wrong result.
func TestSatoshiConversion(t *testing.T) {
	// The fields were previously named the wrong way round (the BTC float was
	// called "sats" and the satoshi integer "btc"); values and assertions are
	// unchanged.
	testcases := []struct {
		btc  float64 // amount in Bitcoin
		sats int64   // the same amount in Satoshi
	}{
		{2.29980951, 229980951},
		{1.29609085, 129609085},
		{1.2768897, 127688970},
		{0.62518296, 62518296},
		{0.29998462, 29998462},
		{0.1251, 12510000},
		{0.02016011, 2016011},
		{0.0198473, 1984730},
		{0.0051711, 517110},
		{0.0012, 120000},
		{7e-05, 7000},
		{3.835e-05, 3835},
		{1.962e-05, 1962},
	}
	for _, testcase := range testcases {
		t.Run(fmt.Sprintf("BtcToSats/%v", testcase.btc), func(t *testing.T) {
			// Guard: every case must actually exhibit the float precision error,
			// otherwise it does not prove anything about the decimal conversion.
			require.NotEqual(t, testcase.sats, int64(testcase.btc*1e8), "Testcase value should have precision error")
			assert.Equal(t, testcase.sats, BitcoinToSatoshi(testcase.btc))
		})
		t.Run(fmt.Sprintf("SatsToBtc/%v", testcase.btc), func(t *testing.T) {
			assert.Equal(t, testcase.btc, SatoshiToBitcoin(testcase.sats))
		})
	}
}

View File

@@ -4,7 +4,6 @@ import (
"encoding/hex"
"strings"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
)
@@ -13,9 +12,6 @@ const (
witnessSeparator = " "
)
// CoinbaseWitness is the witness data for a coinbase transaction.
var CoinbaseWitness = utils.Must(WitnessFromHex([]string{"0000000000000000000000000000000000000000000000000000000000000000"}))
// WitnessToHex formats the passed witness stack as a slice of hex-encoded strings.
func WitnessToHex(witness wire.TxWitness) []string {
if len(witness) == 0 {
@@ -41,10 +37,7 @@ func WitnessToString(witness wire.TxWitness) string {
// WitnessFromHex parses the passed slice of hex-encoded strings into a witness stack.
func WitnessFromHex(witnesses []string) (wire.TxWitness, error) {
// NOTE: some witness from bitcoin node are empty and some are nil(most are nil), it's not clear why.
// For now, we will return nil for both cases.
if len(witnesses) == 0 {
// return wire.TxWitness{}, nil
return nil, nil
}

View File

@@ -0,0 +1,7 @@
# Proxies IP Range Resources
- Cloudflare - https://www.cloudflare.com/ips/
- GCP Load Balancer - https://cloud.google.com/load-balancing/docs/health-check-concepts#ip-ranges
- GCP Compute Engine, Customer-usable external IP address ranges - https://www.gstatic.com/ipranges/cloud.json
- Other GCP Services - https://cloud.google.com/compute/docs/faq#networking
- Other Resources - https://github.com/lord-alfred/ipranges

View File

@@ -0,0 +1,21 @@
package requestcontext
// requestcontextError is the error type options use to ask the middleware for
// a specific HTTP response: status is the status code and message the text.
// err optionally carries an underlying cause.
type requestcontextError struct {
	err     error
	status  int
	message string
}

// Compile-time check that requestcontextError implements the error interface.
var _ error = requestcontextError{}

// Error returns the underlying error's text when one is set, and the message
// otherwise.
func (r requestcontextError) Error() string {
	if r.err == nil {
		return r.message
	}
	return r.err.Error()
}

// Unwrap returns the underlying error, which may be nil.
func (r requestcontextError) Unwrap() error { return r.err }

View File

@@ -0,0 +1,44 @@
package requestcontext
import (
"context"
"log/slog"
"net/http"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
)
// Response is the JSON body written by this middleware when an Option fails;
// in that case only Error is populated.
type Response struct {
	Result any `json:"result"`
	Error string `json:"error,omitempty"`
}

// Option is one step of request-context setup. It receives the context built
// so far together with the fiber request context and returns the context to
// use from then on, or an error to abort the request.
type Option func(ctx context.Context, c *fiber.Ctx) (context.Context, error)
// New builds a fiber middleware that runs the given options in order, each one
// receiving the context produced by the previous one, and stores the final
// context as the request's user context before calling the next handler.
//
// If an option fails with a requestcontextError, the response uses that
// error's status and message. Any other error is logged and answered with
// 500 "internal server error". In both cases the handler chain stops.
func New(opts ...Option) fiber.Handler {
	return func(c *fiber.Ctx) error {
		ctx := c.UserContext()
		for i, opt := range opts {
			// Keep the last good context: a failing option may return a nil
			// context, which must not reach the logger below.
			next, err := opt(ctx, c)
			if err != nil {
				var rErr requestcontextError
				if errors.As(err, &rErr) {
					return c.Status(rErr.status).JSON(Response{Error: rErr.message})
				}

				logger.ErrorContext(ctx, "failed to extract request context",
					// Attach the error under an explicit key; a bare value has no
					// key in slog-style argument lists.
					slog.Any("error", err),
					slog.String("event", "requestcontext/error"),
					slog.String("module", "requestcontext"),
					slog.Int("optionIndex", i),
				)
				return c.Status(http.StatusInternalServerError).JSON(Response{Error: "internal server error"})
			}
			ctx = next
		}
		c.SetUserContext(ctx)
		return c.Next()
	}
}

View File

@@ -0,0 +1,150 @@
package requestcontext
import (
"context"
"log/slog"
"net"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
)
type clientIPKey struct{}
// WithClientIPConfig configures how WithClientIP resolves the client IP.
type WithClientIPConfig struct {
	// [Optional] TrustedProxiesIP is a list of CIDR ranges covering all proxies that sit between the server and the client.
	//
	// If it's provided, the `X-Forwarded-For` entries are walked backwards from the last one
	// and the first IP that is not a trusted proxy (not in the given ranges) is used.
	// If every entry is a trusted proxy, the first entry of the header is used.
	//
	// **If you want to use this option, you should provide all of the probable proxies IP ranges.**
	//
	// This is the lowest priority.
	TrustedProxiesIP []string `env:"TRUSTED_PROXIES_IP" mapstructure:"trusted_proxies_ip"`

	// [Optional] TrustedHeader is a header name for getting the client IP. (e.g. X-Real-IP, CF-Connecting-IP, etc.)
	//
	// This is the highest priority: when the header holds a valid IP, the rest of the options are ignored.
	// When the header is missing or not a valid IP, resolution falls through to the other options.
	//
	// NOTE(review): the mapstructure key is `trusted_proxies_header` while the env name is `TRUSTED_HEADER` — confirm this is intended.
	TrustedHeader string `env:"TRUSTED_HEADER" mapstructure:"trusted_proxies_header"`

	// EnableRejectMalformedRequest returns 403 Forbidden when the request carries `X-Forwarded-For` entries
	// but no trusted proxies are configured, so the client IP can't be extracted reliably.
	EnableRejectMalformedRequest bool `env:"ENABLE_REJECT_MALFORMED_REQUEST" envDefault:"false" mapstructure:"enable_reject_malformed_request"`
}
// WithClientIP sets up the client IP in the request context, with support for
// preventing X-Forwarded-For (XFF) spoofing.
//
// The trusted proxy ranges are parsed once, when the option is created; an
// invalid range makes the call panic through logger.Panic.
//
// For every request the client IP is resolved in this order:
//  1. config.TrustedHeader, when set and holding a valid IP.
//  2. The direct remote address, when the request has no XFF entries.
//  3. With trusted proxies configured: the right-most XFF entry that is not a
//     trusted proxy, or the first XFF entry if all of them are trusted.
//  4. Without trusted proxies: 403 Forbidden if
//     config.EnableRejectMalformedRequest is set, otherwise the first XFF entry.
func WithClientIP(config WithClientIPConfig) Option {
	var trustedProxies trustedProxy
	if len(config.TrustedProxiesIP) > 0 {
		proxy, err := newTrustedProxy(config.TrustedProxiesIP)
		if err != nil {
			logger.Panic("Failed to parse trusted proxies", err)
		}
		trustedProxies = proxy
	}

	return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
		// Extract client IP from the configured header.
		if config.TrustedHeader != "" {
			headerIP := c.Get(config.TrustedHeader)

			// Only accept the header when it holds a valid IP.
			if ip := net.ParseIP(headerIP); ip != nil {
				return context.WithValue(ctx, clientIPKey{}, headerIP), nil
			}
		}

		// Extract client IPs from the XFF header.
		rawIPs := c.IPs()
		ips := parseIPs(rawIPs)

		// If the request comes directly from the client, use the remote address.
		if len(ips) == 0 {
			return context.WithValue(ctx, clientIPKey{}, c.IP()), nil
		}

		// Walk back and find the first IP that's not a trusted proxy.
		if len(trustedProxies) > 0 {
			for i := len(ips) - 1; i >= 0; i-- {
				if trustedProxies.IsTrusted(ips[i]) {
					continue
				}
				// An unparsable XFF entry is a nil IP; its String() is "<nil>",
				// which must not be stored as the client IP. Fall back to the
				// remote address of the connection instead.
				if ips[i] == nil {
					return context.WithValue(ctx, clientIPKey{}, c.IP()), nil
				}
				return context.WithValue(ctx, clientIPKey{}, ips[i].String()), nil
			}

			// If all IPs are trusted proxies, return the first IP in the XFF header.
			return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
		}

		// Finally, if the client IP can't be extracted reliably, return forbidden.
		if config.EnableRejectMalformedRequest {
			logger.WarnContext(ctx, "IP Spoofing detected, returning 403 Forbidden",
				slog.String("event", "requestcontext/ip_spoofing_detected"),
				slog.String("module", "requestcontext/with_clientip"),
				slog.String("ip", c.IP()),
				slog.Any("ips", rawIPs),
			)
			// Return the incoming context rather than nil so callers never end
			// up holding a nil context.
			return ctx, requestcontextError{
				status:  fiber.StatusForbidden,
				message: "not allowed to access",
			}
		}

		// Fallback to the first IP in the XFF header.
		return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
	}
}
// GetClientIP returns the client IP stored in ctx under clientIPKey.
// It returns an empty string when no value is stored or the stored value is
// not a string.
//
// Warning: the request context should be set up before using this function.
func GetClientIP(ctx context.Context) string {
	ip, ok := ctx.Value(clientIPKey{}).(string)
	if !ok {
		return ""
	}
	return ip
}
// trustedProxy is a set of IP networks whose members are treated as trusted proxies.
type trustedProxy []*net.IPNet

// newTrustedProxy creates a new trusted proxies instance, used for preventing
// IP spoofing (XFF attacks), from the given CIDR ranges.
// It returns an error if any range fails to parse.
func newTrustedProxy(ranges []string) (trustedProxy, error) {
	networks, err := parseCIDRs(ranges)
	if err == nil {
		return trustedProxy(networks), nil
	}
	return nil, errors.WithStack(err)
}
// IsTrusted reports whether ip belongs to at least one of the trusted networks.
// A nil ip is never trusted.
func (t trustedProxy) IsTrusted(ip net.IP) bool {
	if ip != nil {
		for i := range t {
			if t[i].Contains(ip) {
				return true
			}
		}
	}
	return false
}
// parseCIDRs parses every entry of ranges as CIDR notation and returns the
// resulting networks in the same order. It stops at the first entry that fails
// to parse and returns an error naming that entry.
func parseCIDRs(ranges []string) ([]*net.IPNet, error) {
	parsed := make([]*net.IPNet, 0, len(ranges))
	for i := range ranges {
		_, network, err := net.ParseCIDR(ranges[i])
		if err != nil {
			return nil, errors.Wrapf(err, "failed to parse CIDR for %q", ranges[i])
		}
		parsed = append(parsed, network)
	}
	return parsed, nil
}
// parseIPs parses every entry of ranges with net.ParseIP and returns the
// results in the same order. The result always has the same length as the
// input: an entry that is not a valid IP yields a nil net.IP at its position.
func parseIPs(ranges []string) []net.IP {
	parsed := make([]net.IP, len(ranges))
	for i, raw := range ranges {
		parsed[i] = net.ParseIP(raw)
	}
	return parsed
}

View File

@@ -0,0 +1,47 @@
package requestcontext
import (
"context"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/requestid"
fiberutils "github.com/gofiber/fiber/v2/utils"
)
// requestIdKey is the context key under which the request ID is stored.
type requestIdKey struct{}

// GetRequestId returns the request ID stored in ctx under requestIdKey.
// It returns an empty string when no value is stored or the stored value is
// not a string.
//
// Warning: the request context should be set up before using this function.
func GetRequestId(ctx context.Context) string {
	id, ok := ctx.Value(requestIdKey{}).(string)
	if !ok {
		return ""
	}
	return id
}
// WithRequestId returns an Option that attaches a request ID to the context
// and to the context logger (under the "requestId" key).
//
// An ID already present in the fiber locals is reused. Otherwise the ID is read
// from the request header, or a new UUID is generated when the header is
// absent, and it is then written to the response header and the fiber locals.
// The header name and locals key are those of requestid.ConfigDefault.
func WithRequestId() Option {
	return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
		headerName := requestid.ConfigDefault.Header
		localsKey := requestid.ConfigDefault.ContextKey

		// A failed type assertion yields "", so one emptiness check covers both
		// "not set" and "not a string".
		id, _ := c.Locals(localsKey).(string)
		if id == "" {
			// Take the ID from the request header, generating one if it is absent.
			id = c.Get(headerName, fiberutils.UUID())

			// Expose the ID on the response header.
			c.Set(headerName, id)

			// Store the ID in locals (fasthttp UserValue storage).
			c.Locals(localsKey, id)
		}

		// Make the request ID available through GetRequestId.
		ctx = context.WithValue(ctx, requestIdKey{}, id)

		// Make the request ID part of every log line written with this context.
		ctx = logger.WithContext(ctx, "requestId", id)

		return ctx, nil
	}
}

View File

@@ -0,0 +1,115 @@
package requestlogger
import (
"log/slog"
"net/http"
"strings"
"time"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gofiber/fiber/v2"
)
// Config controls what the request logger middleware records.
type Config struct {
	// WithRequestHeader adds the request headers to the log entry as a "header" group.
	WithRequestHeader bool `env:"REQUEST_HEADER" envDefault:"false" mapstructure:"request_header"`
	// WithRequestQuery adds the raw query string to the log entry as "query".
	WithRequestQuery bool `env:"REQUEST_QUERY" envDefault:"false" mapstructure:"request_query"`
	// Disable suppresses `INFO` level entries; requests logged at error level are still written.
	Disable bool `env:"DISABLE" envDefault:"false" mapstructure:"disable"` // Disable logger level `INFO`
	// HiddenRequestHeaders lists header names left out of the "header" group.
	// Names are trimmed and compared case-insensitively.
	HiddenRequestHeaders []string `env:"HIDDEN_REQUEST_HEADERS" mapstructure:"hidden_request_headers"`
}
// New returns a fiber middleware that logs one "Request Completed" entry per
// request, after the rest of the handler chain has run.
//
// The entry holds a "request" group, a "response" group, the event name and the
// latency. It is written at error level when the chain returned an error or the
// response status is 500 or above (the error is attached under "error"), and at
// info level otherwise. When config.Disable is set, info-level entries are
// skipped. The error returned by the chain is always passed on to the caller.
func New(config Config) fiber.Handler {
	// Normalized (trimmed, lower-cased) set of header names to leave out of the log.
	hidden := make(map[string]struct{}, len(config.HiddenRequestHeaders))
	for _, name := range config.HiddenRequestHeaders {
		key := strings.TrimSpace(strings.ToLower(name))
		hidden[key] = struct{}{}
	}

	return func(c *fiber.Ctx) error {
		startedAt := time.Now()

		// Run the rest of the stack first; everything below describes its outcome.
		err := c.Next()

		finishedAt := time.Now()
		elapsed := finishedAt.Sub(startedAt)
		statusCode := c.Response().StatusCode()

		topLevel := []slog.Attr{
			slog.String("event", "api_request"),
			slog.Int64("latency", elapsed.Milliseconds()),
			slog.String("latencyHuman", elapsed.String()),
		}

		// Request attributes.
		reqAttrs := []slog.Attr{
			slog.Time("time", startedAt),
			slog.String("method", c.Method()),
			slog.String("host", c.Hostname()),
			slog.String("path", c.Path()),
			slog.String("route", c.Route().Path),
			slog.String("ip", requestcontext.GetClientIP(c.UserContext())),
			slog.String("remoteIP", c.Context().RemoteIP().String()),
			slog.Any("x-forwarded-for", c.IPs()),
			slog.String("user-agent", string(c.Context().UserAgent())),
			slog.Any("params", c.AllParams()),
			slog.Int("length", len(c.Body())),
		}

		// Response attributes.
		respAttrs := []slog.Attr{
			slog.Time("time", finishedAt),
			slog.Int("status", statusCode),
			slog.Int("length", len(c.Response().Body())),
		}

		// Optional: raw query string.
		if config.WithRequestQuery {
			rawQuery := string(c.Request().URI().QueryString())
			reqAttrs = append(reqAttrs, slog.String("query", rawQuery))
		}

		// Optional: request headers, minus the hidden ones.
		if config.WithRequestHeader {
			headerAttrs := []any{}
			for name, values := range c.GetReqHeaders() {
				if _, skip := hidden[strings.ToLower(name)]; !skip {
					headerAttrs = append(headerAttrs, slog.Any(name, values))
				}
			}
			reqAttrs = append(reqAttrs, slog.Group("header", headerAttrs...))
		}

		level := slog.LevelInfo
		failed := err != nil || statusCode >= http.StatusInternalServerError
		if failed {
			level = slog.LevelError

			// With no returned error, describe the failure by its status code.
			cause := err
			if cause == nil {
				cause = fiber.NewError(statusCode)
			}
			topLevel = append(topLevel, slog.Any("error", cause))
		}

		// Info-level entries can be switched off; failures are always logged.
		if config.Disable && !failed {
			return errors.WithStack(err)
		}

		attrs := make([]slog.Attr, 0, 2+len(topLevel))
		attrs = append(attrs,
			slog.Attr{Key: "request", Value: slog.GroupValue(reqAttrs...)},
			slog.Attr{Key: "response", Value: slog.GroupValue(respAttrs...)},
		)
		attrs = append(attrs, topLevel...)

		logger.LogAttrs(c.UserContext(), level, "Request Completed", attrs...)

		return errors.WithStack(err)
	}
}

View File

@@ -1,133 +0,0 @@
// nolint: wrapcheck
package parquetutils
import (
"errors"
"io"
"sync"
"github.com/xitongsys/parquet-go/source"
)
var (
// Make sure Buffer implements the ParquetFile interface.
_ source.ParquetFile = (*Buffer)(nil)
// Make sure Buffer implements the io.WriterAt interface.
_ io.WriterAt = (*Buffer)(nil)
)
// Buffer allows reading parquet messages from a memory buffer.
type Buffer struct {
	buf []byte     // backing storage
	loc int        // current read/write position within buf
	m   sync.Mutex // taken by WriteAt and Reset
}

// NewBuffer creates a new, empty in-memory parquet buffer with an initial
// capacity of 512 bytes.
func NewBuffer() *Buffer {
	return NewBufferFrom(make([]byte, 0, 512))
}

// NewBufferFrom creates a new in-memory parquet buffer from the given bytes.
// The provided slice is used as the buffer's storage directly; it is not copied.
func NewBufferFrom(s []byte) *Buffer {
	b := new(Buffer)
	b.buf = s
	return b
}
// Create returns a new, empty Buffer. The name argument is ignored and the
// receiver is left untouched.
func (b *Buffer) Create(string) (source.ParquetFile, error) {
	fresh := &Buffer{buf: make([]byte, 0, 512)}
	return fresh, nil
}
// Open returns a new Buffer that shares this buffer's bytes and starts at
// position 0. The name argument is ignored.
func (b *Buffer) Open(string) (source.ParquetFile, error) {
	view := NewBufferFrom(b.Bytes())
	return view, nil
}
// Seek moves the current position within the underlying memory buffer and
// returns the new position.
//
// whence is one of io.SeekStart, io.SeekCurrent or io.SeekEnd. A target past
// the end of the buffer is clamped to the buffer length. An unknown whence or
// a negative target leaves the position unchanged and returns it together with
// an error.
func (b *Buffer) Seek(offset int64, whence int) (int64, error) {
	var target int
	switch whence {
	case io.SeekStart:
		target = int(offset)
	case io.SeekCurrent:
		target = b.loc + int(offset)
	case io.SeekEnd:
		target = len(b.buf) + int(offset)
	default:
		return int64(b.loc), errors.New("Seek: invalid whence")
	}

	if target < 0 {
		return int64(b.loc), errors.New("Seek: invalid offset")
	}

	// Never position past the end of the data.
	if end := len(b.buf); target > end {
		target = end
	}

	b.loc = target
	return int64(target), nil
}
// Read copies data from the buffer, starting at the current position, into p
// and advances the position by the number of bytes copied.
//
// io.EOF is returned as soon as the position reaches the end of the buffer,
// which can be on the same call that returned the final bytes.
func (b *Buffer) Read(p []byte) (n int, err error) {
	n = copy(p, b.buf[b.loc:])
	b.loc += n
	if b.loc != len(b.buf) {
		return n, nil
	}
	return n, io.EOF
}
// Write writes p into the buffer at the current position, via WriteAt, and
// advances the position by the number of bytes written.
// It returns 0 and the error if WriteAt fails.
func (b *Buffer) Write(p []byte) (n int, err error) {
	written, werr := b.WriteAt(p, int64(b.loc))
	if werr != nil {
		return 0, werr
	}
	b.loc += written
	return written, nil
}
// WriteAt writes p into the buffer starting at position pos and returns the
// number of bytes written. It can overwrite previously written data when the
// ranges overlap, and it extends the buffer when the write ends past the
// current length.
//
// A negative pos is rejected with an error instead of panicking. When the write
// starts beyond the current end, the gap in between is filled with zero bytes.
// The current read/write position is not changed.
func (b *Buffer) WriteAt(p []byte, pos int64) (n int, err error) {
	if pos < 0 {
		return 0, errors.New("WriteAt: negative offset")
	}

	b.m.Lock()
	defer b.m.Unlock()

	end := pos + int64(len(p))
	if grow := end - int64(len(b.buf)); grow > 0 {
		// Appending zeros both extends the buffer with amortized growth (so a
		// run of sequential writes does not reallocate on every call) and
		// clears any stale bytes left in spare capacity, e.g. after Reset.
		b.buf = append(b.buf, make([]byte, grow)...)
	}

	copy(b.buf[pos:], p)
	return len(p), nil
}
// Close is a no-op for a memory buffer; it always returns nil.
func (*Buffer) Close() error {
	return nil
}
// Bytes returns the underlying buffer bytes.
// The returned slice is the buffer's own storage, not a copy.
func (b *Buffer) Bytes() []byte {
	return b.buf
}
// Reset empties the buffer and moves the position back to 0, while keeping
// the underlying storage for use by future writes.
func (b *Buffer) Reset() {
	b.m.Lock()
	b.buf, b.loc = b.buf[:0], 0
	b.m.Unlock()
}

View File

@@ -1,26 +0,0 @@
package parquetutils
import (
"github.com/pkg/errors"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source"
)
// ReaderConcurrency parallel number of file readers.
var ReaderConcurrency int64 = 8
// ReadAll reads all records from the parquet file into a slice of T.
//
// A parquet reader is created for sourceFile with ReaderConcurrency parallel
// readers and is stopped before returning. The result holds one element per
// row reported by the reader.
func ReadAll[T any](sourceFile source.ParquetFile) ([]T, error) {
	pr, err := reader.NewParquetReader(sourceFile, new(T), ReaderConcurrency)
	if err != nil {
		return nil, errors.Wrap(err, "can't create parquet reader")
	}
	defer pr.ReadStop()

	rows := make([]T, pr.GetNumRows())
	if readErr := pr.Read(&rows); readErr != nil {
		return nil, errors.Wrap(readErr, "failed to read parquet data")
	}
	return rows, nil
}