Merge branch 'feature/bitcoin-indexer' of github.com:gaze-network/indexer-network into feature/bitcoin-indexer

This commit is contained in:
Gaze
2024-04-22 15:58:52 +07:00
9 changed files with 360 additions and 41 deletions

View File

@@ -30,6 +30,7 @@ import (
"github.com/gaze-network/indexer-network/pkg/errorhandler"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/compress"
fiberrecover "github.com/gofiber/fiber/v2/middleware/recover"
@@ -122,6 +123,14 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// use gracefulEG to coordinate graceful shutdown after context is done. (e.g. shut down http server, shutdown logic of each module, etc.)
gracefulEG, gctx := errgroup.WithContext(context.Background())
var reportingClient *reportingclient.ReportingClient
if !conf.Reporting.Disabled {
reportingClient, err = reportingclient.New(conf.Reporting)
if err != nil {
logger.PanicContext(ctx, "Failed to create reporting client", slogx.Error(err))
}
}
// Initialize Bitcoin Indexer
if opts.Bitcoin {
var (
@@ -203,7 +212,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
return errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource)
}
if !opts.APIOnly {
runesProcessor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network)
runesProcessor := runes.NewProcessor(runesDg, indexerInfoDg, bitcoinClient, bitcoinDatasource, conf.Network, reportingClient)
runesIndexer := indexers.NewBitcoinIndexer(runesProcessor, bitcoinDatasource)
if err := runesProcessor.VerifyStates(ctx); err != nil {
@@ -260,9 +269,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// mount http handlers from each http-enabled module
for module, handler := range httpHandlers {
if err := handler.Mount(app); err != nil {
logger.PanicContext(ctx, "Failed to mount HTTP handler", slogx.Error(err), slog.String("module", module))
logger.PanicContext(ctx, "Failed to mount HTTP handler", slogx.Error(err), slogx.String("module", module))
}
logger.InfoContext(ctx, "Mounted HTTP handler", slog.String("module", module))
logger.InfoContext(ctx, "Mounted HTTP handler", slogx.String("module", module))
}
go func() {
logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))

View File

@@ -10,6 +10,13 @@ bitcoin_node:
network: mainnet
reporting:
disabled: false
base_url: "https://indexer.api.gaze.network" # defaults to "https://indexer.api.gaze.network" if empty
name: "local-dev" # name of this instance to show on the dashboard
website_url: "" # public website URL to show on the dashboard. Can be left empty.
indexer_api_url: "" # public url to access this api. Can be left empty.
้http_server:
port: 8080

View File

@@ -14,7 +14,7 @@ import (
)
const (
maxReorgLookBack = 100
maxReorgLookBack = 1000
)
type (

View File

@@ -12,6 +12,7 @@ import (
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
@@ -31,11 +32,12 @@ var (
)
type Config struct {
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
Modules Modules `mapstructure:"modules"`
Logger logger.Config `mapstructure:"logger"`
BitcoinNode BitcoinNodeClient `mapstructure:"bitcoin_node"`
Network common.Network `mapstructure:"network"`
HTTPServer HTTPServerConfig `mapstructure:"http_server"`
Modules Modules `mapstructure:"modules"`
Reporting reportingclient.Config `mapstructure:"reporting"`
}
type BitcoinNodeClient struct {

View File

@@ -14,8 +14,11 @@ const (
)
var startingBlockHeader = map[common.Network]types.BlockHeader{
// TODO: add starting block header for mainnet after block 840,000 is mined
common.NetworkMainnet: {},
common.NetworkMainnet: {
Height: 839999,
Hash: *utils.Must(chainhash.NewHashFromStr("0000000000000000000172014ba58d66455762add0512355ad651207918494ab")),
PrevBlock: *utils.Must(chainhash.NewHashFromStr("00000000000000000001dcce6ce7c8a45872cafd1fb04732b447a14a91832591")),
},
common.NetworkTestnet: {
Height: 2583200,
Hash: *utils.Must(chainhash.NewHashFromStr("000000000006c5f0dfcd9e0e81f27f97a87aef82087ffe69cd3c390325bb6541")),

View File

@@ -16,6 +16,7 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/uint128"
"github.com/samber/lo"
)
@@ -28,6 +29,7 @@ type Processor struct {
bitcoinClient btcclient.Contract
bitcoinDataSource indexers.BitcoinDatasource
network common.Network
reportingClient *reportingclient.ReportingClient
newRuneEntries map[runes.RuneId]*runes.RuneEntry
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
@@ -37,13 +39,14 @@ type Processor struct {
newRuneTxs []*entity.RuneTransaction
}
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network) *Processor {
func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, bitcoinDataSource indexers.BitcoinDatasource, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor {
return &Processor{
runesDg: runesDg,
indexerInfoDg: indexerInfoDg,
bitcoinClient: bitcoinClient,
bitcoinDataSource: bitcoinDataSource,
network: network,
reportingClient: reportingClient,
newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry),
newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry),
newOutPointBalances: make(map[wire.OutPoint][]*entity.OutPointBalance),
@@ -68,6 +71,11 @@ func (p *Processor) VerifyStates(ctx context.Context) error {
return errors.Wrap(err, "error during ensureGenesisRune")
}
}
if p.reportingClient != nil {
if err := p.reportingClient.SubmitNodeReport(ctx, "runes", p.network); err != nil {
return errors.Wrap(err, "failed to submit node report")
}
}
return nil
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/uint128"
"github.com/samber/lo"
)
@@ -702,38 +703,36 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
}()
// CreateIndexedBlock must be performed before other flush methods to correctly calculate event hash
{
eventHash, err := p.calculateEventHash(blockHeader)
if err != nil {
return errors.Wrap(err, "failed to calculate event hash")
eventHash, err := p.calculateEventHash(blockHeader)
if err != nil {
return errors.Wrap(err, "failed to calculate event hash")
}
prevIndexedBlock, err := runesDgTx.GetIndexedBlockByHeight(ctx, blockHeader.Height-1)
if err != nil && errors.Is(err, errs.NotFound) && blockHeader.Height-1 == startingBlockHeader[p.network].Height {
prevIndexedBlock = &entity.IndexedBlock{
Height: startingBlockHeader[p.network].Height,
Hash: startingBlockHeader[p.network].Hash,
EventHash: chainhash.Hash{},
CumulativeEventHash: chainhash.Hash{},
}
prevIndexedBlock, err := runesDgTx.GetIndexedBlockByHeight(ctx, blockHeader.Height-1)
if err != nil && errors.Is(err, errs.NotFound) && blockHeader.Height-1 == startingBlockHeader[p.network].Height {
prevIndexedBlock = &entity.IndexedBlock{
Height: startingBlockHeader[p.network].Height,
Hash: startingBlockHeader[p.network].Hash,
EventHash: chainhash.Hash{},
CumulativeEventHash: chainhash.Hash{},
}
err = nil
err = nil
}
if err != nil {
if errors.Is(err, errs.NotFound) {
return errors.Errorf("indexed block not found for height %d. Indexed block must be created for every Bitcoin block", blockHeader.Height)
}
if err != nil {
if errors.Is(err, errs.NotFound) {
return errors.Errorf("indexed block not found for height %d. Indexed block must be created for every Bitcoin block", blockHeader.Height)
}
return errors.Wrap(err, "failed to get indexed block by height")
}
cumulativeEventHash := chainhash.DoubleHashH(append(prevIndexedBlock.CumulativeEventHash[:], eventHash[:]...))
return errors.Wrap(err, "failed to get indexed block by height")
}
cumulativeEventHash := chainhash.DoubleHashH(append(prevIndexedBlock.CumulativeEventHash[:], eventHash[:]...))
if err := runesDgTx.CreateIndexedBlock(ctx, &entity.IndexedBlock{
Height: blockHeader.Height,
Hash: blockHeader.Hash,
PrevHash: blockHeader.PrevBlock,
EventHash: eventHash,
CumulativeEventHash: cumulativeEventHash,
}); err != nil {
return errors.Wrap(err, "failed to create indexed block")
}
if err := runesDgTx.CreateIndexedBlock(ctx, &entity.IndexedBlock{
Height: blockHeader.Height,
Hash: blockHeader.Hash,
PrevHash: blockHeader.PrevBlock,
EventHash: eventHash,
CumulativeEventHash: cumulativeEventHash,
}); err != nil {
return errors.Wrap(err, "failed to create indexed block")
}
// flush new rune entries
{
@@ -808,6 +807,23 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
if err := runesDgTx.Commit(ctx); err != nil {
return errors.Wrap(err, "failed to commit runes tx")
}
// submit event to reporting system
if p.reportingClient != nil {
if err := p.reportingClient.SubmitBlockReport(ctx, reportingclient.SubmitBlockReportPayload{
Type: "runes",
ClientVersion: Version,
DBVersion: DBVersion,
EventHashVersion: EventHashVersion,
Network: p.network,
BlockHeight: uint64(blockHeader.Height),
BlockHash: blockHeader.Hash,
EventHash: eventHash,
CumulativeEventHash: cumulativeEventHash,
}); err != nil {
return errors.Wrap(err, "failed to submit block report")
}
}
logger.InfoContext(ctx, "[RunesProcessor] block flushed")
return nil
}

View File

@@ -0,0 +1,170 @@
package httpclient
import (
"context"
"encoding/json"
"log/slog"
"net/url"
"strings"
"time"
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/valyala/fasthttp"
)
type Config struct {
// Enable debug mode
Debug bool
// Default headers
Headers map[string]string
}
type Client struct {
baseURL string
Config
}
func New(baseURL string, config ...Config) (*Client, error) {
if _, err := url.Parse(baseURL); err != nil {
return nil, errors.Wrap(err, "can't parse base url")
}
var cf Config
if len(config) > 0 {
cf = config[0]
}
if len(cf.Headers) == 0 {
cf.Headers = make(map[string]string)
}
return &Client{
baseURL: baseURL,
Config: cf,
}, nil
}
type RequestOptions struct {
path string
method string
Body []byte
Query url.Values
Header map[string]string
FormData url.Values
}
type HttpResponse struct {
URL string
fasthttp.Response
}
func (r *HttpResponse) UnmarshalBody(out any) error {
err := json.Unmarshal(r.Body(), out)
if err != nil {
return errors.Wrapf(err, "can't unmarshal json body from %v, %v", r.URL, string(r.Body()))
}
return nil
}
func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpResponse, error) {
start := time.Now()
req := fasthttp.AcquireRequest()
req.Header.SetMethod(reqOptions.method)
for k, v := range h.Headers {
req.Header.Set(k, v)
}
for k, v := range reqOptions.Header {
req.Header.Set(k, v)
}
parsedUrl := utils.Must(url.Parse(h.baseURL)) // checked in httpclient.New
parsedUrl.Path = reqOptions.path
parsedUrl.RawQuery = reqOptions.Query.Encode()
// remove %20 from url (empty space)
url := strings.TrimSuffix(parsedUrl.String(), "%20")
url = strings.Replace(url, "%20?", "?", 1)
req.SetRequestURI(url)
if reqOptions.Body != nil {
req.Header.SetContentType("application/json")
req.SetBody(reqOptions.Body)
} else if reqOptions.FormData != nil {
req.Header.SetContentType("application/x-www-form-urlencoded")
req.SetBodyString(reqOptions.FormData.Encode())
}
resp := fasthttp.AcquireResponse()
startDo := time.Now()
defer func() {
if h.Debug {
logger := logger.With(
slog.String("method", reqOptions.method),
slog.String("url", url),
slog.Duration("duration", time.Since(start)),
slog.Duration("latency", time.Since(startDo)),
slog.Int("req_header_size", len(req.Header.Header())),
slog.Int("req_content_length", req.Header.ContentLength()),
)
if resp.StatusCode() >= 0 {
logger = logger.With(
slog.Int("status_code", resp.StatusCode()),
slog.String("resp_content_type", string(resp.Header.ContentType())),
slog.Int("resp_content_length", len(resp.Body())),
)
}
logger.Info("Finished make request")
}
fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)
}()
if err := fasthttp.Do(req, resp); err != nil {
return nil, errors.Wrapf(err, "url: %s", url)
}
httpResponse := HttpResponse{
URL: url,
}
resp.CopyTo(&httpResponse.Response)
return &httpResponse, nil
}
func (h *Client) Do(ctx context.Context, method, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = method
return h.request(ctx, reqOptions)
}
func (h *Client) Get(ctx context.Context, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = fasthttp.MethodGet
return h.request(ctx, reqOptions)
}
func (h *Client) Post(ctx context.Context, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = fasthttp.MethodPost
return h.request(ctx, reqOptions)
}
func (h *Client) Put(ctx context.Context, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = fasthttp.MethodPut
return h.request(ctx, reqOptions)
}
func (h *Client) Patch(ctx context.Context, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = fasthttp.MethodPatch
return h.request(ctx, reqOptions)
}
func (h *Client) Delete(ctx context.Context, path string, reqOptions RequestOptions) (*HttpResponse, error) {
reqOptions.path = path
reqOptions.method = fasthttp.MethodDelete
return h.request(ctx, reqOptions)
}

View File

@@ -0,0 +1,104 @@
package reportingclient
import (
"context"
"encoding/json"
"log/slog"
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/pkg/httpclient"
"github.com/gaze-network/indexer-network/pkg/logger"
)
type Config struct {
Disabled bool `mapstructure:"disabled"`
BaseURL string `mapstructure:"base_url"`
Name string `mapstructure:"name"`
WebsiteURL string `mapstructure:"website_url"`
IndexerAPIURL string `mapstructure:"indexer_api_url"`
}
type ReportingClient struct {
httpClient *httpclient.Client
config Config
}
const defaultBaseURL = "https://indexer.api.gaze.network"
func New(config Config) (*ReportingClient, error) {
baseURL := utils.Default(config.BaseURL, defaultBaseURL)
httpClient, err := httpclient.New(baseURL)
if err != nil {
return nil, errors.Wrap(err, "can't create http client")
}
return &ReportingClient{
httpClient: httpClient,
config: config,
}, nil
}
type SubmitBlockReportPayload struct {
Type string `json:"type"`
ClientVersion string `json:"clientVersion"`
DBVersion int `json:"dbVersion"`
EventHashVersion int `json:"eventHashVersion"`
Network common.Network `json:"network"`
BlockHeight uint64 `json:"blockHeight"`
BlockHash chainhash.Hash `json:"blockHash"`
EventHash chainhash.Hash `json:"eventHash"`
CumulativeEventHash chainhash.Hash `json:"cumulativeEventHash"`
}
func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayload) error {
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
}
resp, err := r.httpClient.Post(ctx, "/v1/report/block", httpclient.RequestOptions{
Body: body,
})
if err != nil {
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
}
logger.DebugContext(ctx, "block report submitted", slog.Any("payload", payload))
return nil
}
type SubmitNodeReportPayload struct {
Name string `json:"name"`
Type string `json:"type"`
Network common.Network `json:"network"`
WebsiteURL string `json:"websiteURL,omitempty"`
IndexerAPIURL string `json:"indexerAPIURL,omitempty"`
}
func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, network common.Network) error {
payload := SubmitNodeReportPayload{
Name: r.config.Name,
Type: module,
Network: network,
WebsiteURL: r.config.WebsiteURL,
IndexerAPIURL: r.config.IndexerAPIURL,
}
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
}
resp, err := r.httpClient.Post(ctx, "/v1/report/node", httpclient.RequestOptions{
Body: body,
})
if err != nil {
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
}
logger.InfoContext(ctx, "node report submitted", slog.Any("payload", payload))
return nil
}