Compare commits

...

9 Commits

Author SHA1 Message Date
Thanee Charattrakool
51fd1f6636 feat: move requestip config to http config (#25) 2024-06-12 22:08:03 +07:00
Thanee Charattrakool
a7bc6257c4 feat(api): add request context and logger middleware (#24)
* feat(api): add request context and logger middleware

* feat(api): add cors and favicon middleware

* fix: solve wrapcheck linter warning

* feat: configurable hidden request headers
2024-06-12 21:47:29 +07:00
gazenw
3bb7500c87 feat: update docker version 2024-06-07 13:55:55 +07:00
Gaze
8c92893d4a feat: release v0.2.1 2024-05-31 01:16:34 +07:00
Nut Pinyo
d84e30ed11 fix: implement Shutdown() for processors (#22) 2024-05-31 01:13:12 +07:00
Thanee Charattrakool
d9fa217977 feat: use current indexed block for first prev block (#23)
* feat: use current indexed block for first prev block

* fix: forgot to set next prev header
2024-05-31 01:11:37 +07:00
Thanee Charattrakool
709b00ec0e build: add Docker cache mound for Go modules (#21)
* build: add cache mount for go modules

* doc(docker): update TZ description

* build: use entrypoint instead cmd exec

* build: add dockerignore

* build: add modules dir to image for migration command

* build: update dockerignore

* doc: fix typo

Co-authored-by: gazenw <163862510+gazenw@users.noreply.github.com>

---------

Co-authored-by: gazenw <163862510+gazenw@users.noreply.github.com>
2024-05-23 17:10:03 +07:00
gazenw
50ae103502 doc: update docker compose example 2024-05-21 14:44:59 +07:00
gazenw
c0242bd555 Update README.md 2024-05-20 18:37:32 +07:00
17 changed files with 469 additions and 19 deletions

18
.dockerignore Normal file
View File

@@ -0,0 +1,18 @@
.git
.gitignore
.github
.vscode
**/*.md
**/*.log
.DS_Store
# Docker
Dockerfile
.dockerignore
docker-compose.yml
# Go
.golangci.yaml
cmd.local
config.*.y*ml
config.y*ml

View File

@@ -3,15 +3,15 @@ FROM golang:1.22 as builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
RUN --mount=type=cache,target=/go/pkg/mod/ go mod download
COPY ./ ./
ENV GOOS=linux
ENV CGO_ENABLED=0
RUN go build \
-o main ./main.go
RUN --mount=type=cache,target=/go/pkg/mod/ \
go build -o main ./main.go
FROM alpine:latest
@@ -19,9 +19,10 @@ WORKDIR /app
RUN apk --no-cache add ca-certificates tzdata
COPY --from=builder /app/main .
COPY --from=builder /app/modules ./modules
# You can set `TZ` environment variable to change the timezone
# You can set TZ identifier to change the timezone, See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List
# ENV TZ=US/Central
CMD ["/app/main", "run"]
ENTRYPOINT ["/app/main"]

View File

@@ -25,7 +25,7 @@ This allows developers to focus on what **truly** matters: Meta-protocol indexin
### 1. Runes
The Runes Indexer is our first meta-protocol indexer. It indexes Runes states, transactions, runestones, and balances using Bitcoin transactions.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://documenter.getpostman.com/view/28396285/2sA3Bn7Cxr) for full details.
It comes with a set of APIs for querying historical Runes data. See our [API Reference](https://api-docs.gaze.network) for full details.
## Installation
@@ -51,8 +51,6 @@ Here is our minimum database disk space requirement for each module.
| ------ | -------------------------- | ---------------------------- |
| Runes | 10 GB | 150 GB |
Here is our minimum database disk space requirement for each module.
#### 4. Prepare `config.yaml` file.
```yaml
@@ -108,14 +106,14 @@ We will be using `docker-compose` for our installation guide. Make sure the `doc
# docker-compose.yaml
services:
gaze-indexer:
image: ghcr.io/gaze-network/gaze-indexer:v1.0.0
image: ghcr.io/gaze-network/gaze-indexer:v0.2.1
container_name: gaze-indexer
restart: unless-stopped
ports:
- 8080:8080 # Expose HTTP server port to host
volumes:
- "./config.yaml:/app/config.yaml" # mount config.yaml file to the container as "/app/config.yaml"
command: ["/app/main", "run", "--runes"] # Put module flags after "run" commands to select which modules to run.
command: ["/app/main", "run", "--modules", "runes"] # Put module flags after "run" commands to select which modules to run.
```
### Install from source

View File

@@ -22,10 +22,15 @@ import (
"github.com/gaze-network/indexer-network/pkg/errorhandler"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/compress"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/favicon"
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
"github.com/gofiber/fiber/v2/middleware/requestid"
"github.com/samber/do/v2"
"github.com/samber/lo"
"github.com/spf13/cobra"
@@ -135,6 +140,14 @@ func runHandler(cmd *cobra.Command, _ []string) error {
ErrorHandler: errorhandler.NewHTTPErrorHandler(),
})
app.
Use(favicon.New()).
Use(cors.New()).
Use(requestid.New()).
Use(requestcontext.New(
requestcontext.WithRequestId(),
requestcontext.WithClientIP(conf.HTTPServer.RequestIP),
)).
Use(requestlogger.New(conf.HTTPServer.Logger)).
Use(fiberrecover.New(fiberrecover.Config{
EnableStackTrace: true,
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {

View File

@@ -23,6 +23,14 @@ reporting:
# HTTP server configuration options.
http_server:
port: 8080 # Port to run the HTTP server on for modules with HTTP API handlers.
logger:
disable: false # disable logger if logger level is `INFO`
request_header: false
request_query: false
requestip: # Client IP extraction configuration options. This is unnecessary if you don't care about the real client IP or if you're not using a reverse proxy.
trusted_proxies_ip: # Cloudflare, GCP Public LB. See: server/internal/middleware/requestcontext/PROXY-IP.md
trusted_proxies_header: # X-Real-IP, CF-Connecting-IP
enable_reject_malformed_request: false # return 403 if request is malformed (invalid IP)
# Meta-protocol modules configuration options.
modules:

View File

@@ -1,5 +1,5 @@
package constants
const (
Version = "v0.0.1"
Version = "v0.2.1"
)

View File

@@ -91,6 +91,10 @@ func (i *Indexer[T]) Run(ctx context.Context) (err error) {
select {
case <-i.quit:
logger.InfoContext(ctx, "Got quit signal, stopping indexer")
if err := i.Processor.Shutdown(ctx); err != nil {
logger.ErrorContext(ctx, "Failed to shutdown processor", slogx.Error(err))
return errors.Wrap(err, "processor shutdown failed")
}
return nil
case <-ctx.Done():
return nil
@@ -204,9 +208,9 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
}
// validate is input is continuous and no reorg
for i := 1; i < len(inputs); i++ {
header := inputs[i].BlockHeader()
prevHeader := inputs[i-1].BlockHeader()
prevHeader := i.currentBlock
for i, input := range inputs {
header := input.BlockHeader()
if header.Height != prevHeader.Height+1 {
return errors.Wrapf(errs.InternalError, "input is not continuous, input[%d] height: %d, input[%d] height: %d", i-1, prevHeader.Height, i, header.Height)
}
@@ -217,6 +221,7 @@ func (i *Indexer[T]) process(ctx context.Context) (err error) {
// end current round
return nil
}
prevHeader = header
}
ctx = logger.WithContext(ctx, slog.Int("total_inputs", len(inputs)))

View File

@@ -29,6 +29,9 @@ type Processor[T Input] interface {
// VerifyStates verifies the states of the indexed data and the indexer
// to ensure the last shutdown was graceful and no missing data.
VerifyStates(ctx context.Context) error
// Shutdown gracefully stops the processor. Database connections, network calls, leftover states, etc. should be closed and cleaned up here.
Shutdown(ctx context.Context) error
}
type IndexerWorker interface {

View File

@@ -11,6 +11,8 @@ import (
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gaze-network/indexer-network/pkg/middleware/requestlogger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/spf13/pflag"
"github.com/spf13/viper"
@@ -63,7 +65,9 @@ type Modules struct {
}
type HTTPServerConfig struct {
Port int `mapstructure:"port"`
Port int `mapstructure:"port"`
Logger requestlogger.Config `mapstructure:"logger"`
RequestIP requestcontext.WithClientIPConfig `mapstructure:"requestip"`
}
// Parse parse the configuration from environment variables

View File

@@ -31,6 +31,7 @@ type Processor struct {
bitcoinClient btcclient.Contract
network common.Network
reportingClient *reportingclient.ReportingClient
cleanupFuncs []func(context.Context) error
newRuneEntries map[runes.RuneId]*runes.RuneEntry
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
@@ -40,13 +41,14 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient, cleanupFuncs []func(context.Context) error) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
bitcoinClient: bitcoinClient,
network: network,
reportingClient: reportingClient,
cleanupFuncs: cleanupFuncs,
newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry),
newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry),
newOutPointBalances: make(map[wire.OutPoint][]*entity.OutPointBalance),
@@ -228,3 +230,13 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
return nil
}
func (p *Processor) Shutdown(ctx context.Context) error {
var errs []error
for _, cleanup := range p.cleanupFuncs {
if err := cleanup(ctx); err != nil {
errs = append(errs, err)
}
}
return errors.WithStack(errors.Join(errs...))
}

View File

@@ -33,6 +33,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
var cleanupFuncs []func(context.Context) error
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
@@ -42,7 +43,10 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
}
return nil, errors.Wrap(err, "can't create Postgres connection pool")
}
defer pg.Close()
cleanupFuncs = append(cleanupFuncs, func(ctx context.Context) error {
pg.Close()
return nil
})
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
@@ -62,7 +66,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) {
return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient)
processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient, cleanupFuncs)
if err := processor.VerifyStates(ctx); err != nil {
return nil, errors.WithStack(err)
}

View File

@@ -0,0 +1,7 @@
# Proxies IP Range Resources
- Cloudflare - https://www.cloudflare.com/ips/
- GCP Load Balancer - https://cloud.google.com/load-balancing/docs/health-check-concepts#ip-ranges
- GCP Compute Engine, Customer-usable external IP address ranges - https://www.gstatic.com/ipranges/cloud.json
- Other GCP Services - https://cloud.google.com/compute/docs/faq#networking
- Other Resources - https://github.com/lord-alfred/ipranges

View File

@@ -0,0 +1,21 @@
package requestcontext
// requestcontextError implements error interface
var _ error = requestcontextError{}
type requestcontextError struct {
err error
status int
message string
}
func (r requestcontextError) Error() string {
if r.err != nil {
return r.err.Error()
}
return r.message
}
func (r requestcontextError) Unwrap() error {
return r.err
}

View File

@@ -0,0 +1,44 @@
package requestcontext
import (
"context"
"log/slog"
"net/http"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
)
type Response struct {
Result any `json:"result"`
Error string `json:"error,omitempty"`
}
type Option func(ctx context.Context, c *fiber.Ctx) (context.Context, error)
func New(opts ...Option) fiber.Handler {
return func(c *fiber.Ctx) error {
var err error
ctx := c.UserContext()
for i, opt := range opts {
ctx, err = opt(ctx, c)
if err != nil {
rErr := requestcontextError{}
if errors.As(err, &rErr) {
return c.Status(rErr.status).JSON(Response{Error: rErr.message})
}
logger.ErrorContext(ctx, "failed to extract request context",
err,
slog.String("event", "requestcontext/error"),
slog.String("module", "requestcontext"),
slog.Int("optionIndex", i),
)
return c.Status(http.StatusInternalServerError).JSON(Response{Error: "internal server error"})
}
}
c.SetUserContext(ctx)
return c.Next()
}
}

View File

@@ -0,0 +1,150 @@
package requestcontext
import (
"context"
"log/slog"
"net"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
)
type clientIPKey struct{}
type WithClientIPConfig struct {
// [Optional] TrustedProxiesIP is a list of all proxies IP ranges that's between the server and the client.
//
// If it's provided, it will walk backwards from the last IP in `X-Forwarded-For` header
// and use first IP that's not trusted proxy(not in the given IP ranges.)
//
// **If you want to use this option, you should provide all of probable proxies IP ranges.**
//
// This is lowest priority.
TrustedProxiesIP []string `env:"TRUSTED_PROXIES_IP" mapstructure:"trusted_proxies_ip"`
// [Optional] TrustedHeader is a header name for getting client IP. (e.g. X-Real-IP, CF-Connecting-IP, etc.)
//
// This is highest priority, it will ignore rest of the options if it's provided.
TrustedHeader string `env:"TRUSTED_HEADER" mapstructure:"trusted_proxies_header"`
// EnableRejectMalformedRequest return 403 Forbidden if the request is from proxies, but can't extract client IP
EnableRejectMalformedRequest bool `env:"ENABLE_REJECT_MALFORMED_REQUEST" envDefault:"false" mapstructure:"enable_reject_malformed_request"`
}
// WithClientIP setup client IP context with XFF Spoofing prevention support.
//
// If request is from proxies, it will use first IP from `X-Forwarded-For` header by default.
func WithClientIP(config WithClientIPConfig) Option {
var trustedProxies trustedProxy
if len(config.TrustedProxiesIP) > 0 {
proxy, err := newTrustedProxy(config.TrustedProxiesIP)
if err != nil {
logger.Panic("Failed to parse trusted proxies", err)
}
trustedProxies = proxy
}
return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
// Extract client IP from given header
if config.TrustedHeader != "" {
headerIP := c.Get(config.TrustedHeader)
// validate ip from header
if ip := net.ParseIP(headerIP); ip != nil {
return context.WithValue(ctx, clientIPKey{}, headerIP), nil
}
}
// Extract client IP from XFF header
rawIPs := c.IPs()
ips := parseIPs(rawIPs)
// If the request is directly from client, we can use direct remote IP address
if len(ips) == 0 {
return context.WithValue(ctx, clientIPKey{}, c.IP()), nil
}
// Walk back and find first IP that's not trusted proxy
if len(trustedProxies) > 0 {
for i := len(ips) - 1; i >= 0; i-- {
if !trustedProxies.IsTrusted(ips[i]) {
return context.WithValue(ctx, clientIPKey{}, ips[i].String()), nil
}
}
// If all IPs are trusted proxies, return first IP in XFF header
return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
}
// Finally, if we can't extract client IP, return forbidden
if config.EnableRejectMalformedRequest {
logger.WarnContext(ctx, "IP Spoofing detected, returning 403 Forbidden",
slog.String("event", "requestcontext/ip_spoofing_detected"),
slog.String("module", "requestcontext/with_clientip"),
slog.String("ip", c.IP()),
slog.Any("ips", rawIPs),
)
return nil, requestcontextError{
status: fiber.StatusForbidden,
message: "not allowed to access",
}
}
// Fallback to first IP in XFF header
return context.WithValue(ctx, clientIPKey{}, rawIPs[0]), nil
}
}
// GetClientIP get clientIP from context. If not found, return empty string
//
// Warning: Request context should be setup before using this function
func GetClientIP(ctx context.Context) string {
if ip, ok := ctx.Value(clientIPKey{}).(string); ok {
return ip
}
return ""
}
type trustedProxy []*net.IPNet
// newTrustedProxy create a new trusted proxies instance for preventing IP spoofing (XFF Attacks)
func newTrustedProxy(ranges []string) (trustedProxy, error) {
nets, err := parseCIDRs(ranges)
if err != nil {
return nil, errors.WithStack(err)
}
return trustedProxy(nets), nil
}
func (t trustedProxy) IsTrusted(ip net.IP) bool {
if ip == nil {
return false
}
for _, r := range t {
if r.Contains(ip) {
return true
}
}
return false
}
func parseCIDRs(ranges []string) ([]*net.IPNet, error) {
nets := make([]*net.IPNet, 0, len(ranges))
for _, r := range ranges {
_, ipnet, err := net.ParseCIDR(r)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse CIDR for %q", r)
}
nets = append(nets, ipnet)
}
return nets, nil
}
func parseIPs(ranges []string) []net.IP {
ip := make([]net.IP, 0, len(ranges))
for _, r := range ranges {
ip = append(ip, net.ParseIP(r))
}
return ip
}

View File

@@ -0,0 +1,47 @@
package requestcontext
import (
"context"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/requestid"
fiberutils "github.com/gofiber/fiber/v2/utils"
)
type requestIdKey struct{}
// GetRequestId get requestId from context. If not found, return empty string
//
// Warning: Request context should be setup before using this function
func GetRequestId(ctx context.Context) string {
if id, ok := ctx.Value(requestIdKey{}).(string); ok {
return id
}
return ""
}
func WithRequestId() Option {
return func(ctx context.Context, c *fiber.Ctx) (context.Context, error) {
// Try to get id from fiber context.
requestId, ok := c.Locals(requestid.ConfigDefault.ContextKey).(string)
if !ok || requestId == "" {
// Try to get id from request, else we generate one
requestId = c.Get(requestid.ConfigDefault.Header, fiberutils.UUID())
// Set new id to response header
c.Set(requestid.ConfigDefault.Header, requestId)
// Add the request ID to locals (fasthttp UserValue storage)
c.Locals(requestid.ConfigDefault.ContextKey, requestId)
}
// Add the request ID to context
ctx = context.WithValue(ctx, requestIdKey{}, requestId)
// Add the requuest ID to context logger
ctx = logger.WithContext(ctx, "requestId", requestId)
return ctx, nil
}
}

View File

@@ -0,0 +1,115 @@
package requestlogger
import (
"log/slog"
"net/http"
"strings"
"time"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/middleware/requestcontext"
"github.com/gofiber/fiber/v2"
)
type Config struct {
WithRequestHeader bool `env:"REQUEST_HEADER" envDefault:"false" mapstructure:"request_header"`
WithRequestQuery bool `env:"REQUEST_QUERY" envDefault:"false" mapstructure:"request_query"`
Disable bool `env:"DISABLE" envDefault:"false" mapstructure:"disable"` // Disable logger level `INFO`
HiddenRequestHeaders []string `env:"HIDDEN_REQUEST_HEADERS" mapstructure:"hidden_request_headers"`
}
// New setup request context and information
func New(config Config) fiber.Handler {
hiddenRequestHeaders := make(map[string]struct{}, len(config.HiddenRequestHeaders))
for _, header := range config.HiddenRequestHeaders {
hiddenRequestHeaders[strings.TrimSpace(strings.ToLower(header))] = struct{}{}
}
return func(c *fiber.Ctx) error {
start := time.Now()
// Continue stack
err := c.Next()
end := time.Now()
latency := end.Sub(start)
status := c.Response().StatusCode()
baseAttrs := []slog.Attr{
slog.String("event", "api_request"),
slog.Int64("latency", latency.Milliseconds()),
slog.String("latencyHuman", latency.String()),
}
// prep request attributes
requestAttributes := []slog.Attr{
slog.Time("time", start),
slog.String("method", c.Method()),
slog.String("host", c.Hostname()),
slog.String("path", c.Path()),
slog.String("route", c.Route().Path),
slog.String("ip", requestcontext.GetClientIP(c.UserContext())),
slog.String("remoteIP", c.Context().RemoteIP().String()),
slog.Any("x-forwarded-for", c.IPs()),
slog.String("user-agent", string(c.Context().UserAgent())),
slog.Any("params", c.AllParams()),
slog.Int("length", len((c.Body()))),
}
// prep response attributes
responseAttributes := []slog.Attr{
slog.Time("time", end),
slog.Int("status", status),
slog.Int("length", len(c.Response().Body())),
}
// request query
if config.WithRequestQuery {
requestAttributes = append(requestAttributes, slog.String("query", string(c.Request().URI().QueryString())))
}
// request headers
if config.WithRequestHeader {
kv := []any{}
for k, v := range c.GetReqHeaders() {
if _, found := hiddenRequestHeaders[strings.ToLower(k)]; found {
continue
}
kv = append(kv, slog.Any(k, v))
}
requestAttributes = append(requestAttributes, slog.Group("header", kv...))
}
level := slog.LevelInfo
if err != nil || status >= http.StatusInternalServerError {
level = slog.LevelError
// error attributes
logErr := err
if logErr == nil {
logErr = fiber.NewError(status)
}
baseAttrs = append(baseAttrs, slog.Any("error", logErr))
}
if config.Disable && level == slog.LevelInfo {
return errors.WithStack(err)
}
logger.LogAttrs(c.UserContext(), level, "Request Completed", append([]slog.Attr{
{
Key: "request",
Value: slog.GroupValue(requestAttributes...),
},
{
Key: "response",
Value: slog.GroupValue(responseAttributes...),
},
}, baseAttrs...)...,
)
return errors.WithStack(err)
}
}